diff options
-rw-r--r-- | src/server/RouteManager.ts | 4 | ||||
-rw-r--r-- | src/server/session/agents/monitor.ts | 18 | ||||
-rw-r--r-- | src/server/session/agents/server_worker.ts | 12 | ||||
-rw-r--r-- | src/server/session/utilities/ipc.ts | 48 |
4 files changed, 46 insertions, 36 deletions
diff --git a/src/server/RouteManager.ts b/src/server/RouteManager.ts index a8ad81bf7..b146d7b72 100644 --- a/src/server/RouteManager.ts +++ b/src/server/RouteManager.ts @@ -68,7 +68,7 @@ export default class RouteManager { console.log('please remove all duplicate routes before continuing'); } if (malformedCount) { - console.log(`please ensure all routes adhere to ^\/$|^\/[A-Za-z]+(\/\:[A-Za-z]+)*$`); + console.log(`please ensure all routes adhere to ^\/$|^\/[A-Za-z]+(\/\:[A-Za-z?]+)*$`); } process.exit(1); } else { @@ -128,7 +128,7 @@ export default class RouteManager { } else { route = subscriber.build; } - if (!/^\/$|^\/[A-Za-z]+(\/\:[A-Za-z]+)*$/g.test(route)) { + if (!/^\/$|^\/[A-Za-z]+(\/\:[A-Za-z?]+)*$/g.test(route)) { this.failedRegistrations.push({ reason: RegistrationError.Malformed, route diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index e1709f5e6..f6738a23f 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,8 +2,8 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { IPC } from "../utilities/ipc"; -import { red, cyan, white, yellow, blue, green } from "colors"; +import { PromisifiedIPCManager, suffix } from "../utilities/ipc"; +import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; @@ -16,7 +16,8 @@ import { EventEmitter } from "events"; * and spawns off an initial process that will respawn as predecessors die. */ export class Monitor extends EventEmitter { - + private static readonly localIPCManager = new PromisifiedIPCManager(process); + private static childIPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; @@ -28,7 +29,7 @@ export class Monitor extends EventEmitter { public static Create() { if (isWorker) { - IPC.dispatchMessage(process, { + this.localIPCManager.emit({ action: { message: "kill", args: { @@ -250,7 +251,7 @@ export class Monitor extends EventEmitter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true); + return Monitor.localIPCManager.emit({ newPollingIntervalSeconds }, true); } } } @@ -266,7 +267,7 @@ export class Monitor extends EventEmitter { private killActiveWorker = (graceful = true, isSessionEnd = false): void => { if (this.activeWorker && !this.activeWorker.isDead()) { if (graceful) { - IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } }); + Monitor.childIPCManager.emit({ manualExit: { isSessionEnd } }); } else { this.activeWorker.process.kill(); } @@ -312,11 +313,12 @@ export class Monitor extends EventEmitter { socketPort: ports.socket, pollingIntervalSeconds: intervalSeconds, session_key: this.key, - ipc_suffix: IPC.suffix + ipc_suffix: suffix }); + Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); // an IPC message handler that executes actions on the master thread when prompted by the active worker - IPC.addMessagesHandler(this.activeWorker!, async ({ lifecycle, action }) => { + Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => { if (action) { const { message, args } = action as Monitor.Action; console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`); diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index e9fdaf923..278cbb42f 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { IPC } from "../utilities/ipc"; +import { PromisifiedIPCManager } from "../utilities/ipc"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -11,7 +11,7 @@ import { Monitor } from "./monitor"; * email if the server encounters an uncaught exception or if the server cannot be reached. */ export class ServerWorker { - + private static localIPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -27,7 +27,7 @@ export class ServerWorker { console.error(red("cannot create a worker on the monitor process.")); process.exit(1); } else if (++ServerWorker.count > 1) { - IPC.dispatchMessage(process, { + ServerWorker.localIPCManager.emit({ action: { message: "kill", args: { reason: "cannot create more than one worker on a given worker process.", @@ -59,7 +59,7 @@ export class ServerWorker { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public sendMonitorAction = (message: string, args?: any, expectResponse = false) => IPC.dispatchMessage(process, { action: { message, args } }, expectResponse); + public sendMonitorAction = (message: string, args?: any, expectResponse = false) => ServerWorker.localIPCManager.emit({ action: { message, args } }, expectResponse); private constructor(work: Function) { this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); @@ -81,7 +81,7 @@ export class ServerWorker { */ private configureProcess = () => { // updates the local values of variables to the those sent from master - IPC.addMessagesHandler(process, async ({ newPollingIntervalSeconds, manualExit }) => { + ServerWorker.localIPCManager.addMessagesHandler(async ({ newPollingIntervalSeconds, manualExit }) => { if (newPollingIntervalSeconds !== undefined) { this.pollingIntervalSeconds = newPollingIntervalSeconds; } @@ -109,7 +109,7 @@ export class ServerWorker { /** * Notify master thread (which will log update in the console) of initialization via IPC. */ - public lifecycleNotification = (event: string) => IPC.dispatchMessage(process, { lifecycle: event }); + public lifecycleNotification = (event: string) => ServerWorker.localIPCManager.emit({ lifecycle: event }); /** * Called whenever the process has a reason to terminate, either through an uncaught exception diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index b20f3d337..888377c93 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -1,50 +1,58 @@ import { isMaster } from "cluster"; import { Utils } from "../../../Utils"; -export namespace IPC { +export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; +export type Listener = (message: any) => void | Promise<void>; - export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; - const ipc_id = `ipc_id_${suffix}`; - const response_expected = `response_expected_${suffix}`; - const is_response = `is_response_${suffix}`; +export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; - export async function dispatchMessage(target: NodeJS.EventEmitter & { send?: Function }, message: any, expectResponse = false): Promise<Error | undefined> { - if (!target.send) { +export class PromisifiedIPCManager { + private readonly target: IPCTarget; + private readonly ipc_id = `ipc_id_${suffix}`; + private readonly response_expected = `response_expected_${suffix}`; + private readonly is_response = `is_response_${suffix}`; + + constructor(target: IPCTarget) { + this.target = target; + } + + public emit = async (message: any, expectResponse = false): Promise<Error | undefined> => { + if (!this.target.send) { return new Error("Cannot dispatch when send is undefined."); } - message[response_expected] = expectResponse; + message[this.response_expected] = expectResponse; if (expectResponse) { return new Promise(resolve => { const messageId = Utils.GenerateGuid(); - message[ipc_id] = messageId; + message[this.ipc_id] = messageId; const responseHandler: (args: any) => void = response => { const { error } = response; - if (response[is_response] && response[ipc_id] === messageId) { - target.removeListener("message", responseHandler); + if (response[this.is_response] && response[this.ipc_id] === messageId) { + this.target.removeListener("message", responseHandler); resolve(error); } }; - target.addListener("message", responseHandler); - target.send!(message); + this.target.addListener("message", responseHandler); + this.target.send!(message); }); } else { - target.send(message); + this.target.send(message); } } - export function addMessagesHandler(target: NodeJS.EventEmitter & { send?: Function }, handler: (message: any) => void | Promise<void>): void { - target.addListener("message", async incoming => { + public addMessagesHandler = (handler: Listener): void => { + this.target.addListener("message", async incoming => { let error: Error | undefined; try { await handler(incoming); } catch (e) { error = e; } - if (incoming[response_expected] && target.send) { + if (incoming[this.response_expected] && this.target.send) { const response: any = { error }; - response[ipc_id] = incoming[ipc_id]; - response[is_response] = true; - target.send(response); + response[this.ipc_id] = incoming[this.ipc_id]; + response[this.is_response] = true; + this.target.send(response); } }); } |