From cf3e869023d3027ae42c828ba3670b77d838ac50 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 06:00:50 -0500 Subject: email takes in object, commented debug zip, promisified inter-process messaging, streamlined session manager route responses --- src/server/ApiManagers/SessionManager.ts | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) (limited to 'src/server/ApiManagers') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 0290b578c..6782643bc 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -19,7 +19,7 @@ export default class SessionManager extends ApiManager { if (password !== process.env.session_key) { return _permission_denied(res, permissionError); } - handler(core); + return handler(core); }; } @@ -28,11 +28,15 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, subscription: this.secureSubscriber("debug", "mode", "recipient"), - secureHandler: this.authorizedAction(({ req, res }) => { + secureHandler: this.authorizedAction(async ({ req, res }) => { const { mode, recipient } = req.params; if (["passive", "active"].includes(mode)) { - sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }); - res.send(`Your request was successful: the server is ${mode === "active" ? "creating and compressing a new" : "retrieving and compressing the most recent"} back up. It will be sent to ${recipient}.`); + const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); + if (response instanceof Error) { + res.send(response); + } else { + res.send(`Your request was successful: the server ${mode === "active" ? "created and compressed a new" : "retrieved and compressed the most recent"} back up. It was sent to ${recipient}.`); + } } else { res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); } @@ -42,9 +46,13 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, subscription: this.secureSubscriber("backup"), - secureHandler: this.authorizedAction(({ res }) => { - sessionAgent.serverWorker.sendMonitorAction("backup"); - res.send(`Your request was successful: the server is creating a new back up.`); + secureHandler: this.authorizedAction(async ({ res }) => { + const response = await sessionAgent.serverWorker.sendMonitorAction("backup"); + if (response instanceof Error) { + res.send(response); + } else { + res.send("Your request was successful: the server successfully created a new back up."); + } }) }); -- cgit v1.2.3-70-g09d2 From 7741fd9cc135f94fbc1b68d89d68e38c93648f33 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 14:34:22 -0500 Subject: created multicolumn view file, made recipient parameter optional in sessionmanager --- src/client/views/CollectionMulticolumnView.tsx | 25 +++++++++++++++++++++++++ src/server/ApiManagers/SessionManager.ts | 6 ++++-- src/server/DashSession/DashSessionAgent.ts | 25 ++++++++++++++++--------- 3 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 src/client/views/CollectionMulticolumnView.tsx (limited to 'src/server/ApiManagers') diff --git a/src/client/views/CollectionMulticolumnView.tsx b/src/client/views/CollectionMulticolumnView.tsx new file mode 100644 index 000000000..8f0ffd3d0 --- /dev/null +++ b/src/client/views/CollectionMulticolumnView.tsx @@ -0,0 +1,25 @@ +import { observer } from 'mobx-react'; +import { makeInterface } from '../../new_fields/Schema'; +import { documentSchema } from '../../new_fields/documentSchemas'; +import { CollectionSubView } from './collections/CollectionSubView'; +import { DragManager } from '../util/DragManager'; + +type MulticolumnDocument = makeInterface<[typeof documentSchema]>; +const MulticolumnDocument = makeInterface(documentSchema); + +@observer +export default class CollectionMulticolumnView extends CollectionSubView(MulticolumnDocument) { + + private _dropDisposer?: DragManager.DragDropDisposer; + protected createDropTarget = (ele: HTMLDivElement) => { //used for stacking and masonry view + this._dropDisposer && this._dropDisposer(); + if (ele) { + this._dropDisposer = DragManager.MakeDropTarget(ele, this.drop.bind(this)); + } + } + + render() { + return null; + } + +} \ No newline at end of file diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 6782643bc..21103fdd5 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -2,6 +2,7 @@ import ApiManager, { Registration } from "./ApiManager"; import { Method, _permission_denied, AuthorizedCore, SecureHandler } from "../RouteManager"; import RouteSubscriber from "../RouteSubscriber"; import { sessionAgent } from ".."; +import { DashSessionAgent } from "../DashSession/DashSessionAgent"; const permissionError = "You are not authorized!"; @@ -27,10 +28,11 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, - subscription: this.secureSubscriber("debug", "mode", "recipient"), + subscription: this.secureSubscriber("debug", "mode", "recipient?"), secureHandler: this.authorizedAction(async ({ req, res }) => { - const { mode, recipient } = req.params; + const { mode } = req.params; if (["passive", "active"].includes(mode)) { + const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); if (response instanceof Error) { res.send(response); diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 8061da1ca..f3f0a3c3d 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -19,7 +19,6 @@ import { ServerWorker } from "../session/agents/server_worker"; */ export class DashSessionAgent extends AppliedSessionAgent { - private readonly notificationRecipients = ["brownptcdash@gmail.com"]; private readonly signature = "-Dash Server Session Manager"; private readonly releaseDesktop = pathFromRoot("../../Desktop"); @@ -83,14 +82,15 @@ export class DashSessionAgent extends AppliedSessionAgent { */ private dispatchSessionPassword = async (key: string) => { const { mainLog } = this.sessionMonitor; + const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); - const failures = await Email.dispatchAll({ - to: this.notificationRecipients, + const error = await Email.dispatch({ + to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` }); - if (failures) { - failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + if (error) { + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); mainLog(red("distribution of session key experienced errors")); } else { mainLog(green("successfully distributed session key to recipients")); @@ -102,13 +102,14 @@ export class DashSessionAgent extends AppliedSessionAgent { */ private dispatchCrashReport = async (crashCause: Error) => { const { mainLog } = this.sessionMonitor; - const failures = await Email.dispatchAll({ - to: this.notificationRecipients, + const { notificationRecipient } = DashSessionAgent; + const error = await Email.dispatch({ + to: notificationRecipient, subject: "Dash Web Server Crash", content: this.generateCrashInstructions(crashCause) }); - if (failures) { - failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + if (error) { + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); mainLog(red("distribution of crash notification experienced errors")); } else { mainLog(green("successfully distributed crash notification to recipients")); @@ -210,4 +211,10 @@ export class DashSessionAgent extends AppliedSessionAgent { } } +} + +export namespace DashSessionAgent { + + export const notificationRecipient = "brownptcdash@gmail.com"; + } \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 27c93abd49ca8a519d2aa3cf7938434fe25947d7 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 09:54:48 -0500 Subject: extends message, removed duplicate handlers, IPC streamlined --- src/server/ApiManagers/SessionManager.ts | 4 +- src/server/DashSession/DashSessionAgent.ts | 15 ++-- src/server/session/agents/applied_session_agent.ts | 8 +- src/server/session/agents/message_router.ts | 45 +++++++++++ src/server/session/agents/monitor.ts | 30 +++---- src/server/session/agents/server_worker.ts | 17 ++-- src/server/session/utilities/ipc.ts | 93 ++++++++-------------- 7 files changed, 116 insertions(+), 96 deletions(-) create mode 100644 src/server/session/agents/message_router.ts (limited to 'src/server/ApiManagers') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 21103fdd5..91ef7e298 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); if (response instanceof Error) { res.send(response); } else { @@ -49,7 +49,7 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("backup"), secureHandler: this.authorizedAction(async ({ res }) => { - const response = await sessionAgent.serverWorker.sendMonitorAction("backup"); + const response = await sessionAgent.serverWorker.emitToMonitor("backup"); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 0d9486757..b7e741525 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,6 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; +import { Message } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -26,14 +27,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * The core method invoked when the single master thread is initialized. * Installs event hooks, repl commands and additional IPC listeners. */ - protected async initializeMonitor(monitor: Monitor) { + protected async initializeMonitor(monitor: Monitor, sessionKey: string) { + await this.dispatchSessionPassword(sessionKey); monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addServerMessageListener("backup", this.backup); - monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.onKeyGenerated(this.dispatchSessionPassword); + monitor.addMessageListener("backup", this.backup); + monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.onCrashDetected(this.dispatchCrashReport); } @@ -80,14 +81,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone * to kill the server via the /kill/:key route. */ - private dispatchSessionPassword = async (key: string) => { + private dispatchSessionPassword = async (sessionKey: string) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` + content: `The key for this session (started @ ${new Date().toUTCString()}) is ${sessionKey}.\n\n${this.signature}` }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); @@ -100,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async (crashCause: Error) => { + private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts index 53293d3bf..48226dab6 100644 --- a/src/server/session/agents/applied_session_agent.ts +++ b/src/server/session/agents/applied_session_agent.ts @@ -1,6 +1,7 @@ import { isMaster } from "cluster"; import { Monitor } from "./monitor"; import { ServerWorker } from "./server_worker"; +import { Utils } from "../../../Utils"; export type ExitHandler = (reason: Error | boolean) => void | Promise; @@ -8,7 +9,7 @@ export abstract class AppliedSessionAgent { // the following two methods allow the developer to create a custom // session and use the built in customization options for each thread - protected abstract async initializeMonitor(monitor: Monitor): Promise; + protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise; protected abstract async initializeServerWorker(): Promise; private launched = false; @@ -21,7 +22,7 @@ export abstract class AppliedSessionAgent { private sessionMonitorRef: Monitor | undefined; public get sessionMonitor(): Monitor { if (!isMaster) { - this.serverWorker.sendMonitorAction("kill", { + this.serverWorker.emitToMonitor("kill", { graceful: false, reason: "Cannot access the session monitor directly from the server worker thread.", errorCode: 1 @@ -43,7 +44,8 @@ export abstract class AppliedSessionAgent { if (!this.launched) { this.launched = true; if (isMaster) { - await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create()); + const sessionKey = Utils.GenerateGuid(); + await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey); this.sessionMonitorRef.finalize(); } else { this.serverWorkerRef = await this.initializeServerWorker(); diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts new file mode 100644 index 000000000..5848e27ab --- /dev/null +++ b/src/server/session/agents/message_router.ts @@ -0,0 +1,45 @@ +import { MessageHandler, Message } from "../utilities/ipc"; + +export default abstract class MessageRouter { + + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public removeMessageListener = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler({ name, args }))); + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 18fa6df24..96f1f8130 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,20 +2,19 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; -import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import { EventEmitter } from "events"; +import MessageRouter from "./message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends EventEmitter { +export class Monitor extends MessageRouter { private static IPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; @@ -25,7 +24,7 @@ export class Monitor extends EventEmitter { private key: string | undefined; private repl: Repl; - public static Create() { + public static Create(sessionKey: string) { if (isWorker) { IPC(process).emit("kill", { reason: "cannot create a monitor on the worker process.", @@ -37,13 +36,12 @@ export class Monitor extends EventEmitter { console.error(red("cannot create more than one monitor.")); process.exit(1); } else { - return new Monitor(); + return new Monitor(sessionKey); } } - public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener); - public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener); - public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener); + public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener); + public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener); /** * Kill this session and its active child @@ -93,10 +91,10 @@ export class Monitor extends EventEmitter { }); } - private constructor() { + private constructor(sessionKey: string) { super(); - console.log(this.timestamp(), cyan("initializing session...")); + this.key = sessionKey; this.config = this.loadAndValidateConfiguration(); // determines whether or not we see the compilation / initialization / runtime output of each child server process @@ -131,7 +129,6 @@ export class Monitor extends EventEmitter { throw new Error("Session monitor is already finalized"); } this.finalized = true; - this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid()); this.spawn(); } @@ -284,11 +281,10 @@ export class Monitor extends EventEmitter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - const { addMessageListener } = Monitor.IPCManager; - addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); - addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); + this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); + this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + + Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 9e471366a..01e1cf971 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; import { PromisifiedIPCManager } from "../utilities/ipc"; +import MessageRouter from "./message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -10,7 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker { +export class ServerWorker extends MessageRouter { private static IPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; @@ -58,6 +59,7 @@ export class ServerWorker { public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse); private constructor(work: Function) { + super(); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -76,10 +78,13 @@ export class ServerWorker { * server process. */ private configureProcess = () => { + ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - const { addMessageListener } = ServerWorker.IPCManager; - addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds); - addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + this.addMessageListener("updatePollingInterval", ({ args }) => { + this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + return new Promise(resolve => setTimeout(resolve, 1000 * 10)); + }); + this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); @@ -110,7 +115,7 @@ export class ServerWorker { private proactiveUnplannedExit = async (error: Error): Promise => { this.shouldServerBeResponsive = false; // communicates via IPC to the master thread that it should dispatch a crash notification email - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); + this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error }); await this.executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); @@ -130,7 +135,7 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); + this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 2faf9f63e..37aaa6757 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -8,95 +8,66 @@ export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; export interface Message { name: string; - args: any; + args?: any; } +type InternalMessage = Message & { metadata: any }; -export type MessageHandler = (message: Message) => any | Promise; +export type MessageHandler = (message: T) => any | Promise; export class PromisifiedIPCManager { - private onMessage: { [message: string]: MessageHandler[] | undefined } = {}; private readonly target: IPCTarget; private readonly ipc_id = `ipc_id_${suffix}`; - private readonly response_expected = `response_expected_${suffix}`; private readonly is_response = `is_response_${suffix}`; constructor(target: IPCTarget) { this.target = target; - - this.target.addListener("message", async ({ name, args }: Message) => { - let error: Error | undefined; - try { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } catch (e) { - error = e; - } - if (args[this.response_expected] && this.target.send) { - const response: any = { error }; - response[this.ipc_id] = args[this.ipc_id]; - response[this.is_response] = true; - this.target.send(response); - } - }); - } - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public addMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - handlers.push(handler); - } else { - this.onMessage[name] = [handler]; - } - } - - /** - * Unregister a given listener at this message. - */ - public removeMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } } - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (message: string) => this.onMessage[message] = undefined; - - public emit = async (name: string, args: any, expectResponse = false): Promise => { + public emit = async (name: string, args?: any, expectResponse = false): Promise => { if (!this.target.send) { return new Error("Cannot dispatch when send is undefined."); } - args[this.response_expected] = expectResponse; if (expectResponse) { return new Promise(resolve => { const messageId = Utils.GenerateGuid(); - args[this.ipc_id] = messageId; - const responseHandler: (args: any) => void = response => { - const { error } = response; - if (response[this.is_response] && response[this.ipc_id] === messageId) { + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler = ({ args, metadata }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + const { error } = args; this.target.removeListener("message", responseHandler); resolve(error); } }; this.target.addListener("message", responseHandler); - this.target.send!({ name, args }); + this.target.send?.({ name, args, metadata }); }); } else { - this.target.send({ name, args }); + this.target.send?.({ name, args }); } } + public setRouter = (router: Router) => { + this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata[this.is_response])) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata[this.is_response] = true; + this.target.send({ + name, + args: { error }, + metadata + }); + } + } + }); + } + } export function IPC(target: IPCTarget) { -- cgit v1.2.3-70-g09d2 From 86f1e0f58940904b8c55284f6787e7422a6665ff Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 13:42:06 -0500 Subject: refactor --- src/server/ApiManagers/SessionManager.ts | 8 ++-- src/server/DashSession/DashSessionAgent.ts | 8 ++-- src/server/session/agents/message_router.ts | 45 --------------------- src/server/session/agents/monitor.ts | 17 ++++---- .../session/agents/process_message_router.ts | 46 ++++++++++++++++++++++ src/server/session/agents/server_worker.ts | 18 +++++---- src/server/session/utilities/ipc.ts | 39 ++++++++---------- 7 files changed, 89 insertions(+), 92 deletions(-) delete mode 100644 src/server/session/agents/message_router.ts create mode 100644 src/server/session/agents/process_message_router.ts (limited to 'src/server/ApiManagers') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 91ef7e298..4513752a6 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -8,16 +8,16 @@ const permissionError = "You are not authorized!"; export default class SessionManager extends ApiManager { - private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("password", ...params); + private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("sessionKey", ...params); private authorizedAction = (handler: SecureHandler) => { return (core: AuthorizedCore) => { const { req, res, isRelease } = core; - const { password } = req.params; + const { sessionKey } = req.params; if (!isRelease) { return res.send("This can be run only on the release server."); } - if (password !== process.env.session_key) { + if (sessionKey !== process.env.session_key) { return _permission_denied(res, permissionError); } return handler(core); @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 23d421835..de8e7240f 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,7 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; -import { Message } from "../session/utilities/ipc"; +import { MessageHandler } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -34,8 +34,8 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.on("backup", this.backup); - monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.hooks.crashDetected(this.dispatchCrashReport); + monitor.on("debug", ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.coreHooks.onCrashDetected(this.dispatchCrashReport); } /** @@ -101,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { + private dispatchCrashReport: MessageHandler<{ error: Error }> = async ({ error: crashCause }) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts deleted file mode 100644 index 707f771d9..000000000 --- a/src/server/session/agents/message_router.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { MessageHandler, Message } from "../utilities/ipc"; - -export default abstract class MessageRouter { - - private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public on = (name: string, handler: MessageHandler, exclusive = false) => { - const handlers = this.onMessage[name]; - if (exclusive || !handlers) { - this.onMessage[name] = [handler]; - } else { - handlers.push(handler); - } - } - - /** - * Unregister a given listener at this message. - */ - public off = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } - } - - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); - - protected route: MessageHandler = async ({ name, args }) => { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index ccba8199e..d4abbb51e 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -8,14 +8,13 @@ import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import MessageRouter from "./message_router"; +import ProcessMessageRouter from "./process_message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends MessageRouter { - private static IPCManager: PromisifiedIPCManager; +export class Monitor extends ProcessMessageRouter { private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; @@ -84,9 +83,9 @@ export class Monitor extends MessageRouter { this.spawn(); } - public readonly hooks = Object.freeze({ - crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + public readonly coreHooks = Object.freeze({ + onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), + onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) }); /** @@ -219,7 +218,7 @@ export class Monitor extends MessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true); + return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -286,8 +285,8 @@ export class Monitor extends MessageRouter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); - this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); + this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); Monitor.IPCManager.setRouter(this.route); } diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts new file mode 100644 index 000000000..f60343514 --- /dev/null +++ b/src/server/session/agents/process_message_router.ts @@ -0,0 +1,46 @@ +import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; + +export default abstract class ProcessMessageRouter { + + protected static IPCManager: PromisifiedIPCManager; + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public on = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public off = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler(args))); + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 50abe398d..23ffb2650 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,7 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message } from "../utilities/ipc"; -import MessageRouter from "./message_router"; +import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -11,8 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker extends MessageRouter { - private static IPCManager = new PromisifiedIPCManager(process); +export class ServerWorker extends ProcessMessageRouter { private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -56,10 +55,13 @@ export class ServerWorker extends MessageRouter { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public emitToMonitor = (name: string, args?: any, awaitResponse = false) => ServerWorker.IPCManager.emit(name, args, awaitResponse); + public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args); + + public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args); private constructor(work: Function) { super(); + ServerWorker.IPCManager = new PromisifiedIPCManager(process); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,11 +82,11 @@ export class ServerWorker extends MessageRouter { private configureProcess = () => { ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => { - this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { + this.pollingIntervalSeconds = newPollingIntervalSeconds; return new Promise(resolve => setTimeout(resolve, 1000 * 10)); }); - this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => { + this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 7ad00596d..db4c23180 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -11,7 +11,7 @@ export interface Message { args: T; } type InternalMessage = Message & { metadata: any }; -export type MessageHandler = Message> = (message: T) => any | Promise; +export type MessageHandler = (message: T) => any | Promise; export class PromisifiedIPCManager { private readonly target: IPCTarget; @@ -22,27 +22,22 @@ export class PromisifiedIPCManager { this.target = target; } - public emit = async (name: string, args?: any, awaitResponse = false): Promise => { - if (!this.target.send) { - return new Error("Cannot dispatch when send is undefined."); - } - if (awaitResponse) { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const metadata: any = {}; - metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler = ({ metadata, args }) => { - if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - this.target.send?.({ name, args, metadata }); - }); - } else { - this.target.send?.({ name, args }); - } + public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); + + public emitPromise = async (name: string, args?: any) => { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler = ({ metadata, args }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + this.target.removeListener("message", responseHandler); + resolve(args?.error as Error | undefined); + } + }; + this.target.addListener("message", responseHandler); + this.target.send?.({ name, args, metadata }); + }); } public setRouter = (router: Router) => { -- cgit v1.2.3-70-g09d2 From f1a5faed19cc3f924a9304fd0bc4a1b3bc655bf8 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 14:06:18 -0500 Subject: made mode optional param --- src/server/ApiManagers/SessionManager.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'src/server/ApiManagers') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 4513752a6..a40b86dc5 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -28,10 +28,13 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, - subscription: this.secureSubscriber("debug", "mode", "recipient?"), + subscription: this.secureSubscriber("debug", "mode?", "recipient?"), secureHandler: this.authorizedAction(async ({ req, res }) => { - const { mode } = req.params; - if (["passive", "active"].includes(mode)) { + let { mode } = req.params; + if (mode && !["passive", "active"].includes(mode)) { + res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); + } else { + !mode && (mode = "active"); const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); if (response instanceof Error) { @@ -39,8 +42,6 @@ export default class SessionManager extends ApiManager { } else { res.send(`Your request was successful: the server ${mode === "active" ? "created and compressed a new" : "retrieved and compressed the most recent"} back up. It was sent to ${recipient}.`); } - } else { - res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); } }) }); -- cgit v1.2.3-70-g09d2 From 54a241ff71abc07a5dbdebce1b614f1024a767e6 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 15:08:26 -0500 Subject: final session cleanup --- src/server/ApiManagers/SessionManager.ts | 9 +- src/server/DashSession/DashSessionAgent.ts | 4 +- src/server/session/agents/monitor.ts | 16 ++-- .../session/agents/process_message_router.ts | 2 +- .../session/agents/promisified_ipc_manager.ts | 97 ++++++++++++++++++++++ src/server/session/agents/server_worker.ts | 10 +-- src/server/session/utilities/ipc.ts | 66 --------------- 7 files changed, 115 insertions(+), 89 deletions(-) create mode 100644 src/server/session/agents/promisified_ipc_manager.ts delete mode 100644 src/server/session/utilities/ipc.ts (limited to 'src/server/ApiManagers') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index a40b86dc5..d989d8d1b 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -30,13 +30,14 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("debug", "mode?", "recipient?"), secureHandler: this.authorizedAction(async ({ req, res }) => { - let { mode } = req.params; + const { mode, recipient } = req.params; if (mode && !["passive", "active"].includes(mode)) { res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); } else { - !mode && (mode = "active"); - const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { + mode: mode || "active", + recipient: recipient || DashSessionAgent.notificationRecipient + }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 3c98c1e9d..fe7cdae88 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,7 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; -import { MessageHandler } from "../session/utilities/ipc"; +import { MessageHandler } from "../session/agents/promisified_ipc_manager"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -110,7 +110,7 @@ export class DashSessionAgent extends AppliedSessionAgent { content: this.generateCrashInstructions(crashCause) }); if (error) { - this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} ${yellow(`(${error.message})`)}`)); mainLog(red("distribution of crash notification experienced errors")); } else { mainLog(green("successfully distributed crash notification to recipients")); diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index d4abbb51e..5ea950b2b 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,13 +2,14 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc"; +import { IPC_Promisify, MessageHandler } from "./promisified_ipc_manager"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; import ProcessMessageRouter from "./process_message_router"; +import { ServerWorker } from "./server_worker"; /** * Validates and reads the configuration file, accordingly builds a child process factory @@ -25,7 +26,7 @@ export class Monitor extends ProcessMessageRouter { public static Create(sessionKey: string) { if (isWorker) { - IPC(process).emit("kill", { + ServerWorker.IPCManager.emit("kill", { reason: "cannot create a monitor on the worker process.", graceful: false, errorCode: 1 @@ -210,7 +211,7 @@ export class Monitor extends ProcessMessageRouter { repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], async args => { + repl.registerCommand("set", [/polling/, number, boolean], args => { const newPollingIntervalSeconds = Math.floor(Number(args[1])); if (newPollingIntervalSeconds < 0) { this.mainLog(red("the polling interval must be a non-negative integer")); @@ -218,7 +219,7 @@ export class Monitor extends ProcessMessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); + Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -279,16 +280,13 @@ export class Monitor extends ProcessMessageRouter { serverPort: ports.server, socketPort: ports.socket, pollingIntervalSeconds: intervalSeconds, - session_key: this.key, - ipc_suffix: suffix + session_key: this.key }); - Monitor.IPCManager = IPC(this.activeWorker); + Monitor.IPCManager = IPC_Promisify(this.activeWorker, this.route); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); - - Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts index f60343514..d359e97c3 100644 --- a/src/server/session/agents/process_message_router.ts +++ b/src/server/session/agents/process_message_router.ts @@ -1,4 +1,4 @@ -import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; +import { MessageHandler, PromisifiedIPCManager } from "./promisified_ipc_manager"; export default abstract class ProcessMessageRouter { diff --git a/src/server/session/agents/promisified_ipc_manager.ts b/src/server/session/agents/promisified_ipc_manager.ts new file mode 100644 index 000000000..216e9be44 --- /dev/null +++ b/src/server/session/agents/promisified_ipc_manager.ts @@ -0,0 +1,97 @@ +import { Utils } from "../../../Utils"; +import { isMaster } from "cluster"; + +/** + * Convenience constructor + * @param target the process / worker to which to attach the specialized listeners + */ +export function IPC_Promisify(target: IPCTarget, router: Router) { + return new PromisifiedIPCManager(target, router); +} + +/** + * Essentially, a node process or node cluster worker + */ +export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; + +/** + * Some external code that maps the name of incoming messages to registered handlers, if any + * when this returns, the message is assumed to have been handled in its entirety by the process, so + * await any asynchronous code inside this router. + */ +export type Router = (message: Message) => void | Promise; + +/** + * Specifies a general message format for this API + */ +export type Message = { name: string; args: T; }; +export type MessageHandler = (args: T) => any | Promise; + +/** + * When a message is emitted, it + */ +type InternalMessage = Message & { metadata: any }; +type InternalMessageHandler = (message: InternalMessage) => any | Promise; + +/** + * This is a wrapper utility class that allows the caller process + * to emit an event and return a promise that resolves when it and all + * other processes listening to its emission of this event have completed. + */ +export class PromisifiedIPCManager { + private readonly target: IPCTarget; + + constructor(target: IPCTarget, router: Router) { + this.target = target; + this.target.addListener("message", this.internalHandler(router)); + } + + /** + * A convenience wrapper around the standard process emission. + * Does not wait for a response. + */ + public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); + + /** + * This routine uniquely identifies each message, then adds a general + * message listener that waits for a response with the same id before resolving + * the promise. + */ + public emitPromise = async (name: string, args?: any) => { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args, name }) => { + if (isResponse && id === messageId) { + this.target.removeListener("message", responseHandler); + resolve(args?.error as Error | undefined); + } + }; + this.target.addListener("message", responseHandler); + const message = { name, args, metadata: { id: messageId } }; + this.target.send?.(message); + }); + } + + /** + * This routine receives a uniquely identified message. If the message is itself a response, + * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever + * router the caller has installed, and then sends a response containing the original message id, + * which will ultimately invoke the responseHandler of the original emission and resolve the + * sender's promise. + */ + private internalHandler = (router: Router) => async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata.isResponse)) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata.isResponse = true; + this.target.send({ name, args: { error }, metadata }); + } + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 23ffb2650..705307030 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import { PromisifiedIPCManager } from "./promisified_ipc_manager"; import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; @@ -61,7 +61,7 @@ export class ServerWorker extends ProcessMessageRouter { private constructor(work: Function) { super(); - ServerWorker.IPCManager = new PromisifiedIPCManager(process); + ServerWorker.IPCManager = new PromisifiedIPCManager(process, this.route); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,12 +80,8 @@ export class ServerWorker extends ProcessMessageRouter { * server process. */ private configureProcess = () => { - ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { - this.pollingIntervalSeconds = newPollingIntervalSeconds; - return new Promise(resolve => setTimeout(resolve, 1000 * 10)); - }); + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds); this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts deleted file mode 100644 index c90b15907..000000000 --- a/src/server/session/utilities/ipc.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { isMaster } from "cluster"; -import { Utils } from "../../../Utils"; - -export function IPC(target: IPCTarget) { - return new PromisifiedIPCManager(target); -} - -export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; -export type Router = (message: Message) => void | Promise; - -export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; - -type InternalMessage = Message & { metadata: any }; - -export interface Message { - name: string; - args: T; -} - -export type MessageHandler = (message: T) => any | Promise; - -export class PromisifiedIPCManager { - private readonly target: IPCTarget; - private readonly ipc_id = `ipc_id_${suffix}`; - private readonly is_response = `is_response_${suffix}`; - - constructor(target: IPCTarget) { - this.target = target; - } - - public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); - - public emitPromise = async (name: string, args?: any) => { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const metadata: any = {}; - metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler = ({ metadata, args }) => { - if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - this.target.send?.({ name, args, metadata }); - }); - } - - public setRouter = (router: Router) => { - this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { - if (name && (!metadata || !metadata[this.is_response])) { - let error: Error | undefined; - try { - await router({ name, args }); - } catch (e) { - error = e; - } - if (metadata && this.target.send) { - metadata[this.is_response] = true; - this.target.send({ name, args: { error }, metadata }); - } - } - }); - } - -} \ No newline at end of file -- cgit v1.2.3-70-g09d2