From 148a599d67b57b8a31697b4d54da1a652a71e441 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 16:45:09 -0500 Subject: ipc --- src/server/session/agents/monitor.ts | 18 ++++++++++-------- src/server/session/agents/server_worker.ts | 12 ++++++------ 2 files changed, 16 insertions(+), 14 deletions(-) (limited to 'src/server/session/agents') diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index e1709f5e6..f6738a23f 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,8 +2,8 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { IPC } from "../utilities/ipc"; -import { red, cyan, white, yellow, blue, green } from "colors"; +import { PromisifiedIPCManager, suffix } from "../utilities/ipc"; +import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; @@ -16,7 +16,8 @@ import { EventEmitter } from "events"; * and spawns off an initial process that will respawn as predecessors die. */ export class Monitor extends EventEmitter { - + private static readonly localIPCManager = new PromisifiedIPCManager(process); + private static childIPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; @@ -28,7 +29,7 @@ export class Monitor extends EventEmitter { public static Create() { if (isWorker) { - IPC.dispatchMessage(process, { + this.localIPCManager.emit({ action: { message: "kill", args: { @@ -250,7 +251,7 @@ export class Monitor extends EventEmitter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true); + return Monitor.localIPCManager.emit({ newPollingIntervalSeconds }, true); } } } @@ -266,7 +267,7 @@ export class Monitor extends EventEmitter { private killActiveWorker = (graceful = true, isSessionEnd = false): void => { if (this.activeWorker && !this.activeWorker.isDead()) { if (graceful) { - IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } }); + Monitor.childIPCManager.emit({ manualExit: { isSessionEnd } }); } else { this.activeWorker.process.kill(); } @@ -312,11 +313,12 @@ export class Monitor extends EventEmitter { socketPort: ports.socket, pollingIntervalSeconds: intervalSeconds, session_key: this.key, - ipc_suffix: IPC.suffix + ipc_suffix: suffix }); + Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); // an IPC message handler that executes actions on the master thread when prompted by the active worker - IPC.addMessagesHandler(this.activeWorker!, async ({ lifecycle, action }) => { + Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => { if (action) { const { message, args } = action as Monitor.Action; console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`); diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index e9fdaf923..278cbb42f 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { IPC } from "../utilities/ipc"; +import { PromisifiedIPCManager } from "../utilities/ipc"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -11,7 +11,7 @@ import { Monitor } from "./monitor"; * email if the server encounters an uncaught exception or if the server cannot be reached. */ export class ServerWorker { - + private static localIPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -27,7 +27,7 @@ export class ServerWorker { console.error(red("cannot create a worker on the monitor process.")); process.exit(1); } else if (++ServerWorker.count > 1) { - IPC.dispatchMessage(process, { + ServerWorker.localIPCManager.emit({ action: { message: "kill", args: { reason: "cannot create more than one worker on a given worker process.", @@ -59,7 +59,7 @@ export class ServerWorker { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public sendMonitorAction = (message: string, args?: any, expectResponse = false) => IPC.dispatchMessage(process, { action: { message, args } }, expectResponse); + public sendMonitorAction = (message: string, args?: any, expectResponse = false) => ServerWorker.localIPCManager.emit({ action: { message, args } }, expectResponse); private constructor(work: Function) { this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); @@ -81,7 +81,7 @@ export class ServerWorker { */ private configureProcess = () => { // updates the local values of variables to the those sent from master - IPC.addMessagesHandler(process, async ({ newPollingIntervalSeconds, manualExit }) => { + ServerWorker.localIPCManager.addMessagesHandler(async ({ newPollingIntervalSeconds, manualExit }) => { if (newPollingIntervalSeconds !== undefined) { this.pollingIntervalSeconds = newPollingIntervalSeconds; } @@ -109,7 +109,7 @@ export class ServerWorker { /** * Notify master thread (which will log update in the console) of initialization via IPC. */ - public lifecycleNotification = (event: string) => IPC.dispatchMessage(process, { lifecycle: event }); + public lifecycleNotification = (event: string) => ServerWorker.localIPCManager.emit({ lifecycle: event }); /** * Called whenever the process has a reason to terminate, either through an uncaught exception -- cgit v1.2.3-70-g09d2