diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 09:54:48 -0500 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 09:54:48 -0500 |
commit | 27c93abd49ca8a519d2aa3cf7938434fe25947d7 (patch) | |
tree | 14f137aa8332ef1ce0cf95ff5cde52246637d45f /src | |
parent | 7a20f573f4f428bfc779797d437fa9525b6976f8 (diff) |
extends message, removed duplicate handlers, IPC streamlined
Diffstat (limited to 'src')
-rw-r--r-- | src/server/ApiManagers/SessionManager.ts | 4 | ||||
-rw-r--r-- | src/server/DashSession/DashSessionAgent.ts | 15 | ||||
-rw-r--r-- | src/server/session/agents/applied_session_agent.ts | 8 | ||||
-rw-r--r-- | src/server/session/agents/message_router.ts | 45 | ||||
-rw-r--r-- | src/server/session/agents/monitor.ts | 30 | ||||
-rw-r--r-- | src/server/session/agents/server_worker.ts | 17 | ||||
-rw-r--r-- | src/server/session/utilities/ipc.ts | 93 |
7 files changed, 116 insertions, 96 deletions
diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 21103fdd5..91ef7e298 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); if (response instanceof Error) { res.send(response); } else { @@ -49,7 +49,7 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("backup"), secureHandler: this.authorizedAction(async ({ res }) => { - const response = await sessionAgent.serverWorker.sendMonitorAction("backup"); + const response = await sessionAgent.serverWorker.emitToMonitor("backup"); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 0d9486757..b7e741525 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,6 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; +import { Message } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -26,14 +27,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * The core method invoked when the single master thread is initialized. * Installs event hooks, repl commands and additional IPC listeners. */ - protected async initializeMonitor(monitor: Monitor) { + protected async initializeMonitor(monitor: Monitor, sessionKey: string) { + await this.dispatchSessionPassword(sessionKey); monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addServerMessageListener("backup", this.backup); - monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.onKeyGenerated(this.dispatchSessionPassword); + monitor.addMessageListener("backup", this.backup); + monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.onCrashDetected(this.dispatchCrashReport); } @@ -80,14 +81,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone * to kill the server via the /kill/:key route. */ - private dispatchSessionPassword = async (key: string) => { + private dispatchSessionPassword = async (sessionKey: string) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` + content: `The key for this session (started @ ${new Date().toUTCString()}) is ${sessionKey}.\n\n${this.signature}` }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); @@ -100,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async (crashCause: Error) => { + private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts index 53293d3bf..48226dab6 100644 --- a/src/server/session/agents/applied_session_agent.ts +++ b/src/server/session/agents/applied_session_agent.ts @@ -1,6 +1,7 @@ import { isMaster } from "cluster"; import { Monitor } from "./monitor"; import { ServerWorker } from "./server_worker"; +import { Utils } from "../../../Utils"; export type ExitHandler = (reason: Error | boolean) => void | Promise<void>; @@ -8,7 +9,7 @@ export abstract class AppliedSessionAgent { // the following two methods allow the developer to create a custom // session and use the built in customization options for each thread - protected abstract async initializeMonitor(monitor: Monitor): Promise<void>; + protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise<void>; protected abstract async initializeServerWorker(): Promise<ServerWorker>; private launched = false; @@ -21,7 +22,7 @@ export abstract class AppliedSessionAgent { private sessionMonitorRef: Monitor | undefined; public get sessionMonitor(): Monitor { if (!isMaster) { - this.serverWorker.sendMonitorAction("kill", { + this.serverWorker.emitToMonitor("kill", { graceful: false, reason: "Cannot access the session monitor directly from the server worker thread.", errorCode: 1 @@ -43,7 +44,8 @@ export abstract class AppliedSessionAgent { if (!this.launched) { this.launched = true; if (isMaster) { - await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create()); + const sessionKey = Utils.GenerateGuid(); + await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey); this.sessionMonitorRef.finalize(); } else { this.serverWorkerRef = await this.initializeServerWorker(); diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts new file mode 100644 index 000000000..5848e27ab --- /dev/null +++ b/src/server/session/agents/message_router.ts @@ -0,0 +1,45 @@ +import { MessageHandler, Message } from "../utilities/ipc"; + +export default abstract class MessageRouter { + + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public removeMessageListener = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler({ name, args }))); + } + } + +}
\ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 18fa6df24..96f1f8130 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,20 +2,19 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; -import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import { EventEmitter } from "events"; +import MessageRouter from "./message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends EventEmitter { +export class Monitor extends MessageRouter { private static IPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; @@ -25,7 +24,7 @@ export class Monitor extends EventEmitter { private key: string | undefined; private repl: Repl; - public static Create() { + public static Create(sessionKey: string) { if (isWorker) { IPC(process).emit("kill", { reason: "cannot create a monitor on the worker process.", @@ -37,13 +36,12 @@ export class Monitor extends EventEmitter { console.error(red("cannot create more than one monitor.")); process.exit(1); } else { - return new Monitor(); + return new Monitor(sessionKey); } } - public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener); - public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener); - public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener); + public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener); + public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener); /** * Kill this session and its active child @@ -93,10 +91,10 @@ export class Monitor extends EventEmitter { }); } - private constructor() { + private constructor(sessionKey: string) { super(); - console.log(this.timestamp(), cyan("initializing session...")); + this.key = sessionKey; this.config = this.loadAndValidateConfiguration(); // determines whether or not we see the compilation / initialization / runtime output of each child server process @@ -131,7 +129,6 @@ export class Monitor extends EventEmitter { throw new Error("Session monitor is already finalized"); } this.finalized = true; - this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid()); this.spawn(); } @@ -284,11 +281,10 @@ export class Monitor extends EventEmitter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - const { addMessageListener } = Monitor.IPCManager; - addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); - addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); + this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); + this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + + Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 9e471366a..01e1cf971 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; import { PromisifiedIPCManager } from "../utilities/ipc"; +import MessageRouter from "./message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -10,7 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker { +export class ServerWorker extends MessageRouter { private static IPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; @@ -58,6 +59,7 @@ export class ServerWorker { public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse); private constructor(work: Function) { + super(); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -76,10 +78,13 @@ export class ServerWorker { * server process. */ private configureProcess = () => { + ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - const { addMessageListener } = ServerWorker.IPCManager; - addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds); - addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + this.addMessageListener("updatePollingInterval", ({ args }) => { + this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10)); + }); + this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); @@ -110,7 +115,7 @@ export class ServerWorker { private proactiveUnplannedExit = async (error: Error): Promise<void> => { this.shouldServerBeResponsive = false; // communicates via IPC to the master thread that it should dispatch a crash notification email - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); + this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error }); await this.executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); @@ -130,7 +135,7 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); + this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 2faf9f63e..37aaa6757 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -8,95 +8,66 @@ export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; export interface Message { name: string; - args: any; + args?: any; } +type InternalMessage = Message & { metadata: any }; -export type MessageHandler = (message: Message) => any | Promise<any>; +export type MessageHandler<T extends Message = Message> = (message: T) => any | Promise<any>; export class PromisifiedIPCManager { - private onMessage: { [message: string]: MessageHandler[] | undefined } = {}; private readonly target: IPCTarget; private readonly ipc_id = `ipc_id_${suffix}`; - private readonly response_expected = `response_expected_${suffix}`; private readonly is_response = `is_response_${suffix}`; constructor(target: IPCTarget) { this.target = target; - - this.target.addListener("message", async ({ name, args }: Message) => { - let error: Error | undefined; - try { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } catch (e) { - error = e; - } - if (args[this.response_expected] && this.target.send) { - const response: any = { error }; - response[this.ipc_id] = args[this.ipc_id]; - response[this.is_response] = true; - this.target.send(response); - } - }); - } - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public addMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - handlers.push(handler); - } else { - this.onMessage[name] = [handler]; - } - } - - /** - * Unregister a given listener at this message. - */ - public removeMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } } - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (message: string) => this.onMessage[message] = undefined; - - public emit = async (name: string, args: any, expectResponse = false): Promise<Error | undefined> => { + public emit = async (name: string, args?: any, expectResponse = false): Promise<Error | undefined> => { if (!this.target.send) { return new Error("Cannot dispatch when send is undefined."); } - args[this.response_expected] = expectResponse; if (expectResponse) { return new Promise(resolve => { const messageId = Utils.GenerateGuid(); - args[this.ipc_id] = messageId; - const responseHandler: (args: any) => void = response => { - const { error } = response; - if (response[this.is_response] && response[this.ipc_id] === messageId) { + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler<InternalMessage> = ({ args, metadata }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + const { error } = args; this.target.removeListener("message", responseHandler); resolve(error); } }; this.target.addListener("message", responseHandler); - this.target.send!({ name, args }); + this.target.send?.({ name, args, metadata }); }); } else { - this.target.send({ name, args }); + this.target.send?.({ name, args }); } } + public setRouter = (router: Router) => { + this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata[this.is_response])) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata[this.is_response] = true; + this.target.send({ + name, + args: { error }, + metadata + }); + } + } + }); + } + } export function IPC(target: IPCTarget) { |