diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/server/ApiManagers/SessionManager.ts | 9 | ||||
-rw-r--r-- | src/server/DashSession/DashSessionAgent.ts | 4 | ||||
-rw-r--r-- | src/server/session/agents/monitor.ts | 16 | ||||
-rw-r--r-- | src/server/session/agents/process_message_router.ts | 2 | ||||
-rw-r--r-- | src/server/session/agents/promisified_ipc_manager.ts | 97 | ||||
-rw-r--r-- | src/server/session/agents/server_worker.ts | 10 | ||||
-rw-r--r-- | src/server/session/utilities/ipc.ts | 66 |
7 files changed, 115 insertions, 89 deletions
diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index a40b86dc5..d989d8d1b 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -30,13 +30,14 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("debug", "mode?", "recipient?"), secureHandler: this.authorizedAction(async ({ req, res }) => { - let { mode } = req.params; + const { mode, recipient } = req.params; if (mode && !["passive", "active"].includes(mode)) { res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); } else { - !mode && (mode = "active"); - const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { + mode: mode || "active", + recipient: recipient || DashSessionAgent.notificationRecipient + }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 3c98c1e9d..fe7cdae88 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,7 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; -import { MessageHandler } from "../session/utilities/ipc"; +import { MessageHandler } from "../session/agents/promisified_ipc_manager"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -110,7 +110,7 @@ export class DashSessionAgent extends AppliedSessionAgent { content: this.generateCrashInstructions(crashCause) }); if (error) { - this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} ${yellow(`(${error.message})`)}`)); mainLog(red("distribution of crash notification experienced errors")); } else { mainLog(green("successfully distributed crash notification to recipients")); diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index d4abbb51e..5ea950b2b 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,13 +2,14 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc"; +import { IPC_Promisify, MessageHandler } from "./promisified_ipc_manager"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; import ProcessMessageRouter from "./process_message_router"; +import { ServerWorker } from "./server_worker"; /** * Validates and reads the configuration file, accordingly builds a child process factory @@ -25,7 +26,7 @@ export class Monitor extends ProcessMessageRouter { public static Create(sessionKey: string) { if (isWorker) { - IPC(process).emit("kill", { + ServerWorker.IPCManager.emit("kill", { reason: "cannot create a monitor on the worker process.", graceful: false, errorCode: 1 @@ -210,7 +211,7 @@ export class Monitor extends ProcessMessageRouter { repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], async args => { + repl.registerCommand("set", [/polling/, number, boolean], args => { const newPollingIntervalSeconds = Math.floor(Number(args[1])); if (newPollingIntervalSeconds < 0) { this.mainLog(red("the polling interval must be a non-negative integer")); @@ -218,7 +219,7 @@ export class Monitor extends ProcessMessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); + Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -279,16 +280,13 @@ export class Monitor extends ProcessMessageRouter { serverPort: ports.server, socketPort: ports.socket, pollingIntervalSeconds: intervalSeconds, - session_key: this.key, - ipc_suffix: suffix + session_key: this.key }); - Monitor.IPCManager = IPC(this.activeWorker); + Monitor.IPCManager = IPC_Promisify(this.activeWorker, this.route); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); - - Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts index f60343514..d359e97c3 100644 --- a/src/server/session/agents/process_message_router.ts +++ b/src/server/session/agents/process_message_router.ts @@ -1,4 +1,4 @@ -import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; +import { MessageHandler, PromisifiedIPCManager } from "./promisified_ipc_manager"; export default abstract class ProcessMessageRouter { diff --git a/src/server/session/agents/promisified_ipc_manager.ts b/src/server/session/agents/promisified_ipc_manager.ts new file mode 100644 index 000000000..216e9be44 --- /dev/null +++ b/src/server/session/agents/promisified_ipc_manager.ts @@ -0,0 +1,97 @@ +import { Utils } from "../../../Utils"; +import { isMaster } from "cluster"; + +/** + * Convenience constructor + * @param target the process / worker to which to attach the specialized listeners + */ +export function IPC_Promisify(target: IPCTarget, router: Router) { + return new PromisifiedIPCManager(target, router); +} + +/** + * Essentially, a node process or node cluster worker + */ +export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; + +/** + * Some external code that maps the name of incoming messages to registered handlers, if any + * when this returns, the message is assumed to have been handled in its entirety by the process, so + * await any asynchronous code inside this router. + */ +export type Router = (message: Message) => void | Promise<void>; + +/** + * Specifies a general message format for this API + */ +export type Message<T = any> = { name: string; args: T; }; +export type MessageHandler<T = any> = (args: T) => any | Promise<any>; + +/** + * When a message is emitted, it + */ +type InternalMessage = Message & { metadata: any }; +type InternalMessageHandler = (message: InternalMessage) => any | Promise<any>; + +/** + * This is a wrapper utility class that allows the caller process + * to emit an event and return a promise that resolves when it and all + * other processes listening to its emission of this event have completed. + */ +export class PromisifiedIPCManager { + private readonly target: IPCTarget; + + constructor(target: IPCTarget, router: Router) { + this.target = target; + this.target.addListener("message", this.internalHandler(router)); + } + + /** + * A convenience wrapper around the standard process emission. + * Does not wait for a response. + */ + public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); + + /** + * This routine uniquely identifies each message, then adds a general + * message listener that waits for a response with the same id before resolving + * the promise. + */ + public emitPromise = async (name: string, args?: any) => { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args, name }) => { + if (isResponse && id === messageId) { + this.target.removeListener("message", responseHandler); + resolve(args?.error as Error | undefined); + } + }; + this.target.addListener("message", responseHandler); + const message = { name, args, metadata: { id: messageId } }; + this.target.send?.(message); + }); + } + + /** + * This routine receives a uniquely identified message. If the message is itself a response, + * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever + * router the caller has installed, and then sends a response containing the original message id, + * which will ultimately invoke the responseHandler of the original emission and resolve the + * sender's promise. + */ + private internalHandler = (router: Router) => async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata.isResponse)) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata.isResponse = true; + this.target.send({ name, args: { error }, metadata }); + } + } + } + +}
\ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 23ffb2650..705307030 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import { PromisifiedIPCManager } from "./promisified_ipc_manager"; import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; @@ -61,7 +61,7 @@ export class ServerWorker extends ProcessMessageRouter { private constructor(work: Function) { super(); - ServerWorker.IPCManager = new PromisifiedIPCManager(process); + ServerWorker.IPCManager = new PromisifiedIPCManager(process, this.route); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,12 +80,8 @@ export class ServerWorker extends ProcessMessageRouter { * server process. */ private configureProcess = () => { - ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { - this.pollingIntervalSeconds = newPollingIntervalSeconds; - return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10)); - }); + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds); this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts deleted file mode 100644 index c90b15907..000000000 --- a/src/server/session/utilities/ipc.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { isMaster } from "cluster"; -import { Utils } from "../../../Utils"; - -export function IPC(target: IPCTarget) { - return new PromisifiedIPCManager(target); -} - -export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; -export type Router = (message: Message) => void | Promise<void>; - -export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; - -type InternalMessage<T = any> = Message<T> & { metadata: any }; - -export interface Message<T = any> { - name: string; - args: T; -} - -export type MessageHandler<T = any> = (message: T) => any | Promise<any>; - -export class PromisifiedIPCManager { - private readonly target: IPCTarget; - private readonly ipc_id = `ipc_id_${suffix}`; - private readonly is_response = `is_response_${suffix}`; - - constructor(target: IPCTarget) { - this.target = target; - } - - public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); - - public emitPromise = async (name: string, args?: any) => { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const metadata: any = {}; - metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler<any> = ({ metadata, args }) => { - if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - this.target.send?.({ name, args, metadata }); - }); - } - - public setRouter = (router: Router) => { - this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { - if (name && (!metadata || !metadata[this.is_response])) { - let error: Error | undefined; - try { - await router({ name, args }); - } catch (e) { - error = e; - } - if (metadata && this.target.send) { - metadata[this.is_response] = true; - this.target.send({ name, args: { error }, metadata }); - } - } - }); - } - -}
\ No newline at end of file |