diff options
Diffstat (limited to 'src/server/Session/session.ts')
-rw-r--r-- | src/server/Session/session.ts | 79 |
1 files changed, 67 insertions, 12 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts index d46e6b6e7..7b194598b 100644 --- a/src/server/Session/session.ts +++ b/src/server/Session/session.ts @@ -133,6 +133,56 @@ export namespace Session { export type ExitHandler = (reason: Error | boolean) => void | Promise<void>; + namespace IPC { + + export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; + const ipc_id = `ipc_id_${suffix}`; + const response_expected = `response_expected_${suffix}`; + const is_response = `is_response_${suffix}`; + + export async function dispatchMessage(target: NodeJS.EventEmitter & { send?: Function }, message: any, expectResponse = false): Promise<Error | undefined> { + if (!target.send) { + return new Error("Cannot dispatch when send is undefined."); + } + message[response_expected] = expectResponse; + if (expectResponse) { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + message[ipc_id] = messageId; + const responseHandler: (args: any) => void = response => { + const { error } = response; + if (response[is_response] && response[ipc_id] === messageId) { + target.removeListener("message", responseHandler); + resolve(error); + } + }; + target.addListener("message", responseHandler); + target.send!(message); + }); + } else { + target.send(message); + } + } + + export function addMessagesHandler(target: NodeJS.EventEmitter & { send?: Function }, handler: (message: any) => void | Promise<void>): void { + target.addListener("message", async incoming => { + let error: Error | undefined; + try { + await handler(incoming); + } catch (e) { + error = e; + } + if (incoming[response_expected] && target.send) { + const response: any = { error }; + response[ipc_id] = incoming[ipc_id]; + response[is_response] = true; + target.send(response); + } + }); + } + + } + export namespace Monitor { export interface NotifierHooks { @@ -166,7 +216,7 @@ export namespace Session { public static Create(notifiers?: Monitor.NotifierHooks) { if (isWorker) { - process.send?.({ + IPC.dispatchMessage(process, { action: { message: "kill", args: { @@ -418,15 +468,15 @@ export namespace Session { repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], args => { - const newPollingIntervalSeconds = Math.floor(Number(args[2])); + repl.registerCommand("set", [/polling/, number, boolean], async args => { + const newPollingIntervalSeconds = Math.floor(Number(args[1])); if (newPollingIntervalSeconds < 0) { this.mainLog(red("the polling interval must be a non-negative integer")); } else { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; - if (args[3] === "true") { - this.activeWorker?.send({ newPollingIntervalSeconds }); + if (args[2] === "true") { + return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true); } } } @@ -442,7 +492,7 @@ export namespace Session { private killActiveWorker = (graceful = true, isSessionEnd = false): void => { if (this.activeWorker && !this.activeWorker.isDead()) { if (graceful) { - this.activeWorker.send({ manualExit: { isSessionEnd } }); + IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } }); } else { this.activeWorker.process.kill(); } @@ -487,11 +537,12 @@ export namespace Session { serverPort: ports.server, socketPort: ports.socket, pollingIntervalSeconds: intervalSeconds, - session_key: this.key + session_key: this.key, + ipc_suffix: IPC.suffix }); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker.process.pid}`)); // an IPC message handler that executes actions on the master thread when prompted by the active worker - this.activeWorker.on("message", async ({ lifecycle, action }) => { + IPC.addMessagesHandler(this.activeWorker, async ({ lifecycle, action }) => { if (action) { const { message, args } = action as Monitor.Action; console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`); @@ -547,7 +598,7 @@ export namespace Session { console.error(red("cannot create a worker on the monitor process.")); process.exit(1); } else if (++ServerWorker.count > 1) { - process.send?.({ + IPC.dispatchMessage(process, { action: { message: "kill", args: { reason: "cannot create more than one worker on a given worker process.", @@ -579,7 +630,7 @@ export namespace Session { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public sendMonitorAction = (message: string, args?: any) => process.send!({ action: { message, args } }); + public sendMonitorAction = (message: string, args?: any, expectResponse = false) => IPC.dispatchMessage(process, { action: { message, args } }, expectResponse); private constructor(work: Function) { this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); @@ -601,8 +652,11 @@ export namespace Session { */ private configureProcess = () => { // updates the local values of variables to the those sent from master - process.on("message", async ({ newPollingIntervalSeconds, manualExit }) => { + IPC.addMessagesHandler(process, async ({ newPollingIntervalSeconds, manualExit }) => { if (newPollingIntervalSeconds !== undefined) { + await new Promise<void>(resolve => { + setTimeout(resolve, 1000 * 10); + }); this.pollingIntervalSeconds = newPollingIntervalSeconds; } if (manualExit !== undefined) { @@ -629,7 +683,7 @@ export namespace Session { /** * Notify master thread (which will log update in the console) of initialization via IPC. */ - public lifecycleNotification = (event: string) => process.send?.({ lifecycle: event }); + public lifecycleNotification = (event: string) => IPC.dispatchMessage(process, { lifecycle: event }); /** * Called whenever the process has a reason to terminate, either through an uncaught exception @@ -643,6 +697,7 @@ export namespace Session { // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); this.lifecycleNotification(red(error.message)); + console.log("GAH!", error); process.exit(1); } |