From 86f1e0f58940904b8c55284f6787e7422a6665ff Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 13:42:06 -0500 Subject: refactor --- src/server/session/agents/message_router.ts | 45 --------------------- src/server/session/agents/monitor.ts | 17 ++++---- .../session/agents/process_message_router.ts | 46 ++++++++++++++++++++++ src/server/session/agents/server_worker.ts | 18 +++++---- 4 files changed, 64 insertions(+), 62 deletions(-) delete mode 100644 src/server/session/agents/message_router.ts create mode 100644 src/server/session/agents/process_message_router.ts (limited to 'src/server/session/agents') diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts deleted file mode 100644 index 707f771d9..000000000 --- a/src/server/session/agents/message_router.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { MessageHandler, Message } from "../utilities/ipc"; - -export default abstract class MessageRouter { - - private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public on = (name: string, handler: MessageHandler, exclusive = false) => { - const handlers = this.onMessage[name]; - if (exclusive || !handlers) { - this.onMessage[name] = [handler]; - } else { - handlers.push(handler); - } - } - - /** - * Unregister a given listener at this message. - */ - public off = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } - } - - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); - - protected route: MessageHandler = async ({ name, args }) => { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index ccba8199e..d4abbb51e 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -8,14 +8,13 @@ import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import MessageRouter from "./message_router"; +import ProcessMessageRouter from "./process_message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends MessageRouter { - private static IPCManager: PromisifiedIPCManager; +export class Monitor extends ProcessMessageRouter { private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; @@ -84,9 +83,9 @@ export class Monitor extends MessageRouter { this.spawn(); } - public readonly hooks = Object.freeze({ - crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + public readonly coreHooks = Object.freeze({ + onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), + onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) }); /** @@ -219,7 +218,7 @@ export class Monitor extends MessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true); + return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -286,8 +285,8 @@ export class Monitor extends MessageRouter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); - this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); + this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); Monitor.IPCManager.setRouter(this.route); } diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts new file mode 100644 index 000000000..f60343514 --- /dev/null +++ b/src/server/session/agents/process_message_router.ts @@ -0,0 +1,46 @@ +import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; + +export default abstract class ProcessMessageRouter { + + protected static IPCManager: PromisifiedIPCManager; + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public on = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public off = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler(args))); + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 50abe398d..23ffb2650 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,7 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message } from "../utilities/ipc"; -import MessageRouter from "./message_router"; +import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -11,8 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker extends MessageRouter { - private static IPCManager = new PromisifiedIPCManager(process); +export class ServerWorker extends ProcessMessageRouter { private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -56,10 +55,13 @@ export class ServerWorker extends MessageRouter { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public emitToMonitor = (name: string, args?: any, awaitResponse = false) => ServerWorker.IPCManager.emit(name, args, awaitResponse); + public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args); + + public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args); private constructor(work: Function) { super(); + ServerWorker.IPCManager = new PromisifiedIPCManager(process); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,11 +82,11 @@ export class ServerWorker extends MessageRouter { private configureProcess = () => { ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => { - this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { + this.pollingIntervalSeconds = newPollingIntervalSeconds; return new Promise(resolve => setTimeout(resolve, 1000 * 10)); }); - this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => { + this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); -- cgit v1.2.3-70-g09d2