aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Wilkins <samwilkins333@gmail.com>2020-01-11 11:23:11 -0500
committerSam Wilkins <samwilkins333@gmail.com>2020-01-11 11:23:11 -0500
commit120fa84b3e8c794dd882d3613067c5b18ee7ba04 (patch)
tree36964a80e2043d9948a993a903b6efec257ae2f2
parent126f05056b64fa98e9b13210eedae711bfc3f38f (diff)
typed messages and handlers
-rw-r--r--src/server/DashSession/DashSessionAgent.ts6
-rw-r--r--src/server/session/agents/message_router.ts4
-rw-r--r--src/server/session/agents/monitor.ts100
-rw-r--r--src/server/session/agents/server_worker.ts8
-rw-r--r--src/server/session/utilities/ipc.ts13
5 files changed, 68 insertions, 63 deletions
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts
index b7e741525..23d421835 100644
--- a/src/server/DashSession/DashSessionAgent.ts
+++ b/src/server/DashSession/DashSessionAgent.ts
@@ -33,9 +33,9 @@ export class DashSessionAgent extends AppliedSessionAgent {
monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand);
monitor.addReplCommand("backup", [], this.backup);
monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient));
- monitor.addMessageListener("backup", this.backup);
- monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient));
- monitor.onCrashDetected(this.dispatchCrashReport);
+ monitor.on("backup", this.backup);
+ monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient));
+ monitor.hooks.crashDetected(this.dispatchCrashReport);
}
/**
diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts
index 5848e27ab..707f771d9 100644
--- a/src/server/session/agents/message_router.ts
+++ b/src/server/session/agents/message_router.ts
@@ -8,7 +8,7 @@ export default abstract class MessageRouter {
* Add a listener at this message. When the monitor process
* receives a message, it will invoke all registered functions.
*/
- public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => {
+ public on = (name: string, handler: MessageHandler, exclusive = false) => {
const handlers = this.onMessage[name];
if (exclusive || !handlers) {
this.onMessage[name] = [handler];
@@ -20,7 +20,7 @@ export default abstract class MessageRouter {
/**
* Unregister a given listener at this message.
*/
- public removeMessageListener = (name: string, handler: MessageHandler) => {
+ public off = (name: string, handler: MessageHandler) => {
const handlers = this.onMessage[name];
if (handlers) {
const index = handlers.indexOf(handler);
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index 5f4543606..ccba8199e 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc";
+import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
import { validate, ValidationError } from "jsonschema";
@@ -40,8 +40,54 @@ export class Monitor extends MessageRouter {
}
}
- public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener);
- public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener);
+ private constructor(sessionKey: string) {
+ super();
+ this.config = this.loadAndValidateConfiguration();
+ this.initialize(sessionKey);
+ this.repl = this.initializeRepl();
+ }
+
+ private initialize = (sessionKey: string) => {
+ console.log(this.timestamp(), cyan("initializing session..."));
+ this.key = sessionKey;
+
+ // determines whether or not we see the compilation / initialization / runtime output of each child server process
+ const output = this.config.showServerOutput ? "inherit" : "ignore";
+ setupMaster({ stdio: ["ignore", output, output, "ipc"] });
+
+ // handle exceptions in the master thread - there shouldn't be many of these
+ // the IPC (inter process communication) channel closed exception can't seem
+ // to be caught in a try catch, and is inconsequential, so it is ignored
+ process.on("uncaughtException", ({ message, stack }): void => {
+ if (message !== "Channel closed") {
+ this.mainLog(red(message));
+ if (stack) {
+ this.mainLog(`uncaught exception\n${red(stack)}`);
+ }
+ }
+ });
+
+ // a helpful cluster event called on the master thread each time a child process exits
+ on("exit", ({ process: { pid } }, code, signal) => {
+ const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ this.mainLog(cyan(prompt));
+ // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
+ this.spawn();
+ });
+ }
+
+ public finalize = (): void => {
+ if (this.finalized) {
+ throw new Error("Session monitor is already finalized");
+ }
+ this.finalized = true;
+ this.spawn();
+ }
+
+ public readonly hooks = Object.freeze({
+ crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
+ serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener)
+ });
/**
* Kill this session and its active child
@@ -91,48 +137,6 @@ export class Monitor extends MessageRouter {
});
}
- private constructor(sessionKey: string) {
- super();
- console.log(this.timestamp(), cyan("initializing session..."));
- this.key = sessionKey;
- this.config = this.loadAndValidateConfiguration();
-
- // determines whether or not we see the compilation / initialization / runtime output of each child server process
- const output = this.config.showServerOutput ? "inherit" : "ignore";
- setupMaster({ stdio: ["ignore", output, output, "ipc"] });
-
- // handle exceptions in the master thread - there shouldn't be many of these
- // the IPC (inter process communication) channel closed exception can't seem
- // to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }): void => {
- if (message !== "Channel closed") {
- this.mainLog(red(message));
- if (stack) {
- this.mainLog(`uncaught exception\n${red(stack)}`);
- }
- }
- });
-
- // a helpful cluster event called on the master thread each time a child process exits
- on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
- this.mainLog(cyan(prompt));
- // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
- this.spawn();
- });
-
- this.repl = this.initializeRepl();
- Monitor.IPCManager.setRouter(this.route);
- }
-
- public finalize = (): void => {
- if (this.finalized) {
- throw new Error("Session monitor is already finalized");
- }
- this.finalized = true;
- this.spawn();
- }
-
/**
* Generates a blue UTC string associated with the time
* of invocation.
@@ -282,8 +286,10 @@ export class Monitor extends MessageRouter {
Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
- this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+ this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
+ this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+
+ Monitor.IPCManager.setRouter(this.route);
}
}
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index 2c77cfb29..50abe398d 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,6 +1,6 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
-import { PromisifiedIPCManager } from "../utilities/ipc";
+import { PromisifiedIPCManager, Message } from "../utilities/ipc";
import MessageRouter from "./message_router";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
@@ -80,11 +80,11 @@ export class ServerWorker extends MessageRouter {
private configureProcess = () => {
ServerWorker.IPCManager.setRouter(this.route);
// updates the local values of variables to the those sent from master
- this.addMessageListener("updatePollingInterval", ({ args }) => {
+ this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => {
this.pollingIntervalSeconds = args.newPollingIntervalSeconds;
return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10));
});
- this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
+ this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => {
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
});
@@ -135,7 +135,7 @@ export class ServerWorker extends MessageRouter {
if (!this.shouldServerBeResponsive) {
// notify monitor thread that the server is up and running
this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
- this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized });
+ this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized });
this.isInitialized = true;
}
this.shouldServerBeResponsive = true;
diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts
index fd8bf6075..7ad00596d 100644
--- a/src/server/session/utilities/ipc.ts
+++ b/src/server/session/utilities/ipc.ts
@@ -6,13 +6,12 @@ export type Router = (message: Message) => void | Promise<void>;
export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
-export interface Message {
+export interface Message<T = any> {
name: string;
- args?: any;
+ args: T;
}
-type InternalMessage = Message & { metadata: any };
-
-export type MessageHandler<T extends Message = Message> = (message: T) => any | Promise<any>;
+type InternalMessage<T = any> = Message<T> & { metadata: any };
+export type MessageHandler<A = any, T extends Message<A> = Message<A>> = (message: T) => any | Promise<any>;
export class PromisifiedIPCManager {
private readonly target: IPCTarget;
@@ -32,10 +31,10 @@ export class PromisifiedIPCManager {
const messageId = Utils.GenerateGuid();
const metadata: any = {};
metadata[this.ipc_id] = messageId;
- const responseHandler: MessageHandler<InternalMessage> = ({ args, metadata }) => {
+ const responseHandler: MessageHandler<any, InternalMessage> = ({ metadata, args }) => {
if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) {
this.target.removeListener("message", responseHandler);
- resolve(args.error as Error | undefined);
+ resolve(args?.error as Error | undefined);
}
};
this.target.addListener("message", responseHandler);