diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 11:23:11 -0500 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 11:23:11 -0500 |
commit | 120fa84b3e8c794dd882d3613067c5b18ee7ba04 (patch) | |
tree | 36964a80e2043d9948a993a903b6efec257ae2f2 | |
parent | 126f05056b64fa98e9b13210eedae711bfc3f38f (diff) |
typed messages and handlers
-rw-r--r-- | src/server/DashSession/DashSessionAgent.ts | 6 | ||||
-rw-r--r-- | src/server/session/agents/message_router.ts | 4 | ||||
-rw-r--r-- | src/server/session/agents/monitor.ts | 100 | ||||
-rw-r--r-- | src/server/session/agents/server_worker.ts | 8 | ||||
-rw-r--r-- | src/server/session/utilities/ipc.ts | 13 |
5 files changed, 68 insertions, 63 deletions
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index b7e741525..23d421835 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -33,9 +33,9 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addMessageListener("backup", this.backup); - monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.onCrashDetected(this.dispatchCrashReport); + monitor.on("backup", this.backup); + monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.hooks.crashDetected(this.dispatchCrashReport); } /** diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts index 5848e27ab..707f771d9 100644 --- a/src/server/session/agents/message_router.ts +++ b/src/server/session/agents/message_router.ts @@ -8,7 +8,7 @@ export default abstract class MessageRouter { * Add a listener at this message. When the monitor process * receives a message, it will invoke all registered functions. */ - public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => { + public on = (name: string, handler: MessageHandler, exclusive = false) => { const handlers = this.onMessage[name]; if (exclusive || !handlers) { this.onMessage[name] = [handler]; @@ -20,7 +20,7 @@ export default abstract class MessageRouter { /** * Unregister a given listener at this message. */ - public removeMessageListener = (name: string, handler: MessageHandler) => { + public off = (name: string, handler: MessageHandler) => { const handlers = this.onMessage[name]; if (handlers) { const index = handlers.indexOf(handler); diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 5f4543606..ccba8199e 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; @@ -40,8 +40,54 @@ export class Monitor extends MessageRouter { } } - public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener); - public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener); + private constructor(sessionKey: string) { + super(); + this.config = this.loadAndValidateConfiguration(); + this.initialize(sessionKey); + this.repl = this.initializeRepl(); + } + + private initialize = (sessionKey: string) => { + console.log(this.timestamp(), cyan("initializing session...")); + this.key = sessionKey; + + // determines whether or not we see the compilation / initialization / runtime output of each child server process + const output = this.config.showServerOutput ? "inherit" : "ignore"; + setupMaster({ stdio: ["ignore", output, output, "ipc"] }); + + // handle exceptions in the master thread - there shouldn't be many of these + // the IPC (inter process communication) channel closed exception can't seem + // to be caught in a try catch, and is inconsequential, so it is ignored + process.on("uncaughtException", ({ message, stack }): void => { + if (message !== "Channel closed") { + this.mainLog(red(message)); + if (stack) { + this.mainLog(`uncaught exception\n${red(stack)}`); + } + } + }); + + // a helpful cluster event called on the master thread each time a child process exits + on("exit", ({ process: { pid } }, code, signal) => { + const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; + this.mainLog(cyan(prompt)); + // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one + this.spawn(); + }); + } + + public finalize = (): void => { + if (this.finalized) { + throw new Error("Session monitor is already finalized"); + } + this.finalized = true; + this.spawn(); + } + + public readonly hooks = Object.freeze({ + crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), + serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + }); /** * Kill this session and its active child @@ -91,48 +137,6 @@ export class Monitor extends MessageRouter { }); } - private constructor(sessionKey: string) { - super(); - console.log(this.timestamp(), cyan("initializing session...")); - this.key = sessionKey; - this.config = this.loadAndValidateConfiguration(); - - // determines whether or not we see the compilation / initialization / runtime output of each child server process - const output = this.config.showServerOutput ? "inherit" : "ignore"; - setupMaster({ stdio: ["ignore", output, output, "ipc"] }); - - // handle exceptions in the master thread - there shouldn't be many of these - // the IPC (inter process communication) channel closed exception can't seem - // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }): void => { - if (message !== "Channel closed") { - this.mainLog(red(message)); - if (stack) { - this.mainLog(`uncaught exception\n${red(stack)}`); - } - } - }); - - // a helpful cluster event called on the master thread each time a child process exits - on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - this.mainLog(cyan(prompt)); - // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one - this.spawn(); - }); - - this.repl = this.initializeRepl(); - Monitor.IPCManager.setRouter(this.route); - } - - public finalize = (): void => { - if (this.finalized) { - throw new Error("Session monitor is already finalized"); - } - this.finalized = true; - this.spawn(); - } - /** * Generates a blue UTC string associated with the time * of invocation. @@ -282,8 +286,10 @@ export class Monitor extends MessageRouter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); - this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); + this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + + Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 2c77cfb29..50abe398d 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager } from "../utilities/ipc"; +import { PromisifiedIPCManager, Message } from "../utilities/ipc"; import MessageRouter from "./message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; @@ -80,11 +80,11 @@ export class ServerWorker extends MessageRouter { private configureProcess = () => { ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.addMessageListener("updatePollingInterval", ({ args }) => { + this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => { this.pollingIntervalSeconds = args.newPollingIntervalSeconds; return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10)); }); - this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); @@ -135,7 +135,7 @@ export class ServerWorker extends MessageRouter { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); + this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index fd8bf6075..7ad00596d 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -6,13 +6,12 @@ export type Router = (message: Message) => void | Promise<void>; export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; -export interface Message { +export interface Message<T = any> { name: string; - args?: any; + args: T; } -type InternalMessage = Message & { metadata: any }; - -export type MessageHandler<T extends Message = Message> = (message: T) => any | Promise<any>; +type InternalMessage<T = any> = Message<T> & { metadata: any }; +export type MessageHandler<A = any, T extends Message<A> = Message<A>> = (message: T) => any | Promise<any>; export class PromisifiedIPCManager { private readonly target: IPCTarget; @@ -32,10 +31,10 @@ export class PromisifiedIPCManager { const messageId = Utils.GenerateGuid(); const metadata: any = {}; metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler<InternalMessage> = ({ args, metadata }) => { + const responseHandler: MessageHandler<any, InternalMessage> = ({ metadata, args }) => { if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { this.target.removeListener("message", responseHandler); - resolve(args.error as Error | undefined); + resolve(args?.error as Error | undefined); } }; this.target.addListener("message", responseHandler); |