From 109abe78646c94903ef423aeb7db213087c4b92d Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 13:57:21 -0500 Subject: event emitter, streamlined initialization --- src/server/DashSession/DashSessionAgent.ts | 213 +++++++++++++++++++++ src/server/DashSession/crash_instructions.txt | 14 ++ .../DashSession/remote_debug_instructions.txt | 16 ++ 3 files changed, 243 insertions(+) create mode 100644 src/server/DashSession/DashSessionAgent.ts create mode 100644 src/server/DashSession/crash_instructions.txt create mode 100644 src/server/DashSession/remote_debug_instructions.txt (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts new file mode 100644 index 000000000..b031c177e --- /dev/null +++ b/src/server/DashSession/DashSessionAgent.ts @@ -0,0 +1,213 @@ +import { Email, pathFromRoot } from "../ActionUtilities"; +import { red, yellow, green, cyan } from "colors"; +import { get } from "request-promise"; +import { Utils } from "../../Utils"; +import { WebSocket } from "../Websocket/Websocket"; +import { MessageStore } from "../Message"; +import { launchServer, onWindows } from ".."; +import { existsSync, mkdirSync, readdirSync, statSync, createWriteStream, readFileSync } from "fs"; +import * as Archiver from "archiver"; +import { resolve } from "path"; +import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; +import { Monitor } from "../session/agents/monitor"; +import { ServerWorker } from "../session/agents/server_worker"; + +/** + * If we're the monitor (master) thread, we should launch the monitor logic for the session. + * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus + * our job should be to run the server. + */ +export class DashSessionAgent extends AppliedSessionAgent { + + private readonly notificationRecipients = ["brownptcdash@gmail.com"]; + private readonly signature = "-Dash Server Session Manager"; + private readonly releaseDesktop = pathFromRoot("../../Desktop"); + + /** + * The core method invoked when the single master thread is initialized. + * Installs event hooks, repl commands and additional IPC listeners. + */ + protected async initializeMonitor(monitor: Monitor) { + monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); + monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); + monitor.addReplCommand("backup", [], this.backup); + monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.addServerMessageListener("backup", this.backup); + monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.on(Monitor.IntrinsicEvents.KeyGenerated, this.dispatchSessionPassword); + monitor.on(Monitor.IntrinsicEvents.CrashDetected, this.dispatchCrashReport); + } + + /** + * The core method invoked when a server worker thread is initialized. + * Installs logic to be executed when the server worker dies. + */ + protected async initializeServerWorker() { + const worker = ServerWorker.Create(launchServer); // server initialization delegated to worker + worker.addExitHandler(this.notifyClient); + return worker; + } + + /** + * Prepares the body of the email with instructions on restoring the transmitted remote database backup locally. + */ + private _remoteDebugInstructions: string | undefined; + private generateDebugInstructions = (zipName: string, target: string) => { + if (!this._remoteDebugInstructions) { + this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./remote_debug_instructions.txt"), { encoding: "utf8" }); + } + return this._remoteDebugInstructions + .replace(/__zipname__/, zipName) + .replace(/__target__/, target) + .replace(/__signature__/, this.signature); + } + + /** + * Prepares the body of the email with information regarding a crash event. + */ + private _crashInstructions: string | undefined; + private generateCrashInstructions({ name, message, stack }: Error) { + if (!this._crashInstructions) { + this._crashInstructions = readFileSync(resolve(__dirname, "./crash_instructions.txt"), { encoding: "utf8" }); + } + return this._crashInstructions + .replace(/__name__/, name || "[no error name found]") + .replace(/__message__/, message || "[no error message found]") + .replace(/__stack__/, stack || "[no error stack found]") + .replace(/__signature__/, this.signature); + } + + /** + * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone + * to kill the server via the /kill/:key route. + */ + private dispatchSessionPassword = async (key: string) => { + const { mainLog } = this.sessionMonitor; + mainLog(green("dispatching session key...")); + const failures = await Email.dispatchAll({ + to: this.notificationRecipients, + subject: "Dash Release Session Admin Authentication Key", + content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` + }); + if (failures) { + failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + mainLog(red("distribution of session key experienced errors")); + } else { + mainLog(green("successfully distributed session key to recipients")); + } + } + + /** + * This sends an email with the generated crash report. + */ + private dispatchCrashReport = async (crashCause: Error) => { + const { mainLog } = this.sessionMonitor; + const failures = await Email.dispatchAll({ + to: this.notificationRecipients, + subject: "Dash Web Server Crash", + content: this.generateCrashInstructions(crashCause) + }); + if (failures) { + failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + mainLog(red("distribution of crash notification experienced errors")); + } else { + mainLog(green("successfully distributed crash notification to recipients")); + } + } + + /** + * Logic for interfacing with Solr. Either starts it, + * stops it, or rebuilds its indicies. + */ + private executeSolrCommand = async (args: string[]) => { + const { exec, mainLog } = this.sessionMonitor; + const action = args[0]; + if (action === "index") { + exec("npx ts-node ./updateSearch.ts", { cwd: pathFromRoot("./src/server") }); + } else { + const command = `${onWindows ? "solr.cmd" : "solr"} ${args[0] === "start" ? "start" : "stop -p 8983"}`; + await exec(command, { cwd: "./solr-8.3.1/bin" }); + try { + await get("http://localhost:8983"); + mainLog(green("successfully connected to 8983 after running solr initialization")); + } catch { + mainLog(red("unable to connect at 8983 after running solr initialization")); + } + } + } + + /** + * Broadcast to all clients that their connection + * is no longer valid, and explain why / what to expect. + */ + private notifyClient: ExitHandler = reason => { + const { _socket } = WebSocket; + if (_socket) { + const message = typeof reason === "boolean" ? (reason ? "exit" : "temporary") : "crash"; + Utils.Emit(_socket, MessageStore.ConnectionTerminated, message); + } + } + + /** + * Performs a backup of the database, saved to the desktop subdirectory. + * This should work as is only on our specific release server. + */ + private backup = async () => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop }); + + /** + * Compress either a brand new backup or the most recent backup and send it + * as an attachment to an email, dispatched to the requested recipient. + * @param mode specifies whether or not to make a new backup before exporting + * @param to the recipient of the email + */ + private async dispatchZippedDebugBackup(mode: string, to: string) { + const { mainLog } = this.sessionMonitor; + try { + // if desired, complete an immediate backup to send + if (mode === "active") { + await this.backup(); + mainLog("backup complete"); + } + + // ensure the directory for compressed backups exists + const backupsDirectory = `${this.releaseDesktop}/backups`; + const compressedDirectory = `${this.releaseDesktop}/compressed`; + if (!existsSync(compressedDirectory)) { + mkdirSync(compressedDirectory); + } + + // sort all backups by their modified time, and choose the most recent one + const target = readdirSync(backupsDirectory).map(filename => ({ + modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs, + filename + })).sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename; + mainLog(`targeting ${target}...`); + + // create a zip file and to it, write the contents of the backup directory + const zipName = `${target}.zip`; + const zipPath = `${compressedDirectory}/${zipName}`; + const output = createWriteStream(zipPath); + const zip = Archiver('zip'); + zip.pipe(output); + zip.directory(`${backupsDirectory}/${target}/Dash`, false); + await zip.finalize(); + mainLog(`zip finalized with size ${statSync(zipPath).size} bytes, saved to ${zipPath}`); + + // dispatch the email to the recipient, containing the finalized zip file + const error = await Email.dispatch({ + to, + subject: `Remote debug: compressed backup of ${target}...`, + content: this.generateDebugInstructions(zipName, target), + attachments: [{ filename: zipName, path: zipPath }] + }); + + // indicate success or failure + mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`); + error && mainLog(red(error.message)); + } catch (error) { + mainLog(red("unable to dispatch zipped backup...")); + mainLog(red(error.message)); + } + } + +} \ No newline at end of file diff --git a/src/server/DashSession/crash_instructions.txt b/src/server/DashSession/crash_instructions.txt new file mode 100644 index 000000000..65417919d --- /dev/null +++ b/src/server/DashSession/crash_instructions.txt @@ -0,0 +1,14 @@ +You, as a Dash Administrator, are being notified of a server crash event. Here's what we know: + +name: +__name__ + +message: +__message__ + +stack: +__stack__ + +The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress. + +__signature__ \ No newline at end of file diff --git a/src/server/DashSession/remote_debug_instructions.txt b/src/server/DashSession/remote_debug_instructions.txt new file mode 100644 index 000000000..c279c460a --- /dev/null +++ b/src/server/DashSession/remote_debug_instructions.txt @@ -0,0 +1,16 @@ +Instructions: + +Download this attachment, open your downloads folder and find this file (__zipname__). +Right click on the zip file and select 'Extract to __target__\'. +Open up the command line, and remember that you can get the path to any file or directory by literally dragging it from the file system and dropping it onto the terminal. +Unless it's in your path, you'll want to navigate to the MongoDB bin directory, given for Windows: + +cd '/c/Program Files/MongoDB/Server/[your version, i.e. 4.0, goes here]/bin' + +Then run the following command (if you're in the bin folder, make that ./mongorestore ...): + +mongorestore --gzip [/path/to/directory/you/just/unzipped] --db Dash + +Assuming everything runs well, this will mirror your local database with that of the server. Now, just start the server locally and debug. + +__signature__ \ No newline at end of file -- cgit v1.2.3-70-g09d2 From cfba84bdfee74407b9dcbe80505f527ddb4b0433 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 14:14:05 -0500 Subject: new folder --- src/server/DashSession/DashSessionAgent.ts | 4 ++-- src/server/DashSession/crash_instructions.txt | 14 -------------- src/server/DashSession/remote_debug_instructions.txt | 16 ---------------- src/server/DashSession/templates/crash_instructions.txt | 14 ++++++++++++++ .../DashSession/templates/remote_debug_instructions.txt | 16 ++++++++++++++++ 5 files changed, 32 insertions(+), 32 deletions(-) delete mode 100644 src/server/DashSession/crash_instructions.txt delete mode 100644 src/server/DashSession/remote_debug_instructions.txt create mode 100644 src/server/DashSession/templates/crash_instructions.txt create mode 100644 src/server/DashSession/templates/remote_debug_instructions.txt (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index b031c177e..8061da1ca 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -54,7 +54,7 @@ export class DashSessionAgent extends AppliedSessionAgent { private _remoteDebugInstructions: string | undefined; private generateDebugInstructions = (zipName: string, target: string) => { if (!this._remoteDebugInstructions) { - this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./remote_debug_instructions.txt"), { encoding: "utf8" }); + this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./templates/remote_debug_instructions.txt"), { encoding: "utf8" }); } return this._remoteDebugInstructions .replace(/__zipname__/, zipName) @@ -68,7 +68,7 @@ export class DashSessionAgent extends AppliedSessionAgent { private _crashInstructions: string | undefined; private generateCrashInstructions({ name, message, stack }: Error) { if (!this._crashInstructions) { - this._crashInstructions = readFileSync(resolve(__dirname, "./crash_instructions.txt"), { encoding: "utf8" }); + this._crashInstructions = readFileSync(resolve(__dirname, "./templates/crash_instructions.txt"), { encoding: "utf8" }); } return this._crashInstructions .replace(/__name__/, name || "[no error name found]") diff --git a/src/server/DashSession/crash_instructions.txt b/src/server/DashSession/crash_instructions.txt deleted file mode 100644 index 65417919d..000000000 --- a/src/server/DashSession/crash_instructions.txt +++ /dev/null @@ -1,14 +0,0 @@ -You, as a Dash Administrator, are being notified of a server crash event. Here's what we know: - -name: -__name__ - -message: -__message__ - -stack: -__stack__ - -The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress. - -__signature__ \ No newline at end of file diff --git a/src/server/DashSession/remote_debug_instructions.txt b/src/server/DashSession/remote_debug_instructions.txt deleted file mode 100644 index c279c460a..000000000 --- a/src/server/DashSession/remote_debug_instructions.txt +++ /dev/null @@ -1,16 +0,0 @@ -Instructions: - -Download this attachment, open your downloads folder and find this file (__zipname__). -Right click on the zip file and select 'Extract to __target__\'. -Open up the command line, and remember that you can get the path to any file or directory by literally dragging it from the file system and dropping it onto the terminal. -Unless it's in your path, you'll want to navigate to the MongoDB bin directory, given for Windows: - -cd '/c/Program Files/MongoDB/Server/[your version, i.e. 4.0, goes here]/bin' - -Then run the following command (if you're in the bin folder, make that ./mongorestore ...): - -mongorestore --gzip [/path/to/directory/you/just/unzipped] --db Dash - -Assuming everything runs well, this will mirror your local database with that of the server. Now, just start the server locally and debug. - -__signature__ \ No newline at end of file diff --git a/src/server/DashSession/templates/crash_instructions.txt b/src/server/DashSession/templates/crash_instructions.txt new file mode 100644 index 000000000..65417919d --- /dev/null +++ b/src/server/DashSession/templates/crash_instructions.txt @@ -0,0 +1,14 @@ +You, as a Dash Administrator, are being notified of a server crash event. Here's what we know: + +name: +__name__ + +message: +__message__ + +stack: +__stack__ + +The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress. + +__signature__ \ No newline at end of file diff --git a/src/server/DashSession/templates/remote_debug_instructions.txt b/src/server/DashSession/templates/remote_debug_instructions.txt new file mode 100644 index 000000000..c279c460a --- /dev/null +++ b/src/server/DashSession/templates/remote_debug_instructions.txt @@ -0,0 +1,16 @@ +Instructions: + +Download this attachment, open your downloads folder and find this file (__zipname__). +Right click on the zip file and select 'Extract to __target__\'. +Open up the command line, and remember that you can get the path to any file or directory by literally dragging it from the file system and dropping it onto the terminal. +Unless it's in your path, you'll want to navigate to the MongoDB bin directory, given for Windows: + +cd '/c/Program Files/MongoDB/Server/[your version, i.e. 4.0, goes here]/bin' + +Then run the following command (if you're in the bin folder, make that ./mongorestore ...): + +mongorestore --gzip [/path/to/directory/you/just/unzipped] --db Dash + +Assuming everything runs well, this will mirror your local database with that of the server. Now, just start the server locally and debug. + +__signature__ \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 7741fd9cc135f94fbc1b68d89d68e38c93648f33 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 14:34:22 -0500 Subject: created multicolumn view file, made recipient parameter optional in sessionmanager --- src/client/views/CollectionMulticolumnView.tsx | 25 +++++++++++++++++++++++++ src/server/ApiManagers/SessionManager.ts | 6 ++++-- src/server/DashSession/DashSessionAgent.ts | 25 ++++++++++++++++--------- 3 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 src/client/views/CollectionMulticolumnView.tsx (limited to 'src/server/DashSession') diff --git a/src/client/views/CollectionMulticolumnView.tsx b/src/client/views/CollectionMulticolumnView.tsx new file mode 100644 index 000000000..8f0ffd3d0 --- /dev/null +++ b/src/client/views/CollectionMulticolumnView.tsx @@ -0,0 +1,25 @@ +import { observer } from 'mobx-react'; +import { makeInterface } from '../../new_fields/Schema'; +import { documentSchema } from '../../new_fields/documentSchemas'; +import { CollectionSubView } from './collections/CollectionSubView'; +import { DragManager } from '../util/DragManager'; + +type MulticolumnDocument = makeInterface<[typeof documentSchema]>; +const MulticolumnDocument = makeInterface(documentSchema); + +@observer +export default class CollectionMulticolumnView extends CollectionSubView(MulticolumnDocument) { + + private _dropDisposer?: DragManager.DragDropDisposer; + protected createDropTarget = (ele: HTMLDivElement) => { //used for stacking and masonry view + this._dropDisposer && this._dropDisposer(); + if (ele) { + this._dropDisposer = DragManager.MakeDropTarget(ele, this.drop.bind(this)); + } + } + + render() { + return null; + } + +} \ No newline at end of file diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 6782643bc..21103fdd5 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -2,6 +2,7 @@ import ApiManager, { Registration } from "./ApiManager"; import { Method, _permission_denied, AuthorizedCore, SecureHandler } from "../RouteManager"; import RouteSubscriber from "../RouteSubscriber"; import { sessionAgent } from ".."; +import { DashSessionAgent } from "../DashSession/DashSessionAgent"; const permissionError = "You are not authorized!"; @@ -27,10 +28,11 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, - subscription: this.secureSubscriber("debug", "mode", "recipient"), + subscription: this.secureSubscriber("debug", "mode", "recipient?"), secureHandler: this.authorizedAction(async ({ req, res }) => { - const { mode, recipient } = req.params; + const { mode } = req.params; if (["passive", "active"].includes(mode)) { + const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); if (response instanceof Error) { res.send(response); diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 8061da1ca..f3f0a3c3d 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -19,7 +19,6 @@ import { ServerWorker } from "../session/agents/server_worker"; */ export class DashSessionAgent extends AppliedSessionAgent { - private readonly notificationRecipients = ["brownptcdash@gmail.com"]; private readonly signature = "-Dash Server Session Manager"; private readonly releaseDesktop = pathFromRoot("../../Desktop"); @@ -83,14 +82,15 @@ export class DashSessionAgent extends AppliedSessionAgent { */ private dispatchSessionPassword = async (key: string) => { const { mainLog } = this.sessionMonitor; + const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); - const failures = await Email.dispatchAll({ - to: this.notificationRecipients, + const error = await Email.dispatch({ + to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` }); - if (failures) { - failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + if (error) { + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); mainLog(red("distribution of session key experienced errors")); } else { mainLog(green("successfully distributed session key to recipients")); @@ -102,13 +102,14 @@ export class DashSessionAgent extends AppliedSessionAgent { */ private dispatchCrashReport = async (crashCause: Error) => { const { mainLog } = this.sessionMonitor; - const failures = await Email.dispatchAll({ - to: this.notificationRecipients, + const { notificationRecipient } = DashSessionAgent; + const error = await Email.dispatch({ + to: notificationRecipient, subject: "Dash Web Server Crash", content: this.generateCrashInstructions(crashCause) }); - if (failures) { - failures.map(({ recipient, error: { message } }) => this.sessionMonitor.mainLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + if (error) { + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); mainLog(red("distribution of crash notification experienced errors")); } else { mainLog(green("successfully distributed crash notification to recipients")); @@ -210,4 +211,10 @@ export class DashSessionAgent extends AppliedSessionAgent { } } +} + +export namespace DashSessionAgent { + + export const notificationRecipient = "brownptcdash@gmail.com"; + } \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 2c83f136771794565350d229a238b3f01cc60aca Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Fri, 10 Jan 2020 17:10:50 -0500 Subject: monitor events --- src/server/DashSession/DashSessionAgent.ts | 4 ++-- src/server/session/agents/monitor.ts | 29 +++++++++++------------------ src/server/session/agents/server_worker.ts | 4 ++-- 3 files changed, 15 insertions(+), 22 deletions(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index f3f0a3c3d..0d9486757 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -33,8 +33,8 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.addServerMessageListener("backup", this.backup); monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.on(Monitor.IntrinsicEvents.KeyGenerated, this.dispatchSessionPassword); - monitor.on(Monitor.IntrinsicEvents.CrashDetected, this.dispatchCrashReport); + monitor.onKeyGenerated(this.dispatchSessionPassword); + monitor.onCrashDetected(this.dispatchCrashReport); } /** diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index f6738a23f..cd09c9e41 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -48,6 +48,10 @@ export class Monitor extends EventEmitter { } } + public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener); + public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener); + public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener); + /** * Kill this session and its active child * server process, either gracefully (may wait @@ -317,30 +321,19 @@ export class Monitor extends EventEmitter { }); Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); + + this.addServerMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); + this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); + this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); + // an IPC message handler that executes actions on the master thread when prompted by the active worker Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => { if (action) { const { message, args } = action as Monitor.Action; console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`); - switch (message) { - case "kill": - const { reason, graceful, errorCode } = args; - this.killSession(reason, graceful, errorCode); - break; - case "notify_crash": - this.emit(Monitor.IntrinsicEvents.CrashDetected, args.error); - break; - case Monitor.IntrinsicEvents.ServerRunning: - this.emit(Monitor.IntrinsicEvents.ServerRunning, args.firstTime); - break; - case "set_port": - const { port, value, immediateRestart } = args; - this.setPort(port, value, immediateRestart); - break; - } const handlers = this.onMessage[message]; if (handlers) { - handlers.forEach(handler => handler({ message, args })); + await Promise.all(handlers.map(handler => handler({ message, args }))); } } if (lifecycle) { @@ -358,7 +351,7 @@ export namespace Monitor { args: any; } - export type ServerMessageHandler = (action: Action) => void | Promise; + export type ServerMessageHandler = (action: Action) => any | Promise; export enum IntrinsicEvents { KeyGenerated = "key_generated", diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 278cbb42f..b279a19d8 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -118,7 +118,7 @@ export class ServerWorker { private proactiveUnplannedExit = async (error: Error): Promise => { this.shouldServerBeResponsive = false; // communicates via IPC to the master thread that it should dispatch a crash notification email - this.sendMonitorAction("notify_crash", { error }); + this.sendMonitorAction(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); await this.executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); @@ -138,7 +138,7 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.sendMonitorAction(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); + this.sendMonitorAction(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; -- cgit v1.2.3-70-g09d2 From 27c93abd49ca8a519d2aa3cf7938434fe25947d7 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 09:54:48 -0500 Subject: extends message, removed duplicate handlers, IPC streamlined --- src/server/ApiManagers/SessionManager.ts | 4 +- src/server/DashSession/DashSessionAgent.ts | 15 ++-- src/server/session/agents/applied_session_agent.ts | 8 +- src/server/session/agents/message_router.ts | 45 +++++++++++ src/server/session/agents/monitor.ts | 30 +++---- src/server/session/agents/server_worker.ts | 17 ++-- src/server/session/utilities/ipc.ts | 93 ++++++++-------------- 7 files changed, 116 insertions(+), 96 deletions(-) create mode 100644 src/server/session/agents/message_router.ts (limited to 'src/server/DashSession') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 21103fdd5..91ef7e298 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); if (response instanceof Error) { res.send(response); } else { @@ -49,7 +49,7 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("backup"), secureHandler: this.authorizedAction(async ({ res }) => { - const response = await sessionAgent.serverWorker.sendMonitorAction("backup"); + const response = await sessionAgent.serverWorker.emitToMonitor("backup"); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 0d9486757..b7e741525 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,6 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; +import { Message } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -26,14 +27,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * The core method invoked when the single master thread is initialized. * Installs event hooks, repl commands and additional IPC listeners. */ - protected async initializeMonitor(monitor: Monitor) { + protected async initializeMonitor(monitor: Monitor, sessionKey: string) { + await this.dispatchSessionPassword(sessionKey); monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addServerMessageListener("backup", this.backup); - monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.onKeyGenerated(this.dispatchSessionPassword); + monitor.addMessageListener("backup", this.backup); + monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.onCrashDetected(this.dispatchCrashReport); } @@ -80,14 +81,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone * to kill the server via the /kill/:key route. */ - private dispatchSessionPassword = async (key: string) => { + private dispatchSessionPassword = async (sessionKey: string) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}` + content: `The key for this session (started @ ${new Date().toUTCString()}) is ${sessionKey}.\n\n${this.signature}` }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); @@ -100,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async (crashCause: Error) => { + private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts index 53293d3bf..48226dab6 100644 --- a/src/server/session/agents/applied_session_agent.ts +++ b/src/server/session/agents/applied_session_agent.ts @@ -1,6 +1,7 @@ import { isMaster } from "cluster"; import { Monitor } from "./monitor"; import { ServerWorker } from "./server_worker"; +import { Utils } from "../../../Utils"; export type ExitHandler = (reason: Error | boolean) => void | Promise; @@ -8,7 +9,7 @@ export abstract class AppliedSessionAgent { // the following two methods allow the developer to create a custom // session and use the built in customization options for each thread - protected abstract async initializeMonitor(monitor: Monitor): Promise; + protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise; protected abstract async initializeServerWorker(): Promise; private launched = false; @@ -21,7 +22,7 @@ export abstract class AppliedSessionAgent { private sessionMonitorRef: Monitor | undefined; public get sessionMonitor(): Monitor { if (!isMaster) { - this.serverWorker.sendMonitorAction("kill", { + this.serverWorker.emitToMonitor("kill", { graceful: false, reason: "Cannot access the session monitor directly from the server worker thread.", errorCode: 1 @@ -43,7 +44,8 @@ export abstract class AppliedSessionAgent { if (!this.launched) { this.launched = true; if (isMaster) { - await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create()); + const sessionKey = Utils.GenerateGuid(); + await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey); this.sessionMonitorRef.finalize(); } else { this.serverWorkerRef = await this.initializeServerWorker(); diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts new file mode 100644 index 000000000..5848e27ab --- /dev/null +++ b/src/server/session/agents/message_router.ts @@ -0,0 +1,45 @@ +import { MessageHandler, Message } from "../utilities/ipc"; + +export default abstract class MessageRouter { + + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public removeMessageListener = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler({ name, args }))); + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 18fa6df24..96f1f8130 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,20 +2,19 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; -import { Utils } from "../../../Utils"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import { EventEmitter } from "events"; +import MessageRouter from "./message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends EventEmitter { +export class Monitor extends MessageRouter { private static IPCManager: PromisifiedIPCManager; private static count = 0; private finalized = false; @@ -25,7 +24,7 @@ export class Monitor extends EventEmitter { private key: string | undefined; private repl: Repl; - public static Create() { + public static Create(sessionKey: string) { if (isWorker) { IPC(process).emit("kill", { reason: "cannot create a monitor on the worker process.", @@ -37,13 +36,12 @@ export class Monitor extends EventEmitter { console.error(red("cannot create more than one monitor.")); process.exit(1); } else { - return new Monitor(); + return new Monitor(sessionKey); } } - public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener); - public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener); - public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener); + public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener); + public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener); /** * Kill this session and its active child @@ -93,10 +91,10 @@ export class Monitor extends EventEmitter { }); } - private constructor() { + private constructor(sessionKey: string) { super(); - console.log(this.timestamp(), cyan("initializing session...")); + this.key = sessionKey; this.config = this.loadAndValidateConfiguration(); // determines whether or not we see the compilation / initialization / runtime output of each child server process @@ -131,7 +129,6 @@ export class Monitor extends EventEmitter { throw new Error("Session monitor is already finalized"); } this.finalized = true; - this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid()); this.spawn(); } @@ -284,11 +281,10 @@ export class Monitor extends EventEmitter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - const { addMessageListener } = Monitor.IPCManager; - addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error)); - addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime)); - addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); + this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); + this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + + Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 9e471366a..01e1cf971 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; import { PromisifiedIPCManager } from "../utilities/ipc"; +import MessageRouter from "./message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -10,7 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker { +export class ServerWorker extends MessageRouter { private static IPCManager = new PromisifiedIPCManager(process); private static count = 0; private shouldServerBeResponsive = false; @@ -58,6 +59,7 @@ export class ServerWorker { public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse); private constructor(work: Function) { + super(); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -76,10 +78,13 @@ export class ServerWorker { * server process. */ private configureProcess = () => { + ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - const { addMessageListener } = ServerWorker.IPCManager; - addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds); - addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + this.addMessageListener("updatePollingInterval", ({ args }) => { + this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + return new Promise(resolve => setTimeout(resolve, 1000 * 10)); + }); + this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); @@ -110,7 +115,7 @@ export class ServerWorker { private proactiveUnplannedExit = async (error: Error): Promise => { this.shouldServerBeResponsive = false; // communicates via IPC to the master thread that it should dispatch a crash notification email - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error }); + this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error }); await this.executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); @@ -130,7 +135,7 @@ export class ServerWorker { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized }); + this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 2faf9f63e..37aaa6757 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -8,95 +8,66 @@ export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; export interface Message { name: string; - args: any; + args?: any; } +type InternalMessage = Message & { metadata: any }; -export type MessageHandler = (message: Message) => any | Promise; +export type MessageHandler = (message: T) => any | Promise; export class PromisifiedIPCManager { - private onMessage: { [message: string]: MessageHandler[] | undefined } = {}; private readonly target: IPCTarget; private readonly ipc_id = `ipc_id_${suffix}`; - private readonly response_expected = `response_expected_${suffix}`; private readonly is_response = `is_response_${suffix}`; constructor(target: IPCTarget) { this.target = target; - - this.target.addListener("message", async ({ name, args }: Message) => { - let error: Error | undefined; - try { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } catch (e) { - error = e; - } - if (args[this.response_expected] && this.target.send) { - const response: any = { error }; - response[this.ipc_id] = args[this.ipc_id]; - response[this.is_response] = true; - this.target.send(response); - } - }); - } - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public addMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - handlers.push(handler); - } else { - this.onMessage[name] = [handler]; - } - } - - /** - * Unregister a given listener at this message. - */ - public removeMessageListener = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } } - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (message: string) => this.onMessage[message] = undefined; - - public emit = async (name: string, args: any, expectResponse = false): Promise => { + public emit = async (name: string, args?: any, expectResponse = false): Promise => { if (!this.target.send) { return new Error("Cannot dispatch when send is undefined."); } - args[this.response_expected] = expectResponse; if (expectResponse) { return new Promise(resolve => { const messageId = Utils.GenerateGuid(); - args[this.ipc_id] = messageId; - const responseHandler: (args: any) => void = response => { - const { error } = response; - if (response[this.is_response] && response[this.ipc_id] === messageId) { + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler = ({ args, metadata }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + const { error } = args; this.target.removeListener("message", responseHandler); resolve(error); } }; this.target.addListener("message", responseHandler); - this.target.send!({ name, args }); + this.target.send?.({ name, args, metadata }); }); } else { - this.target.send({ name, args }); + this.target.send?.({ name, args }); } } + public setRouter = (router: Router) => { + this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata[this.is_response])) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata[this.is_response] = true; + this.target.send({ + name, + args: { error }, + metadata + }); + } + } + }); + } + } export function IPC(target: IPCTarget) { -- cgit v1.2.3-70-g09d2 From 120fa84b3e8c794dd882d3613067c5b18ee7ba04 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 11:23:11 -0500 Subject: typed messages and handlers --- src/server/DashSession/DashSessionAgent.ts | 6 +- src/server/session/agents/message_router.ts | 4 +- src/server/session/agents/monitor.ts | 100 +++++++++++++++------------- src/server/session/agents/server_worker.ts | 8 +-- src/server/session/utilities/ipc.ts | 13 ++-- 5 files changed, 68 insertions(+), 63 deletions(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index b7e741525..23d421835 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -33,9 +33,9 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.addMessageListener("backup", this.backup); - monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.onCrashDetected(this.dispatchCrashReport); + monitor.on("backup", this.backup); + monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.hooks.crashDetected(this.dispatchCrashReport); } /** diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts index 5848e27ab..707f771d9 100644 --- a/src/server/session/agents/message_router.ts +++ b/src/server/session/agents/message_router.ts @@ -8,7 +8,7 @@ export default abstract class MessageRouter { * Add a listener at this message. When the monitor process * receives a message, it will invoke all registered functions. */ - public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => { + public on = (name: string, handler: MessageHandler, exclusive = false) => { const handlers = this.onMessage[name]; if (exclusive || !handlers) { this.onMessage[name] = [handler]; @@ -20,7 +20,7 @@ export default abstract class MessageRouter { /** * Unregister a given listener at this message. */ - public removeMessageListener = (name: string, handler: MessageHandler) => { + public off = (name: string, handler: MessageHandler) => { const handlers = this.onMessage[name]; if (handlers) { const index = handlers.indexOf(handler); diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index 5f4543606..ccba8199e 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc"; +import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; @@ -40,8 +40,54 @@ export class Monitor extends MessageRouter { } } - public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener); - public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener); + private constructor(sessionKey: string) { + super(); + this.config = this.loadAndValidateConfiguration(); + this.initialize(sessionKey); + this.repl = this.initializeRepl(); + } + + private initialize = (sessionKey: string) => { + console.log(this.timestamp(), cyan("initializing session...")); + this.key = sessionKey; + + // determines whether or not we see the compilation / initialization / runtime output of each child server process + const output = this.config.showServerOutput ? "inherit" : "ignore"; + setupMaster({ stdio: ["ignore", output, output, "ipc"] }); + + // handle exceptions in the master thread - there shouldn't be many of these + // the IPC (inter process communication) channel closed exception can't seem + // to be caught in a try catch, and is inconsequential, so it is ignored + process.on("uncaughtException", ({ message, stack }): void => { + if (message !== "Channel closed") { + this.mainLog(red(message)); + if (stack) { + this.mainLog(`uncaught exception\n${red(stack)}`); + } + } + }); + + // a helpful cluster event called on the master thread each time a child process exits + on("exit", ({ process: { pid } }, code, signal) => { + const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; + this.mainLog(cyan(prompt)); + // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one + this.spawn(); + }); + } + + public finalize = (): void => { + if (this.finalized) { + throw new Error("Session monitor is already finalized"); + } + this.finalized = true; + this.spawn(); + } + + public readonly hooks = Object.freeze({ + crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), + serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + }); /** * Kill this session and its active child @@ -91,48 +137,6 @@ export class Monitor extends MessageRouter { }); } - private constructor(sessionKey: string) { - super(); - console.log(this.timestamp(), cyan("initializing session...")); - this.key = sessionKey; - this.config = this.loadAndValidateConfiguration(); - - // determines whether or not we see the compilation / initialization / runtime output of each child server process - const output = this.config.showServerOutput ? "inherit" : "ignore"; - setupMaster({ stdio: ["ignore", output, output, "ipc"] }); - - // handle exceptions in the master thread - there shouldn't be many of these - // the IPC (inter process communication) channel closed exception can't seem - // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }): void => { - if (message !== "Channel closed") { - this.mainLog(red(message)); - if (stack) { - this.mainLog(`uncaught exception\n${red(stack)}`); - } - } - }); - - // a helpful cluster event called on the master thread each time a child process exits - on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - this.mainLog(cyan(prompt)); - // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one - this.spawn(); - }); - - this.repl = this.initializeRepl(); - Monitor.IPCManager.setRouter(this.route); - } - - public finalize = (): void => { - if (this.finalized) { - throw new Error("Session monitor is already finalized"); - } - this.finalized = true; - this.spawn(); - } - /** * Generates a blue UTC string associated with the time * of invocation. @@ -282,8 +286,10 @@ export class Monitor extends MessageRouter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); - this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); + this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + + Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 2c77cfb29..50abe398d 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager } from "../utilities/ipc"; +import { PromisifiedIPCManager, Message } from "../utilities/ipc"; import MessageRouter from "./message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; @@ -80,11 +80,11 @@ export class ServerWorker extends MessageRouter { private configureProcess = () => { ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.addMessageListener("updatePollingInterval", ({ args }) => { + this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => { this.pollingIntervalSeconds = args.newPollingIntervalSeconds; return new Promise(resolve => setTimeout(resolve, 1000 * 10)); }); - this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => { + this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); @@ -135,7 +135,7 @@ export class ServerWorker extends MessageRouter { if (!this.shouldServerBeResponsive) { // notify monitor thread that the server is up and running this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized }); + this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized }); this.isInitialized = true; } this.shouldServerBeResponsive = true; diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index fd8bf6075..7ad00596d 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -6,13 +6,12 @@ export type Router = (message: Message) => void | Promise; export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; -export interface Message { +export interface Message { name: string; - args?: any; + args: T; } -type InternalMessage = Message & { metadata: any }; - -export type MessageHandler = (message: T) => any | Promise; +type InternalMessage = Message & { metadata: any }; +export type MessageHandler = Message> = (message: T) => any | Promise; export class PromisifiedIPCManager { private readonly target: IPCTarget; @@ -32,10 +31,10 @@ export class PromisifiedIPCManager { const messageId = Utils.GenerateGuid(); const metadata: any = {}; metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler = ({ args, metadata }) => { + const responseHandler: MessageHandler = ({ metadata, args }) => { if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { this.target.removeListener("message", responseHandler); - resolve(args.error as Error | undefined); + resolve(args?.error as Error | undefined); } }; this.target.addListener("message", responseHandler); -- cgit v1.2.3-70-g09d2 From 86f1e0f58940904b8c55284f6787e7422a6665ff Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 13:42:06 -0500 Subject: refactor --- src/server/ApiManagers/SessionManager.ts | 8 ++-- src/server/DashSession/DashSessionAgent.ts | 8 ++-- src/server/session/agents/message_router.ts | 45 --------------------- src/server/session/agents/monitor.ts | 17 ++++---- .../session/agents/process_message_router.ts | 46 ++++++++++++++++++++++ src/server/session/agents/server_worker.ts | 18 +++++---- src/server/session/utilities/ipc.ts | 39 ++++++++---------- 7 files changed, 89 insertions(+), 92 deletions(-) delete mode 100644 src/server/session/agents/message_router.ts create mode 100644 src/server/session/agents/process_message_router.ts (limited to 'src/server/DashSession') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 91ef7e298..4513752a6 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -8,16 +8,16 @@ const permissionError = "You are not authorized!"; export default class SessionManager extends ApiManager { - private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("password", ...params); + private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("sessionKey", ...params); private authorizedAction = (handler: SecureHandler) => { return (core: AuthorizedCore) => { const { req, res, isRelease } = core; - const { password } = req.params; + const { sessionKey } = req.params; if (!isRelease) { return res.send("This can be run only on the release server."); } - if (password !== process.env.session_key) { + if (sessionKey !== process.env.session_key) { return _permission_denied(res, permissionError); } return handler(core); @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 23d421835..de8e7240f 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,7 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; -import { Message } from "../session/utilities/ipc"; +import { MessageHandler } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -34,8 +34,8 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.on("backup", this.backup); - monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.hooks.crashDetected(this.dispatchCrashReport); + monitor.on("debug", ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.coreHooks.onCrashDetected(this.dispatchCrashReport); } /** @@ -101,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { + private dispatchCrashReport: MessageHandler<{ error: Error }> = async ({ error: crashCause }) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts deleted file mode 100644 index 707f771d9..000000000 --- a/src/server/session/agents/message_router.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { MessageHandler, Message } from "../utilities/ipc"; - -export default abstract class MessageRouter { - - private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public on = (name: string, handler: MessageHandler, exclusive = false) => { - const handlers = this.onMessage[name]; - if (exclusive || !handlers) { - this.onMessage[name] = [handler]; - } else { - handlers.push(handler); - } - } - - /** - * Unregister a given listener at this message. - */ - public off = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } - } - - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); - - protected route: MessageHandler = async ({ name, args }) => { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index ccba8199e..d4abbb51e 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -8,14 +8,13 @@ import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import MessageRouter from "./message_router"; +import ProcessMessageRouter from "./process_message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends MessageRouter { - private static IPCManager: PromisifiedIPCManager; +export class Monitor extends ProcessMessageRouter { private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; @@ -84,9 +83,9 @@ export class Monitor extends MessageRouter { this.spawn(); } - public readonly hooks = Object.freeze({ - crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + public readonly coreHooks = Object.freeze({ + onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), + onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) }); /** @@ -219,7 +218,7 @@ export class Monitor extends MessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true); + return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -286,8 +285,8 @@ export class Monitor extends MessageRouter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); - this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); + this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); Monitor.IPCManager.setRouter(this.route); } diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts new file mode 100644 index 000000000..f60343514 --- /dev/null +++ b/src/server/session/agents/process_message_router.ts @@ -0,0 +1,46 @@ +import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; + +export default abstract class ProcessMessageRouter { + + protected static IPCManager: PromisifiedIPCManager; + private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; + + /** + * Add a listener at this message. When the monitor process + * receives a message, it will invoke all registered functions. + */ + public on = (name: string, handler: MessageHandler, exclusive = false) => { + const handlers = this.onMessage[name]; + if (exclusive || !handlers) { + this.onMessage[name] = [handler]; + } else { + handlers.push(handler); + } + } + + /** + * Unregister a given listener at this message. + */ + public off = (name: string, handler: MessageHandler) => { + const handlers = this.onMessage[name]; + if (handlers) { + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + } + + /** + * Unregister all listeners at this message. + */ + public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); + + protected route: MessageHandler = async ({ name, args }) => { + const handlers = this.onMessage[name]; + if (handlers) { + await Promise.all(handlers.map(handler => handler(args))); + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 50abe398d..23ffb2650 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,7 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message } from "../utilities/ipc"; -import MessageRouter from "./message_router"; +import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -11,8 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker extends MessageRouter { - private static IPCManager = new PromisifiedIPCManager(process); +export class ServerWorker extends ProcessMessageRouter { private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -56,10 +55,13 @@ export class ServerWorker extends MessageRouter { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public emitToMonitor = (name: string, args?: any, awaitResponse = false) => ServerWorker.IPCManager.emit(name, args, awaitResponse); + public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args); + + public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args); private constructor(work: Function) { super(); + ServerWorker.IPCManager = new PromisifiedIPCManager(process); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,11 +82,11 @@ export class ServerWorker extends MessageRouter { private configureProcess = () => { ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => { - this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { + this.pollingIntervalSeconds = newPollingIntervalSeconds; return new Promise(resolve => setTimeout(resolve, 1000 * 10)); }); - this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => { + this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 7ad00596d..db4c23180 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -11,7 +11,7 @@ export interface Message { args: T; } type InternalMessage = Message & { metadata: any }; -export type MessageHandler = Message> = (message: T) => any | Promise; +export type MessageHandler = (message: T) => any | Promise; export class PromisifiedIPCManager { private readonly target: IPCTarget; @@ -22,27 +22,22 @@ export class PromisifiedIPCManager { this.target = target; } - public emit = async (name: string, args?: any, awaitResponse = false): Promise => { - if (!this.target.send) { - return new Error("Cannot dispatch when send is undefined."); - } - if (awaitResponse) { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const metadata: any = {}; - metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler = ({ metadata, args }) => { - if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - this.target.send?.({ name, args, metadata }); - }); - } else { - this.target.send?.({ name, args }); - } + public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); + + public emitPromise = async (name: string, args?: any) => { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler = ({ metadata, args }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + this.target.removeListener("message", responseHandler); + resolve(args?.error as Error | undefined); + } + }; + this.target.addListener("message", responseHandler); + this.target.send?.({ name, args, metadata }); + }); } public setRouter = (router: Router) => { -- cgit v1.2.3-70-g09d2 From e0ccbb6b47bf612d29de515316b869fa6f7552fe Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 14:01:48 -0500 Subject: email fix --- src/server/DashSession/DashSessionAgent.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index de8e7240f..3c98c1e9d 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -88,7 +88,7 @@ export class DashSessionAgent extends AppliedSessionAgent { const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `The key for this session (started @ ${new Date().toUTCString()}) is ${sessionKey}.\n\n${this.signature}` + content: `Here's the key for this session (started @ ${new Date().toUTCString()}):\n\n${sessionKey}.\n\n${this.signature}` }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); -- cgit v1.2.3-70-g09d2 From 54a241ff71abc07a5dbdebce1b614f1024a767e6 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 15:08:26 -0500 Subject: final session cleanup --- src/server/ApiManagers/SessionManager.ts | 9 +- src/server/DashSession/DashSessionAgent.ts | 4 +- src/server/session/agents/monitor.ts | 16 ++-- .../session/agents/process_message_router.ts | 2 +- .../session/agents/promisified_ipc_manager.ts | 97 ++++++++++++++++++++++ src/server/session/agents/server_worker.ts | 10 +-- src/server/session/utilities/ipc.ts | 66 --------------- 7 files changed, 115 insertions(+), 89 deletions(-) create mode 100644 src/server/session/agents/promisified_ipc_manager.ts delete mode 100644 src/server/session/utilities/ipc.ts (limited to 'src/server/DashSession') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index a40b86dc5..d989d8d1b 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -30,13 +30,14 @@ export default class SessionManager extends ApiManager { method: Method.GET, subscription: this.secureSubscriber("debug", "mode?", "recipient?"), secureHandler: this.authorizedAction(async ({ req, res }) => { - let { mode } = req.params; + const { mode, recipient } = req.params; if (mode && !["passive", "active"].includes(mode)) { res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); } else { - !mode && (mode = "active"); - const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { + mode: mode || "active", + recipient: recipient || DashSessionAgent.notificationRecipient + }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 3c98c1e9d..fe7cdae88 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,7 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; -import { MessageHandler } from "../session/utilities/ipc"; +import { MessageHandler } from "../session/agents/promisified_ipc_manager"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -110,7 +110,7 @@ export class DashSessionAgent extends AppliedSessionAgent { content: this.generateCrashInstructions(crashCause) }); if (error) { - this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); + this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} ${yellow(`(${error.message})`)}`)); mainLog(red("distribution of crash notification experienced errors")); } else { mainLog(green("successfully distributed crash notification to recipients")); diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index d4abbb51e..5ea950b2b 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -2,13 +2,14 @@ import { ExitHandler } from "./applied_session_agent"; import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; import Repl, { ReplAction } from "../utilities/repl"; import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc"; +import { IPC_Promisify, MessageHandler } from "./promisified_ipc_manager"; import { red, cyan, white, yellow, blue } from "colors"; import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; import ProcessMessageRouter from "./process_message_router"; +import { ServerWorker } from "./server_worker"; /** * Validates and reads the configuration file, accordingly builds a child process factory @@ -25,7 +26,7 @@ export class Monitor extends ProcessMessageRouter { public static Create(sessionKey: string) { if (isWorker) { - IPC(process).emit("kill", { + ServerWorker.IPCManager.emit("kill", { reason: "cannot create a monitor on the worker process.", graceful: false, errorCode: 1 @@ -210,7 +211,7 @@ export class Monitor extends ProcessMessageRouter { repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], async args => { + repl.registerCommand("set", [/polling/, number, boolean], args => { const newPollingIntervalSeconds = Math.floor(Number(args[1])); if (newPollingIntervalSeconds < 0) { this.mainLog(red("the polling interval must be a non-negative integer")); @@ -218,7 +219,7 @@ export class Monitor extends ProcessMessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); + Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -279,16 +280,13 @@ export class Monitor extends ProcessMessageRouter { serverPort: ports.server, socketPort: ports.socket, pollingIntervalSeconds: intervalSeconds, - session_key: this.key, - ipc_suffix: suffix + session_key: this.key }); - Monitor.IPCManager = IPC(this.activeWorker); + Monitor.IPCManager = IPC_Promisify(this.activeWorker, this.route); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); - - Monitor.IPCManager.setRouter(this.route); } } diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts index f60343514..d359e97c3 100644 --- a/src/server/session/agents/process_message_router.ts +++ b/src/server/session/agents/process_message_router.ts @@ -1,4 +1,4 @@ -import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; +import { MessageHandler, PromisifiedIPCManager } from "./promisified_ipc_manager"; export default abstract class ProcessMessageRouter { diff --git a/src/server/session/agents/promisified_ipc_manager.ts b/src/server/session/agents/promisified_ipc_manager.ts new file mode 100644 index 000000000..216e9be44 --- /dev/null +++ b/src/server/session/agents/promisified_ipc_manager.ts @@ -0,0 +1,97 @@ +import { Utils } from "../../../Utils"; +import { isMaster } from "cluster"; + +/** + * Convenience constructor + * @param target the process / worker to which to attach the specialized listeners + */ +export function IPC_Promisify(target: IPCTarget, router: Router) { + return new PromisifiedIPCManager(target, router); +} + +/** + * Essentially, a node process or node cluster worker + */ +export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; + +/** + * Some external code that maps the name of incoming messages to registered handlers, if any + * when this returns, the message is assumed to have been handled in its entirety by the process, so + * await any asynchronous code inside this router. + */ +export type Router = (message: Message) => void | Promise; + +/** + * Specifies a general message format for this API + */ +export type Message = { name: string; args: T; }; +export type MessageHandler = (args: T) => any | Promise; + +/** + * When a message is emitted, it + */ +type InternalMessage = Message & { metadata: any }; +type InternalMessageHandler = (message: InternalMessage) => any | Promise; + +/** + * This is a wrapper utility class that allows the caller process + * to emit an event and return a promise that resolves when it and all + * other processes listening to its emission of this event have completed. + */ +export class PromisifiedIPCManager { + private readonly target: IPCTarget; + + constructor(target: IPCTarget, router: Router) { + this.target = target; + this.target.addListener("message", this.internalHandler(router)); + } + + /** + * A convenience wrapper around the standard process emission. + * Does not wait for a response. + */ + public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); + + /** + * This routine uniquely identifies each message, then adds a general + * message listener that waits for a response with the same id before resolving + * the promise. + */ + public emitPromise = async (name: string, args?: any) => { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args, name }) => { + if (isResponse && id === messageId) { + this.target.removeListener("message", responseHandler); + resolve(args?.error as Error | undefined); + } + }; + this.target.addListener("message", responseHandler); + const message = { name, args, metadata: { id: messageId } }; + this.target.send?.(message); + }); + } + + /** + * This routine receives a uniquely identified message. If the message is itself a response, + * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever + * router the caller has installed, and then sends a response containing the original message id, + * which will ultimately invoke the responseHandler of the original emission and resolve the + * sender's promise. + */ + private internalHandler = (router: Router) => async ({ name, args, metadata }: InternalMessage) => { + if (name && (!metadata || !metadata.isResponse)) { + let error: Error | undefined; + try { + await router({ name, args }); + } catch (e) { + error = e; + } + if (metadata && this.target.send) { + metadata.isResponse = true; + this.target.send({ name, args: { error }, metadata }); + } + } + } + +} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 23ffb2650..705307030 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,6 +1,6 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import { PromisifiedIPCManager } from "./promisified_ipc_manager"; import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; @@ -61,7 +61,7 @@ export class ServerWorker extends ProcessMessageRouter { private constructor(work: Function) { super(); - ServerWorker.IPCManager = new PromisifiedIPCManager(process); + ServerWorker.IPCManager = new PromisifiedIPCManager(process, this.route); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,12 +80,8 @@ export class ServerWorker extends ProcessMessageRouter { * server process. */ private configureProcess = () => { - ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { - this.pollingIntervalSeconds = newPollingIntervalSeconds; - return new Promise(resolve => setTimeout(resolve, 1000 * 10)); - }); + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds); this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts deleted file mode 100644 index c90b15907..000000000 --- a/src/server/session/utilities/ipc.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { isMaster } from "cluster"; -import { Utils } from "../../../Utils"; - -export function IPC(target: IPCTarget) { - return new PromisifiedIPCManager(target); -} - -export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; -export type Router = (message: Message) => void | Promise; - -export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix; - -type InternalMessage = Message & { metadata: any }; - -export interface Message { - name: string; - args: T; -} - -export type MessageHandler = (message: T) => any | Promise; - -export class PromisifiedIPCManager { - private readonly target: IPCTarget; - private readonly ipc_id = `ipc_id_${suffix}`; - private readonly is_response = `is_response_${suffix}`; - - constructor(target: IPCTarget) { - this.target = target; - } - - public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); - - public emitPromise = async (name: string, args?: any) => { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const metadata: any = {}; - metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler = ({ metadata, args }) => { - if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - this.target.send?.({ name, args, metadata }); - }); - } - - public setRouter = (router: Router) => { - this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => { - if (name && (!metadata || !metadata[this.is_response])) { - let error: Error | undefined; - try { - await router({ name, args }); - } catch (e) { - error = e; - } - if (metadata && this.target.send) { - metadata[this.is_response] = true; - this.target.send({ name, args: { error }, metadata }); - } - } - }); - } - -} \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 791499af4f474fe8ec7863ab9fe7b5b1120ac5ce Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 15:19:06 -0500 Subject: small changes --- src/server/ApiManagers/SessionManager.ts | 12 +++++------- src/server/DashSession/DashSessionAgent.ts | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) (limited to 'src/server/DashSession') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index d989d8d1b..1816d492d 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -29,15 +29,13 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, subscription: this.secureSubscriber("debug", "mode?", "recipient?"), - secureHandler: this.authorizedAction(async ({ req, res }) => { - const { mode, recipient } = req.params; - if (mode && !["passive", "active"].includes(mode)) { + secureHandler: this.authorizedAction(async ({ req: { params }, res }) => { + const mode = params.mode || "active"; + const recipient = params.recipient || DashSessionAgent.notificationRecipient; + if (!["passive", "active"].includes(mode)) { res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); } else { - const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { - mode: mode || "active", - recipient: recipient || DashSessionAgent.notificationRecipient - }); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index fe7cdae88..a688f2909 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -88,7 +88,7 @@ export class DashSessionAgent extends AppliedSessionAgent { const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `Here's the key for this session (started @ ${new Date().toUTCString()}):\n\n${sessionKey}.\n\n${this.signature}` + content: `Here's the key for this session (started @ ${new Date().toUTCString()}):\n\n${sessionKey}\n\n${this.signature}` }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); -- cgit v1.2.3-70-g09d2 From dc51ee0dc771b3ac6ff6adc7c039df94935ef943 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sat, 11 Jan 2020 17:58:16 -0500 Subject: switched out to npm --- package.json | 1 + src/server/DashSession/DashSessionAgent.ts | 5 +- src/server/index.ts | 2 +- src/server/session/README.txt | 11 - src/server/session/agents/applied_session_agent.ts | 58 ---- src/server/session/agents/monitor.ts | 302 --------------------- .../session/agents/process_message_router.ts | 46 ---- .../session/agents/promisified_ipc_manager.ts | 97 ------- src/server/session/agents/server_worker.ts | 160 ----------- src/server/session/utilities/repl.ts | 128 --------- src/server/session/utilities/session_config.ts | 129 --------- src/server/session/utilities/utilities.ts | 31 --- 12 files changed, 3 insertions(+), 967 deletions(-) delete mode 100644 src/server/session/README.txt delete mode 100644 src/server/session/agents/applied_session_agent.ts delete mode 100644 src/server/session/agents/monitor.ts delete mode 100644 src/server/session/agents/process_message_router.ts delete mode 100644 src/server/session/agents/promisified_ipc_manager.ts delete mode 100644 src/server/session/agents/server_worker.ts delete mode 100644 src/server/session/utilities/repl.ts delete mode 100644 src/server/session/utilities/session_config.ts delete mode 100644 src/server/session/utilities/utilities.ts (limited to 'src/server/DashSession') diff --git a/package.json b/package.json index a936182cc..b8f7ccf65 100644 --- a/package.json +++ b/package.json @@ -218,6 +218,7 @@ "readline": "^1.3.0", "request": "^2.88.0", "request-promise": "^4.2.4", + "resilient-server-session": "^1.0.2", "rimraf": "^3.0.0", "serializr": "^1.5.1", "sharp": "^0.22.1", diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index a688f2909..f3943eba6 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -8,10 +8,7 @@ import { launchServer, onWindows } from ".."; import { existsSync, mkdirSync, readdirSync, statSync, createWriteStream, readFileSync } from "fs"; import * as Archiver from "archiver"; import { resolve } from "path"; -import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; -import { Monitor } from "../session/agents/monitor"; -import { ServerWorker } from "../session/agents/server_worker"; -import { MessageHandler } from "../session/agents/promisified_ipc_manager"; +import { AppliedSessionAgent, MessageHandler, ExitHandler, Monitor, ServerWorker } from "resilient-server-session"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. diff --git a/src/server/index.ts b/src/server/index.ts index 0cce0dc54..058935bac 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -24,7 +24,7 @@ import { Logger } from "./ProcessFactory"; import { yellow } from "colors"; import { DashSessionAgent } from "./DashSession/DashSessionAgent"; import SessionManager from "./ApiManagers/SessionManager"; -import { AppliedSessionAgent } from "./session/agents/applied_session_agent"; +import { AppliedSessionAgent } from "resilient-server-session"; export const onWindows = process.platform === "win32"; export let sessionAgent: AppliedSessionAgent; diff --git a/src/server/session/README.txt b/src/server/session/README.txt deleted file mode 100644 index ac7d3d4e7..000000000 --- a/src/server/session/README.txt +++ /dev/null @@ -1,11 +0,0 @@ -/** - * These abstractions rely on NodeJS's cluster module, which allows a parent (master) process to share - * code with its children (workers). A simple `isMaster` flag indicates who is trying to access - * the code, and thus determines the functionality that actually gets invoked (checked by the caller, not internally). - * - * Think of the master thread as a factory, and the workers as the helpers that actually run the server. - * - * So, when we run `npm start`, given the appropriate check, initializeMaster() is called in the parent process - * This will spawn off its own child process (by default, mirrors the execution path of its parent), - * in which initializeWorker() is invoked. - */ \ No newline at end of file diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts deleted file mode 100644 index 48226dab6..000000000 --- a/src/server/session/agents/applied_session_agent.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { isMaster } from "cluster"; -import { Monitor } from "./monitor"; -import { ServerWorker } from "./server_worker"; -import { Utils } from "../../../Utils"; - -export type ExitHandler = (reason: Error | boolean) => void | Promise; - -export abstract class AppliedSessionAgent { - - // the following two methods allow the developer to create a custom - // session and use the built in customization options for each thread - protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise; - protected abstract async initializeServerWorker(): Promise; - - private launched = false; - - public killSession = (reason: string, graceful = true, errorCode = 0) => { - const target = isMaster ? this.sessionMonitor : this.serverWorker; - target.killSession(reason, graceful, errorCode); - } - - private sessionMonitorRef: Monitor | undefined; - public get sessionMonitor(): Monitor { - if (!isMaster) { - this.serverWorker.emitToMonitor("kill", { - graceful: false, - reason: "Cannot access the session monitor directly from the server worker thread.", - errorCode: 1 - }); - throw new Error(); - } - return this.sessionMonitorRef!; - } - - private serverWorkerRef: ServerWorker | undefined; - public get serverWorker(): ServerWorker { - if (isMaster) { - throw new Error("Cannot access the server worker directly from the session monitor thread"); - } - return this.serverWorkerRef!; - } - - public async launch(): Promise { - if (!this.launched) { - this.launched = true; - if (isMaster) { - const sessionKey = Utils.GenerateGuid(); - await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey); - this.sessionMonitorRef.finalize(); - } else { - this.serverWorkerRef = await this.initializeServerWorker(); - } - } else { - throw new Error("Cannot launch a session thread more than once per process."); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts deleted file mode 100644 index 5ea950b2b..000000000 --- a/src/server/session/agents/monitor.ts +++ /dev/null @@ -1,302 +0,0 @@ -import { ExitHandler } from "./applied_session_agent"; -import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; -import Repl, { ReplAction } from "../utilities/repl"; -import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { IPC_Promisify, MessageHandler } from "./promisified_ipc_manager"; -import { red, cyan, white, yellow, blue } from "colors"; -import { exec, ExecOptions } from "child_process"; -import { validate, ValidationError } from "jsonschema"; -import { Utilities } from "../utilities/utilities"; -import { readFileSync } from "fs"; -import ProcessMessageRouter from "./process_message_router"; -import { ServerWorker } from "./server_worker"; - -/** - * Validates and reads the configuration file, accordingly builds a child process factory - * and spawns off an initial process that will respawn as predecessors die. - */ -export class Monitor extends ProcessMessageRouter { - private static count = 0; - private finalized = false; - private exitHandlers: ExitHandler[] = []; - private readonly config: Configuration; - private activeWorker: Worker | undefined; - private key: string | undefined; - private repl: Repl; - - public static Create(sessionKey: string) { - if (isWorker) { - ServerWorker.IPCManager.emit("kill", { - reason: "cannot create a monitor on the worker process.", - graceful: false, - errorCode: 1 - }); - process.exit(1); - } else if (++Monitor.count > 1) { - console.error(red("cannot create more than one monitor.")); - process.exit(1); - } else { - return new Monitor(sessionKey); - } - } - - private constructor(sessionKey: string) { - super(); - this.config = this.loadAndValidateConfiguration(); - this.initialize(sessionKey); - this.repl = this.initializeRepl(); - } - - private initialize = (sessionKey: string) => { - console.log(this.timestamp(), cyan("initializing session...")); - this.key = sessionKey; - - // determines whether or not we see the compilation / initialization / runtime output of each child server process - const output = this.config.showServerOutput ? "inherit" : "ignore"; - setupMaster({ stdio: ["ignore", output, output, "ipc"] }); - - // handle exceptions in the master thread - there shouldn't be many of these - // the IPC (inter process communication) channel closed exception can't seem - // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }): void => { - if (message !== "Channel closed") { - this.mainLog(red(message)); - if (stack) { - this.mainLog(`uncaught exception\n${red(stack)}`); - } - } - }); - - // a helpful cluster event called on the master thread each time a child process exits - on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - this.mainLog(cyan(prompt)); - // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one - this.spawn(); - }); - } - - public finalize = (): void => { - if (this.finalized) { - throw new Error("Session monitor is already finalized"); - } - this.finalized = true; - this.spawn(); - } - - public readonly coreHooks = Object.freeze({ - onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) - }); - - /** - * Kill this session and its active child - * server process, either gracefully (may wait - * indefinitely, but at least allows active networking - * requests to complete) or immediately. - */ - public killSession = async (reason: string, graceful = true, errorCode = 0) => { - this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); - this.mainLog(`session exit reason: ${(red(reason))}`); - await this.executeExitHandlers(true); - this.killActiveWorker(graceful, true); - process.exit(errorCode); - } - - /** - * Execute the list of functions registered to be called - * whenever the process exits. - */ - public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler); - - /** - * Extend the default repl by adding in custom commands - * that can invoke application logic external to this module - */ - public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { - this.repl.registerCommand(basename, argPatterns, action); - } - - public exec = (command: string, options?: ExecOptions) => { - return new Promise(resolve => { - exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => { - if (error) { - this.execLog(red(`unable to execute ${white(command)}`)); - error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`))); - } else { - let outLines: string[], errorLines: string[]; - if ((outLines = stdout.split("\n").filter(line => line.length)).length) { - outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`))); - } - if ((errorLines = stderr.split("\n").filter(line => line.length)).length) { - errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`))); - } - } - resolve(); - }); - }); - } - - /** - * Generates a blue UTC string associated with the time - * of invocation. - */ - private timestamp = () => blue(`[${new Date().toUTCString()}]`); - - /** - * A formatted, identified and timestamped log in color - */ - public mainLog = (...optionalParams: any[]) => { - console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams); - } - - /** - * A formatted, identified and timestamped log in color for non- - */ - private execLog = (...optionalParams: any[]) => { - console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams); - } - - /** - * Reads in configuration .json file only once, in the master thread - * and pass down any variables the pertinent to the child processes as environment variables. - */ - private loadAndValidateConfiguration = (): Configuration => { - let config: Configuration; - try { - console.log(this.timestamp(), cyan("validating configuration...")); - config = JSON.parse(readFileSync('./session.config.json', 'utf8')); - const options = { - throwError: true, - allowUnknownAttributes: false - }; - // ensure all necessary and no excess information is specified by the configuration file - validate(config, configurationSchema, options); - config = Utilities.preciseAssign({}, defaultConfig, config); - } catch (error) { - if (error instanceof ValidationError) { - console.log(red("\nSession configuration failed.")); - console.log("The given session.config.json configuration file is invalid."); - console.log(`${error.instance}: ${error.stack}`); - process.exit(0); - } else if (error.code === "ENOENT" && error.path === "./session.config.json") { - console.log(cyan("Loading default session parameters...")); - console.log("Consider including a session.config.json configuration file in your project root for customization."); - config = Utilities.preciseAssign({}, defaultConfig); - } else { - console.log(red("\nSession configuration failed.")); - console.log("The following unknown error occurred during configuration."); - console.log(error.stack); - process.exit(0); - } - } finally { - const { identifiers } = config!; - Object.keys(identifiers).forEach(key => { - const resolved = key as keyof Identifiers; - const { text, color } = identifiers[resolved]; - identifiers[resolved].text = (colorMapping.get(color) || white)(`${text}:`); - }); - return config!; - } - } - - /** - * Builds the repl that allows the following commands to be typed into stdin of the master thread. - */ - private initializeRepl = (): Repl => { - const repl = new Repl({ identifier: () => `${this.timestamp()} ${this.config.identifiers.master.text}` }); - const boolean = /true|false/; - const number = /\d+/; - const letters = /[a-zA-Z]+/; - repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); - repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); - repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], args => { - const newPollingIntervalSeconds = Math.floor(Number(args[1])); - if (newPollingIntervalSeconds < 0) { - this.mainLog(red("the polling interval must be a non-negative integer")); - } else { - if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { - this.config.polling.intervalSeconds = newPollingIntervalSeconds; - if (args[2] === "true") { - Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }); - } - } - } - }); - return repl; - } - - private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason))); - - /** - * Attempts to kill the active worker gracefully, unless otherwise specified. - */ - private killActiveWorker = (graceful = true, isSessionEnd = false): void => { - if (this.activeWorker && !this.activeWorker.isDead()) { - if (graceful) { - Monitor.IPCManager.emit("manualExit", { isSessionEnd }); - } else { - this.activeWorker.process.kill(); - } - } - } - - /** - * Allows the caller to set the port at which the target (be it the server, - * the websocket, some other custom port) is listening. If an immediate restart - * is specified, this monitor will kill the active child and re-launch the server - * at the port. Otherwise, the updated port won't be used until / unless the child - * dies on its own and triggers a restart. - */ - private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => { - if (value > 1023 && value < 65536) { - this.config.ports[port] = value; - if (immediateRestart) { - this.killActiveWorker(); - } - } else { - this.mainLog(red(`${port} is an invalid port number`)); - } - } - - /** - * Kills the current active worker and proceeds to spawn a new worker, - * feeding in configuration information as environment variables. - */ - private spawn = (): void => { - const { - polling: { - route, - failureTolerance, - intervalSeconds - }, - ports - } = this.config; - this.killActiveWorker(); - this.activeWorker = fork({ - pollingRoute: route, - pollingFailureTolerance: failureTolerance, - serverPort: ports.server, - socketPort: ports.socket, - pollingIntervalSeconds: intervalSeconds, - session_key: this.key - }); - Monitor.IPCManager = IPC_Promisify(this.activeWorker, this.route); - this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - - this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); - this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); - } - -} - -export namespace Monitor { - - export enum IntrinsicEvents { - KeyGenerated = "key_generated", - CrashDetected = "crash_detected", - ServerRunning = "server_running" - } - -} \ No newline at end of file diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts deleted file mode 100644 index d359e97c3..000000000 --- a/src/server/session/agents/process_message_router.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { MessageHandler, PromisifiedIPCManager } from "./promisified_ipc_manager"; - -export default abstract class ProcessMessageRouter { - - protected static IPCManager: PromisifiedIPCManager; - private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; - - /** - * Add a listener at this message. When the monitor process - * receives a message, it will invoke all registered functions. - */ - public on = (name: string, handler: MessageHandler, exclusive = false) => { - const handlers = this.onMessage[name]; - if (exclusive || !handlers) { - this.onMessage[name] = [handler]; - } else { - handlers.push(handler); - } - } - - /** - * Unregister a given listener at this message. - */ - public off = (name: string, handler: MessageHandler) => { - const handlers = this.onMessage[name]; - if (handlers) { - const index = handlers.indexOf(handler); - if (index > -1) { - handlers.splice(index, 1); - } - } - } - - /** - * Unregister all listeners at this message. - */ - public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined); - - protected route: MessageHandler = async ({ name, args }) => { - const handlers = this.onMessage[name]; - if (handlers) { - await Promise.all(handlers.map(handler => handler(args))); - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/promisified_ipc_manager.ts b/src/server/session/agents/promisified_ipc_manager.ts deleted file mode 100644 index 216e9be44..000000000 --- a/src/server/session/agents/promisified_ipc_manager.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { Utils } from "../../../Utils"; -import { isMaster } from "cluster"; - -/** - * Convenience constructor - * @param target the process / worker to which to attach the specialized listeners - */ -export function IPC_Promisify(target: IPCTarget, router: Router) { - return new PromisifiedIPCManager(target, router); -} - -/** - * Essentially, a node process or node cluster worker - */ -export type IPCTarget = NodeJS.EventEmitter & { send?: Function }; - -/** - * Some external code that maps the name of incoming messages to registered handlers, if any - * when this returns, the message is assumed to have been handled in its entirety by the process, so - * await any asynchronous code inside this router. - */ -export type Router = (message: Message) => void | Promise; - -/** - * Specifies a general message format for this API - */ -export type Message = { name: string; args: T; }; -export type MessageHandler = (args: T) => any | Promise; - -/** - * When a message is emitted, it - */ -type InternalMessage = Message & { metadata: any }; -type InternalMessageHandler = (message: InternalMessage) => any | Promise; - -/** - * This is a wrapper utility class that allows the caller process - * to emit an event and return a promise that resolves when it and all - * other processes listening to its emission of this event have completed. - */ -export class PromisifiedIPCManager { - private readonly target: IPCTarget; - - constructor(target: IPCTarget, router: Router) { - this.target = target; - this.target.addListener("message", this.internalHandler(router)); - } - - /** - * A convenience wrapper around the standard process emission. - * Does not wait for a response. - */ - public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); - - /** - * This routine uniquely identifies each message, then adds a general - * message listener that waits for a response with the same id before resolving - * the promise. - */ - public emitPromise = async (name: string, args?: any) => { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args, name }) => { - if (isResponse && id === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - const message = { name, args, metadata: { id: messageId } }; - this.target.send?.(message); - }); - } - - /** - * This routine receives a uniquely identified message. If the message is itself a response, - * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever - * router the caller has installed, and then sends a response containing the original message id, - * which will ultimately invoke the responseHandler of the original emission and resolve the - * sender's promise. - */ - private internalHandler = (router: Router) => async ({ name, args, metadata }: InternalMessage) => { - if (name && (!metadata || !metadata.isResponse)) { - let error: Error | undefined; - try { - await router({ name, args }); - } catch (e) { - error = e; - } - if (metadata && this.target.send) { - metadata.isResponse = true; - this.target.send({ name, args: { error }, metadata }); - } - } - } - -} \ No newline at end of file diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts deleted file mode 100644 index 705307030..000000000 --- a/src/server/session/agents/server_worker.ts +++ /dev/null @@ -1,160 +0,0 @@ -import { ExitHandler } from "./applied_session_agent"; -import { isMaster } from "cluster"; -import { PromisifiedIPCManager } from "./promisified_ipc_manager"; -import ProcessMessageRouter from "./process_message_router"; -import { red, green, white, yellow } from "colors"; -import { get } from "request-promise"; -import { Monitor } from "./monitor"; - -/** - * Effectively, each worker repairs the connection to the server by reintroducing a consistent state - * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification - * email if the server encounters an uncaught exception or if the server cannot be reached. - */ -export class ServerWorker extends ProcessMessageRouter { - private static count = 0; - private shouldServerBeResponsive = false; - private exitHandlers: ExitHandler[] = []; - private pollingFailureCount = 0; - private pollingIntervalSeconds: number; - private pollingFailureTolerance: number; - private pollTarget: string; - private serverPort: number; - private isInitialized = false; - - public static Create(work: Function) { - if (isMaster) { - console.error(red("cannot create a worker on the monitor process.")); - process.exit(1); - } else if (++ServerWorker.count > 1) { - ServerWorker.IPCManager.emit("kill", { - reason: "cannot create more than one worker on a given worker process.", - graceful: false, - errorCode: 1 - }); - process.exit(1); - } else { - return new ServerWorker(work); - } - } - - /** - * Allows developers to invoke application specific logic - * by hooking into the exiting of the server process. - */ - public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler); - - /** - * Kill the session monitor (parent process) from this - * server worker (child process). This will also kill - * this process (child process). - */ - public killSession = (reason: string, graceful = true, errorCode = 0) => this.emitToMonitor("kill", { reason, graceful, errorCode }); - - /** - * A convenience wrapper to tell the session monitor (parent process) - * to carry out the action with the specified message and arguments. - */ - public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args); - - public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args); - - private constructor(work: Function) { - super(); - ServerWorker.IPCManager = new PromisifiedIPCManager(process, this.route); - this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); - - const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; - this.serverPort = Number(serverPort); - this.pollingIntervalSeconds = Number(pollingIntervalSeconds); - this.pollingFailureTolerance = Number(pollingFailureTolerance); - this.pollTarget = `http://localhost:${serverPort}${pollingRoute}`; - - this.configureProcess(); - work(); - this.pollServer(); - } - - /** - * Set up message and uncaught exception handlers for this - * server process. - */ - private configureProcess = () => { - // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds); - this.on("manualExit", async ({ isSessionEnd }) => { - await this.executeExitHandlers(isSessionEnd); - process.exit(0); - }); - - // one reason to exit, as the process might be in an inconsistent state after such an exception - process.on('uncaughtException', this.proactiveUnplannedExit); - process.on('unhandledRejection', reason => { - const appropriateError = reason instanceof Error ? reason : new Error(`unhandled rejection: ${reason}`); - this.proactiveUnplannedExit(appropriateError); - }); - } - - /** - * Execute the list of functions registered to be called - * whenever the process exits. - */ - private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason))); - - /** - * Notify master thread (which will log update in the console) of initialization via IPC. - */ - public lifecycleNotification = (event: string) => ServerWorker.IPCManager.emit("lifecycle", { event }); - - /** - * Called whenever the process has a reason to terminate, either through an uncaught exception - * in the process (potentially inconsistent state) or the server cannot be reached. - */ - private proactiveUnplannedExit = async (error: Error): Promise => { - this.shouldServerBeResponsive = false; - // communicates via IPC to the master thread that it should dispatch a crash notification email - this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error }); - await this.executeExitHandlers(error); - // notify master thread (which will log update in the console) of crash event via IPC - this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); - this.lifecycleNotification(red(error.message)); - process.exit(1); - } - - /** - * This monitors the health of the server by submitting a get request to whatever port / route specified - * by the configuration every n seconds, where n is also given by the configuration. - */ - private pollServer = async (): Promise => { - await new Promise(resolve => { - setTimeout(async () => { - try { - await get(this.pollTarget); - if (!this.shouldServerBeResponsive) { - // notify monitor thread that the server is up and running - this.lifecycleNotification(green(`listening on ${this.serverPort}...`)); - this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized }); - this.isInitialized = true; - } - this.shouldServerBeResponsive = true; - } catch (error) { - // if we expect the server to be unavailable, i.e. during compilation, - // the listening variable is false, activeExit will return early and the child - // process will continue - if (this.shouldServerBeResponsive) { - if (++this.pollingFailureCount > this.pollingFailureTolerance) { - this.proactiveUnplannedExit(error); - } else { - this.lifecycleNotification(yellow(`the server has encountered ${this.pollingFailureCount} of ${this.pollingFailureTolerance} tolerable failures`)); - } - } - } finally { - resolve(); - } - }, 1000 * this.pollingIntervalSeconds); - }); - // controlled, asynchronous infinite recursion achieves a persistent poll that does not submit a new request until the previous has completed - this.pollServer(); - } - -} \ No newline at end of file diff --git a/src/server/session/utilities/repl.ts b/src/server/session/utilities/repl.ts deleted file mode 100644 index 643141286..000000000 --- a/src/server/session/utilities/repl.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { createInterface, Interface } from "readline"; -import { red, green, white } from "colors"; - -export interface Configuration { - identifier: () => string | string; - onInvalid?: (command: string, validCommand: boolean) => string | string; - onValid?: (success?: string) => string | string; - isCaseSensitive?: boolean; -} - -export type ReplAction = (parsedArgs: Array) => any | Promise; -export interface Registration { - argPatterns: RegExp[]; - action: ReplAction; -} - -export default class Repl { - private identifier: () => string | string; - private onInvalid: ((command: string, validCommand: boolean) => string) | string; - private onValid: ((success: string) => string) | string; - private isCaseSensitive: boolean; - private commandMap = new Map(); - public interface: Interface; - private busy = false; - private keys: string | undefined; - - constructor({ identifier: prompt, onInvalid, onValid, isCaseSensitive }: Configuration) { - this.identifier = prompt; - this.onInvalid = onInvalid || this.usage; - this.onValid = onValid || this.success; - this.isCaseSensitive = isCaseSensitive ?? true; - this.interface = createInterface(process.stdin, process.stdout).on('line', this.considerInput); - } - - private resolvedIdentifier = () => typeof this.identifier === "string" ? this.identifier : this.identifier(); - - private usage = (command: string, validCommand: boolean) => { - if (validCommand) { - const formatted = white(command); - const patterns = green(this.commandMap.get(command)!.map(({ argPatterns }) => `${formatted} ${argPatterns.join(" ")}`).join('\n')); - return `${this.resolvedIdentifier()}\nthe given arguments do not match any registered patterns for ${formatted}\nthe list of valid argument patterns is given by:\n${patterns}`; - } else { - const resolved = this.keys; - if (resolved) { - return resolved; - } - const members: string[] = []; - const keys = this.commandMap.keys(); - let next: IteratorResult; - while (!(next = keys.next()).done) { - members.push(next.value); - } - return `${this.resolvedIdentifier()} commands: { ${members.sort().join(", ")} }`; - } - } - - private success = (command: string) => `${this.resolvedIdentifier()} completed local execution of ${white(command)}`; - - public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { - const existing = this.commandMap.get(basename); - const converted = argPatterns.map(input => input instanceof RegExp ? input : new RegExp(input)); - const registration = { argPatterns: converted, action }; - if (existing) { - existing.push(registration); - } else { - this.commandMap.set(basename, [registration]); - } - } - - private invalid = (command: string, validCommand: boolean) => { - console.log(red(typeof this.onInvalid === "string" ? this.onInvalid : this.onInvalid(command, validCommand))); - this.busy = false; - } - - private valid = (command: string) => { - console.log(green(typeof this.onValid === "string" ? this.onValid : this.onValid(command))); - this.busy = false; - } - - private considerInput = async (line: string) => { - if (this.busy) { - console.log(red("Busy")); - return; - } - this.busy = true; - line = line.trim(); - if (this.isCaseSensitive) { - line = line.toLowerCase(); - } - const [command, ...args] = line.split(/\s+/g); - if (!command) { - return this.invalid(command, false); - } - const registered = this.commandMap.get(command); - if (registered) { - const { length } = args; - const candidates = registered.filter(({ argPatterns: { length: count } }) => count === length); - for (const { argPatterns, action } of candidates) { - const parsed: string[] = []; - let matched = true; - if (length) { - for (let i = 0; i < length; i++) { - let matches: RegExpExecArray | null; - if ((matches = argPatterns[i].exec(args[i])) === null) { - matched = false; - break; - } - parsed.push(matches[0]); - } - } - if (!length || matched) { - const result = action(parsed); - const resolve = () => this.valid(`${command} ${parsed.join(" ")}`); - if (result instanceof Promise) { - result.then(resolve); - } else { - resolve(); - } - return; - } - } - this.invalid(command, true); - } else { - this.invalid(command, false); - } - } - -} \ No newline at end of file diff --git a/src/server/session/utilities/session_config.ts b/src/server/session/utilities/session_config.ts deleted file mode 100644 index b0e65dde4..000000000 --- a/src/server/session/utilities/session_config.ts +++ /dev/null @@ -1,129 +0,0 @@ -import { Schema } from "jsonschema"; -import { yellow, red, cyan, green, blue, magenta, Color, grey, gray, white, black } from "colors"; - -const colorPattern = /black|red|green|yellow|blue|magenta|cyan|white|gray|grey/; - -const identifierProperties: Schema = { - type: "object", - properties: { - text: { - type: "string", - minLength: 1 - }, - color: { - type: "string", - pattern: colorPattern - } - } -}; - -const portProperties: Schema = { - type: "number", - minimum: 1024, - maximum: 65535 -}; - -export const configurationSchema: Schema = { - id: "/configuration", - type: "object", - properties: { - showServerOutput: { type: "boolean" }, - ports: { - type: "object", - properties: { - server: portProperties, - socket: portProperties - }, - required: ["server"], - additionalProperties: true - }, - identifiers: { - type: "object", - properties: { - master: identifierProperties, - worker: identifierProperties, - exec: identifierProperties - } - }, - polling: { - type: "object", - additionalProperties: false, - properties: { - intervalSeconds: { - type: "number", - minimum: 1, - maximum: 86400 - }, - route: { - type: "string", - pattern: /\/[a-zA-Z]*/g - }, - failureTolerance: { - type: "number", - minimum: 0, - } - } - }, - } -}; - -type ColorLabel = "yellow" | "red" | "cyan" | "green" | "blue" | "magenta" | "grey" | "gray" | "white" | "black"; - -export const colorMapping: Map = new Map([ - ["yellow", yellow], - ["red", red], - ["cyan", cyan], - ["green", green], - ["blue", blue], - ["magenta", magenta], - ["grey", grey], - ["gray", gray], - ["white", white], - ["black", black] -]); - -interface Identifier { - text: string; - color: ColorLabel; -} - -export interface Identifiers { - master: Identifier; - worker: Identifier; - exec: Identifier; -} - -export interface Configuration { - showServerOutput: boolean; - identifiers: Identifiers; - ports: { [description: string]: number }; - polling: { - route: string; - intervalSeconds: number; - failureTolerance: number; - }; -} - -export const defaultConfig: Configuration = { - showServerOutput: false, - identifiers: { - master: { - text: "__monitor__", - color: "yellow" - }, - worker: { - text: "__server__", - color: "magenta" - }, - exec: { - text: "__exec__", - color: "green" - } - }, - ports: { server: 3000 }, - polling: { - route: "/", - intervalSeconds: 30, - failureTolerance: 0 - } -}; \ No newline at end of file diff --git a/src/server/session/utilities/utilities.ts b/src/server/session/utilities/utilities.ts deleted file mode 100644 index ac8a6590a..000000000 --- a/src/server/session/utilities/utilities.ts +++ /dev/null @@ -1,31 +0,0 @@ -export namespace Utilities { - - /** - * At any arbitrary layer of nesting within the configuration objects, any single value that - * is not specified by the configuration is given the default counterpart. If, within an object, - * one peer is given by configuration and two are not, the one is preserved while the two are given - * the default value. - * @returns the composition of all of the assigned objects, much like Object.assign(), but with more - * granularity in the overwriting of nested objects - */ - export function preciseAssign(target: any, ...sources: any[]): any { - for (const source of sources) { - preciseAssignHelper(target, source); - } - return target; - } - - export function preciseAssignHelper(target: any, source: any) { - Array.from(new Set([...Object.keys(target), ...Object.keys(source)])).map(property => { - let targetValue: any, sourceValue: any; - if (sourceValue = source[property]) { - if (typeof sourceValue === "object" && typeof (targetValue = target[property]) === "object") { - preciseAssignHelper(targetValue, sourceValue); - } else { - target[property] = sourceValue; - } - } - }); - } - -} \ No newline at end of file -- cgit v1.2.3-70-g09d2 From 8ce88dfc0319c17542b33db11e8334f59c3293ff Mon Sep 17 00:00:00 2001 From: Sam Wilkins <35748010+samwilkins333@users.noreply.github.com> Date: Sat, 11 Jan 2020 23:40:49 -0500 Subject: added types --- src/server/DashSession/DashSessionAgent.ts | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index f3943eba6..a4e8d851e 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -24,14 +24,14 @@ export class DashSessionAgent extends AppliedSessionAgent { * The core method invoked when the single master thread is initialized. * Installs event hooks, repl commands and additional IPC listeners. */ - protected async initializeMonitor(monitor: Monitor, sessionKey: string) { + protected async initializeMonitor(monitor: Monitor, sessionKey: string): Promise { await this.dispatchSessionPassword(sessionKey); monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.on("backup", this.backup); - monitor.on("debug", ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.on("debug", async ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.coreHooks.onCrashDetected(this.dispatchCrashReport); } @@ -39,7 +39,7 @@ export class DashSessionAgent extends AppliedSessionAgent { * The core method invoked when a server worker thread is initialized. * Installs logic to be executed when the server worker dies. */ - protected async initializeServerWorker() { + protected async initializeServerWorker(): Promise { const worker = ServerWorker.Create(launchServer); // server initialization delegated to worker worker.addExitHandler(this.notifyClient); return worker; @@ -49,7 +49,7 @@ export class DashSessionAgent extends AppliedSessionAgent { * Prepares the body of the email with instructions on restoring the transmitted remote database backup locally. */ private _remoteDebugInstructions: string | undefined; - private generateDebugInstructions = (zipName: string, target: string) => { + private generateDebugInstructions = (zipName: string, target: string): string => { if (!this._remoteDebugInstructions) { this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./templates/remote_debug_instructions.txt"), { encoding: "utf8" }); } @@ -63,7 +63,7 @@ export class DashSessionAgent extends AppliedSessionAgent { * Prepares the body of the email with information regarding a crash event. */ private _crashInstructions: string | undefined; - private generateCrashInstructions({ name, message, stack }: Error) { + private generateCrashInstructions({ name, message, stack }: Error): string { if (!this._crashInstructions) { this._crashInstructions = readFileSync(resolve(__dirname, "./templates/crash_instructions.txt"), { encoding: "utf8" }); } @@ -78,14 +78,18 @@ export class DashSessionAgent extends AppliedSessionAgent { * This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone * to kill the server via the /kill/:key route. */ - private dispatchSessionPassword = async (sessionKey: string) => { + private dispatchSessionPassword = async (sessionKey: string): Promise => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; mainLog(green("dispatching session key...")); const error = await Email.dispatch({ to: notificationRecipient, subject: "Dash Release Session Admin Authentication Key", - content: `Here's the key for this session (started @ ${new Date().toUTCString()}):\n\n${sessionKey}\n\n${this.signature}` + content: [ + `Here's the key for this session (started @ ${new Date().toUTCString()}):`, + sessionKey, + this.signature + ].join("\n\n") }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); @@ -118,7 +122,7 @@ export class DashSessionAgent extends AppliedSessionAgent { * Logic for interfacing with Solr. Either starts it, * stops it, or rebuilds its indicies. */ - private executeSolrCommand = async (args: string[]) => { + private executeSolrCommand = async (args: string[]): Promise => { const { exec, mainLog } = this.sessionMonitor; const action = args[0]; if (action === "index") { @@ -151,7 +155,7 @@ export class DashSessionAgent extends AppliedSessionAgent { * Performs a backup of the database, saved to the desktop subdirectory. * This should work as is only on our specific release server. */ - private backup = async () => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop }); + private backup = async (): Promise => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop }); /** * Compress either a brand new backup or the most recent backup and send it @@ -159,7 +163,7 @@ export class DashSessionAgent extends AppliedSessionAgent { * @param mode specifies whether or not to make a new backup before exporting * @param to the recipient of the email */ - private async dispatchZippedDebugBackup(mode: string, to: string) { + private async dispatchZippedDebugBackup(mode: string, to: string): Promise { const { mainLog } = this.sessionMonitor; try { // if desired, complete an immediate backup to send -- cgit v1.2.3-70-g09d2 From 83dbddfdc3f954eb398a69e5d60025b8b337d580 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sun, 12 Jan 2020 02:19:28 -0500 Subject: remove zipped backup after dispatch --- src/server/DashSession/DashSessionAgent.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index f3943eba6..86cddcea1 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -5,10 +5,11 @@ import { Utils } from "../../Utils"; import { WebSocket } from "../Websocket/Websocket"; import { MessageStore } from "../Message"; import { launchServer, onWindows } from ".."; -import { existsSync, mkdirSync, readdirSync, statSync, createWriteStream, readFileSync } from "fs"; +import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } from "fs"; import * as Archiver from "archiver"; import { resolve } from "path"; import { AppliedSessionAgent, MessageHandler, ExitHandler, Monitor, ServerWorker } from "resilient-server-session"; +import rimraf = require("rimraf"); /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -168,12 +169,7 @@ export class DashSessionAgent extends AppliedSessionAgent { mainLog("backup complete"); } - // ensure the directory for compressed backups exists const backupsDirectory = `${this.releaseDesktop}/backups`; - const compressedDirectory = `${this.releaseDesktop}/compressed`; - if (!existsSync(compressedDirectory)) { - mkdirSync(compressedDirectory); - } // sort all backups by their modified time, and choose the most recent one const target = readdirSync(backupsDirectory).map(filename => ({ @@ -184,7 +180,7 @@ export class DashSessionAgent extends AppliedSessionAgent { // create a zip file and to it, write the contents of the backup directory const zipName = `${target}.zip`; - const zipPath = `${compressedDirectory}/${zipName}`; + const zipPath = `${this.releaseDesktop}/${zipName}`; const output = createWriteStream(zipPath); const zip = Archiver('zip'); zip.pipe(output); @@ -200,6 +196,8 @@ export class DashSessionAgent extends AppliedSessionAgent { attachments: [{ filename: zipName, path: zipPath }] }); + unlinkSync(zipPath); + // indicate success or failure mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`); error && mainLog(red(error.message)); -- cgit v1.2.3-70-g09d2 From 03a5af39e1b9d710b2882669a027cfc5ca98ed9f Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sun, 12 Jan 2020 02:20:06 -0500 Subject: small --- src/server/DashSession/DashSessionAgent.ts | 1 - 1 file changed, 1 deletion(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index eab624d32..8aa84f1dd 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -9,7 +9,6 @@ import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } fr import * as Archiver from "archiver"; import { resolve } from "path"; import { AppliedSessionAgent, MessageHandler, ExitHandler, Monitor, ServerWorker } from "resilient-server-session"; -import rimraf = require("rimraf"); /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. -- cgit v1.2.3-70-g09d2 From e6995511a36d5ced79e0738f06487bec592cb992 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sun, 12 Jan 2020 02:33:40 -0500 Subject: remove backup for debug --- src/server/DashSession/DashSessionAgent.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/server/DashSession') diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 8aa84f1dd..bf07cf89e 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -9,6 +9,7 @@ import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } fr import * as Archiver from "archiver"; import { resolve } from "path"; import { AppliedSessionAgent, MessageHandler, ExitHandler, Monitor, ServerWorker } from "resilient-server-session"; +import rimraf = require("rimraf"); /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -184,10 +185,11 @@ export class DashSessionAgent extends AppliedSessionAgent { // create a zip file and to it, write the contents of the backup directory const zipName = `${target}.zip`; const zipPath = `${this.releaseDesktop}/${zipName}`; + const targetPath = `${backupsDirectory}/${target}`; const output = createWriteStream(zipPath); const zip = Archiver('zip'); zip.pipe(output); - zip.directory(`${backupsDirectory}/${target}/Dash`, false); + zip.directory(`${targetPath}/Dash`, false); await zip.finalize(); mainLog(`zip finalized with size ${statSync(zipPath).size} bytes, saved to ${zipPath}`); @@ -200,6 +202,7 @@ export class DashSessionAgent extends AppliedSessionAgent { }); unlinkSync(zipPath); + rimraf.sync(targetPath); // indicate success or failure mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`); -- cgit v1.2.3-70-g09d2 From 9d17eb8923c0a0366b15d7b634f7229371d74286 Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Sun, 12 Jan 2020 02:54:24 -0500 Subject: backup fixes --- src/server/ApiManagers/SessionManager.ts | 13 ++++--------- src/server/DashSession/DashSessionAgent.ts | 15 ++++++++------- 2 files changed, 12 insertions(+), 16 deletions(-) (limited to 'src/server/DashSession') diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 463dfc739..9bf0e4b50 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -28,16 +28,11 @@ export default class SessionManager extends ApiManager { register({ method: Method.GET, - subscription: this.secureSubscriber("debug", "mode?", "recipient?"), + subscription: this.secureSubscriber("debug", "to?"), secureHandler: this.authorizedAction(async ({ req: { params }, res }) => { - const mode = params.mode || "active"; - const recipient = params.recipient || DashSessionAgent.notificationRecipient; - if (!["passive", "active"].includes(mode)) { - res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`); - } else { - const { error } = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); - res.send(error ? error.message : `Your request was successful: the server ${mode === "active" ? "created and compressed a new" : "retrieved and compressed the most recent"} back up. It was sent to ${recipient}.`); - } + const to = params.to || DashSessionAgent.notificationRecipient; + const { error } = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { to }); + res.send(error ? error.message : `Your request was successful: the server captured and compressed (but did not save) a new back up. It was sent to ${to}.`); }) }); diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index bf07cf89e..c55e01243 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -30,9 +30,9 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); monitor.addReplCommand("backup", [], this.backup); - monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.addReplCommand("debug", [/\S+\@\S+/], async ([to]) => this.dispatchZippedDebugBackup(to)); monitor.on("backup", this.backup); - monitor.on("debug", async ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.on("debug", async ({ to }) => this.dispatchZippedDebugBackup(to)); monitor.coreHooks.onCrashDetected(this.dispatchCrashReport); } @@ -164,14 +164,12 @@ export class DashSessionAgent extends AppliedSessionAgent { * @param mode specifies whether or not to make a new backup before exporting * @param to the recipient of the email */ - private async dispatchZippedDebugBackup(mode: string, to: string): Promise { + private async dispatchZippedDebugBackup(to: string): Promise { const { mainLog } = this.sessionMonitor; try { // if desired, complete an immediate backup to send - if (mode === "active") { - await this.backup(); - mainLog("backup complete"); - } + await this.backup(); + mainLog("backup complete"); const backupsDirectory = `${this.releaseDesktop}/backups`; @@ -201,6 +199,9 @@ export class DashSessionAgent extends AppliedSessionAgent { attachments: [{ filename: zipName, path: zipPath }] }); + // since this is intended to be a zero-footprint operation, clean up + // by unlinking both the backup generated earlier in the function and the compressed zip file. + // to generate a persistent backup, just run backup. unlinkSync(zipPath); rimraf.sync(targetPath); -- cgit v1.2.3-70-g09d2