diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 02:51:33 -0500 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 02:51:33 -0500 |
commit | 7a20f573f4f428bfc779797d437fa9525b6976f8 (patch) | |
tree | 5d8d1028f3ee21d279066e3a410659dbd79ad791 /src | |
parent | 2c83f136771794565350d229a238b3f01cc60aca (diff) |
standardized ipc message
Diffstat (limited to 'src')
-rw-r--r-- | src/server/session/agents/monitor.ts | 86 | ||||
-rw-r--r-- | src/server/session/agents/server_worker.ts | 38 | ||||
-rw-r--r-- | src/server/session/utilities/ipc.ts | 88 |
3 files changed, 95 insertions, 117 deletions
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index cd09c9e41..18fa6df24 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { Utils } from "../../../Utils"; @@ -16,28 +16,21 @@ import { EventEmitter } from "events"; * and spawns off an initial process that will respawn as predecessors die. */ export class Monitor extends EventEmitter { - private static readonly localIPCManager = new PromisifiedIPCManager(process); - private static childIPCManager: PromisifiedIPCManager; + private static IPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; private readonly config: Configuration; - private onMessage: { [message: string]: Monitor.ServerMessageHandler[] | undefined } = {}; private activeWorker: Worker | undefined; private key: string | undefined; private repl: Repl; public static Create() { if (isWorker) { - this.localIPCManager.emit({ - action: { - message: "kill", - args: { - reason: "cannot create a monitor on the worker process.", - graceful: false, - errorCode: 1 - } - } + IPC(process).emit("kill", { + reason: "cannot create a monitor on the worker process.", + graceful: false, + errorCode: 1 }); process.exit(1); } else if (++Monitor.count > 1) { @@ -100,37 +93,6 @@ export class Monitor extends EventEmitter { }); } - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public addServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => { - const handlers = this.onMessage[message]; - if (handlers) { - handlers.push(handler); - } else { - this.onMessage[message] = [handler]; - } - } - - /** - * Unregister a given listener at this message. - */ - public removeServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => { - const handlers = this.onMessage[message]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } - } - - /** - * Unregister all listeners at this message. - */ - public clearServerMessageListeners = (message: string) => this.onMessage[message] = undefined; - private constructor() { super(); @@ -255,7 +217,7 @@ export class Monitor extends EventEmitter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.localIPCManager.emit({ newPollingIntervalSeconds }, true); + return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true); } } } @@ -271,7 +233,7 @@ export class Monitor extends EventEmitter { private killActiveWorker = (graceful = true, isSessionEnd = false): void => { if (this.activeWorker && !this.activeWorker.isDead()) { if (graceful) { - Monitor.childIPCManager.emit({ manualExit: { isSessionEnd } }); + Monitor.IPCManager.emit("manualExit", { isSessionEnd }); } else { this.activeWorker.process.kill(); } @@ -319,40 +281,20 @@ export class Monitor extends EventEmitter { session_key: this.key, ipc_suffix: suffix }); - Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker); + Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.addServerMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); - this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); - this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); - - // an IPC message handler that executes actions on the master thread when prompted by the active worker - Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => { - if (action) { - const { message, args } = action as Monitor.Action; - console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`); - const handlers = this.onMessage[message]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ message, args }))); - } - } - if (lifecycle) { - console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${lifecycle})`); - } - }); + const { addMessageListener } = Monitor.IPCManager; + addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); + addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); + addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); + addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); } } export namespace Monitor { - export interface Action { - message: string; - args: any; - } - - export type ServerMessageHandler = (action: Action) => any | Promise<any>; - export enum IntrinsicEvents { KeyGenerated = "key_generated", CrashDetected = "crash_detected", diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index b279a19d8..9e471366a 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -11,7 +11,7 @@ import { Monitor } from "./monitor"; * email if the server encounters an uncaught exception or if the server cannot be reached. */ export class ServerWorker { - private static localIPCManager = new PromisifiedIPCManager(process); + private static IPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -27,14 +27,10 @@ export class ServerWorker { console.error(red("cannot create a worker on the monitor process.")); process.exit(1); } else if (++ServerWorker.count > 1) { - ServerWorker.localIPCManager.emit({ - action: { - message: "kill", args: { - reason: "cannot create more than one worker on a given worker process.", - graceful: false, - errorCode: 1 - } - } + ServerWorker.IPCManager.emit("kill", { + reason: "cannot create more than one worker on a given worker process.", + graceful: false, + errorCode: 1 }); process.exit(1); } else { @@ -53,13 +49,13 @@ export class ServerWorker { * server worker (child process). This will also kill * this process (child process). */ - public killSession = (reason: string, graceful = true, errorCode = 0) => this.sendMonitorAction("kill", { reason, graceful, errorCode }); + public killSession = (reason: string, graceful = true, errorCode = 0) => this.emitToMonitor("kill", { reason, graceful, errorCode }); /** * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public sendMonitorAction = (message: string, args?: any, expectResponse = false) => ServerWorker.localIPCManager.emit({ action: { message, args } }, expectResponse); + public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse); private constructor(work: Function) { this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); @@ -81,15 +77,11 @@ export class ServerWorker { */ private configureProcess = () => { // updates the local values of variables to the those sent from master - ServerWorker.localIPCManager.addMessagesHandler(async ({ newPollingIntervalSeconds, manualExit }) => { - if (newPollingIntervalSeconds !== undefined) { - this.pollingIntervalSeconds = newPollingIntervalSeconds; - } - if (manualExit !== undefined) { - const { isSessionEnd } = manualExit; - await this.executeExitHandlers(isSessionEnd); - process.exit(0); - } + const { addMessageListener } = ServerWorker.IPCManager; + addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds); + addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + await this.executeExitHandlers(isSessionEnd); + process.exit(0); }); // one reason to exit, as the process might be in an inconsistent state after such an exception @@ -109,7 +101,7 @@ export class ServerWorker { /** * Notify master thread (which will log update in the console) of initialization via IPC. */ - public lifecycleNotification = (event: string) => ServerWorker.localIPCManager.emit({ lifecycle: event }); + public lifecycleNotification = (event: string) => ServerWorker.IPCManager.emit("lifecycle", { event }); /** * Called whenever the process has a reason to terminate, either through an uncaught exception @@ -118,7 +110,7 @@ export class ServerWorker { private proactiveUnplannedExit = async (error: Error): Promise<void> => { this.shouldServerBeResponsive = false; // communicates via IPC to the master thread that it should dispatch a crash notification email - this.sendMonitorAction(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); + this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); await this.executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); @@ -138,7 +130,7 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.sendMonitorAction(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); + this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 888377c93..2faf9f63e 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -2,11 +2,19 @@ import { isMaster } from "cluster"; import { Utils } from "../../../Utils"; export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; -export type Listener = (message: any) => void | Promise<void>; +export type Router = (message: Message) => void | Promise<void>; export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; +export interface Message { + name: string; + args: any; +} + +export type MessageHandler = (message: Message) => any | Promise<any>; + export class PromisifiedIPCManager { + private onMessage: { [message: string]: MessageHandler[] | undefined } = {}; private readonly target: IPCTarget; private readonly ipc_id = `ipc_id_${suffix}`; private readonly response_expected = `response_expected_${suffix}`; @@ -14,17 +22,66 @@ export class PromisifiedIPCManager { constructor(target: IPCTarget) { this.target = target; + + this.target.addListener("message", async ({ name, args }: Message) => { + let error: Error | undefined; + try { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler({ name, args }))); + } + } catch (e) { + error = e; + } + if (args[this.response_expected] && this.target.send) { + const response: any = { error }; + response[this.ipc_id] = args[this.ipc_id]; + response[this.is_response] = true; + this.target.send(response); + } + }); + } + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public addMessageListener = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + handlers.push(handler); + } else { + this.onMessage[name] = [handler]; + } } - public emit = async (message: any, expectResponse = false): Promise<Error | undefined> => { + /** + * Unregister a given listener at this message. + */ + public removeMessageListener = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (message: string) => this.onMessage[message] = undefined; + + public emit = async (name: string, args: any, expectResponse = false): Promise<Error | undefined> => { if (!this.target.send) { return new Error("Cannot dispatch when send is undefined."); } - message[this.response_expected] = expectResponse; + args[this.response_expected] = expectResponse; if (expectResponse) { return new Promise(resolve => { const messageId = Utils.GenerateGuid(); - message[this.ipc_id] = messageId; + args[this.ipc_id] = messageId; const responseHandler: (args: any) => void = response => { const { error } = response; if (response[this.is_response] && response[this.ipc_id] === messageId) { @@ -33,28 +90,15 @@ export class PromisifiedIPCManager { } }; this.target.addListener("message", responseHandler); - this.target.send!(message); + this.target.send!({ name, args }); }); } else { - this.target.send(message); + this.target.send({ name, args }); } } - public addMessagesHandler = (handler: Listener): void => { - this.target.addListener("message", async incoming => { - let error: Error | undefined; - try { - await handler(incoming); - } catch (e) { - error = e; - } - if (incoming[this.response_expected] && this.target.send) { - const response: any = { error }; - response[this.ipc_id] = incoming[this.ipc_id]; - response[this.is_response] = true; - this.target.send(response); - } - }); - } +} +export function IPC(target: IPCTarget) { + return new PromisifiedIPCManager(target); }
\ No newline at end of file |