diff options
Diffstat (limited to 'src/server/DashSession')
5 files changed, 205 insertions, 196 deletions
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 1a5934d8f..1ef7a131d 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -1,18 +1,18 @@ -import { Email, pathFromRoot } from "../ActionUtilities"; -import { red, yellow, green, cyan } from "colors"; -import { get } from "request-promise"; -import { Utils } from "../../Utils"; -import { WebSocket } from "../websocket"; -import { MessageStore } from "../Message"; -import { launchServer, onWindows } from ".."; -import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } from "fs"; -import * as Archiver from "archiver"; -import { resolve } from "path"; -import rimraf = require("rimraf"); -import { AppliedSessionAgent, ExitHandler } from "./Session/agents/applied_session_agent"; -import { ServerWorker } from "./Session/agents/server_worker"; -import { Monitor } from "./Session/agents/monitor"; -import { MessageHandler, ErrorLike } from "./Session/agents/promisified_ipc_manager"; +import { Email, pathFromRoot } from '../ActionUtilities'; +import { red, yellow, green, cyan } from 'colors'; +import { get } from 'request-promise'; +import { Utils } from '../../Utils'; +import { WebSocket } from '../websocket'; +import { MessageStore } from '../Message'; +import { launchServer, onWindows } from '..'; +import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } from 'fs'; +import * as Archiver from 'archiver'; +import { resolve } from 'path'; +import { rimraf } from 'rimraf'; +import { AppliedSessionAgent, ExitHandler } from './Session/agents/applied_session_agent'; +import { ServerWorker } from './Session/agents/server_worker'; +import { Monitor } from './Session/agents/monitor'; +import { MessageHandler, ErrorLike } from './Session/agents/promisified_ipc_manager'; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -20,9 +20,8 @@ import { MessageHandler, ErrorLike } from "./Session/agents/promisified_ipc_mana * our job should be to run the server. */ export class DashSessionAgent extends AppliedSessionAgent { - - private readonly signature = "-Dash Server Session Manager"; - private readonly releaseDesktop = pathFromRoot("../../Desktop"); + private readonly signature = '-Dash Server Session Manager'; + private readonly releaseDesktop = pathFromRoot('../../Desktop'); /** * The core method invoked when the single master thread is initialized. @@ -31,13 +30,13 @@ export class DashSessionAgent extends AppliedSessionAgent { protected async initializeMonitor(monitor: Monitor): Promise<string> { const sessionKey = Utils.GenerateGuid(); await this.dispatchSessionPassword(sessionKey); - monitor.addReplCommand("pull", [], () => monitor.exec("git pull")); - monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand); - monitor.addReplCommand("backup", [], this.backup); - monitor.addReplCommand("debug", [/\S+\@\S+/], async ([to]) => this.dispatchZippedDebugBackup(to)); - monitor.on("backup", this.backup); - monitor.on("debug", async ({ to }) => this.dispatchZippedDebugBackup(to)); - monitor.on("delete", WebSocket.doDelete); + monitor.addReplCommand('pull', [], () => monitor.exec('git pull')); + monitor.addReplCommand('solr', [/start|stop|index/], this.executeSolrCommand); + monitor.addReplCommand('backup', [], this.backup); + monitor.addReplCommand('debug', [/\S+\@\S+/], async ([to]) => this.dispatchZippedDebugBackup(to)); + monitor.on('backup', this.backup); + monitor.on('debug', async ({ to }) => this.dispatchZippedDebugBackup(to)); + monitor.on('delete', WebSocket.doDelete); monitor.coreHooks.onCrashDetected(this.dispatchCrashReport); return sessionKey; } @@ -58,13 +57,13 @@ export class DashSessionAgent extends AppliedSessionAgent { private _remoteDebugInstructions: string | undefined; private generateDebugInstructions = (zipName: string, target: string): string => { if (!this._remoteDebugInstructions) { - this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./templates/remote_debug_instructions.txt"), { encoding: "utf8" }); + this._remoteDebugInstructions = readFileSync(resolve(__dirname, './templates/remote_debug_instructions.txt'), { encoding: 'utf8' }); } return this._remoteDebugInstructions .replace(/__zipname__/, zipName) .replace(/__target__/, target) .replace(/__signature__/, this.signature); - } + }; /** * Prepares the body of the email with information regarding a crash event. @@ -72,12 +71,12 @@ export class DashSessionAgent extends AppliedSessionAgent { private _crashInstructions: string | undefined; private generateCrashInstructions({ name, message, stack }: ErrorLike): string { if (!this._crashInstructions) { - this._crashInstructions = readFileSync(resolve(__dirname, "./templates/crash_instructions.txt"), { encoding: "utf8" }); + this._crashInstructions = readFileSync(resolve(__dirname, './templates/crash_instructions.txt'), { encoding: 'utf8' }); } return this._crashInstructions - .replace(/__name__/, name || "[no error name found]") - .replace(/__message__/, message || "[no error message found]") - .replace(/__stack__/, stack || "[no error stack found]") + .replace(/__name__/, name || '[no error name found]') + .replace(/__message__/, message || '[no error message found]') + .replace(/__stack__/, stack || '[no error stack found]') .replace(/__signature__/, this.signature); } @@ -88,23 +87,19 @@ export class DashSessionAgent extends AppliedSessionAgent { private dispatchSessionPassword = async (sessionKey: string): Promise<void> => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; - mainLog(green("dispatching session key...")); + mainLog(green('dispatching session key...')); const error = await Email.dispatch({ to: notificationRecipient, - subject: "Dash Release Session Admin Authentication Key", - content: [ - `Here's the key for this session (started @ ${new Date().toUTCString()}):`, - sessionKey, - this.signature - ].join("\n\n") + subject: 'Dash Release Session Admin Authentication Key', + content: [`Here's the key for this session (started @ ${new Date().toUTCString()}):`, sessionKey, this.signature].join('\n\n'), }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`)); - mainLog(red("distribution of session key experienced errors")); + mainLog(red('distribution of session key experienced errors')); } else { - mainLog(green("successfully distributed session key to recipients")); + mainLog(green('successfully distributed session key to recipients')); } - } + }; /** * This sends an email with the generated crash report. @@ -114,37 +109,37 @@ export class DashSessionAgent extends AppliedSessionAgent { const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ to: notificationRecipient, - subject: "Dash Web Server Crash", - content: this.generateCrashInstructions(crashCause) + subject: 'Dash Web Server Crash', + content: this.generateCrashInstructions(crashCause), }); if (error) { this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} ${yellow(`(${error.message})`)}`)); - mainLog(red("distribution of crash notification experienced errors")); + mainLog(red('distribution of crash notification experienced errors')); } else { - mainLog(green("successfully distributed crash notification to recipients")); + mainLog(green('successfully distributed crash notification to recipients')); } - } + }; /** - * Logic for interfacing with Solr. Either starts it, + * Logic for interfacing with Solr. Either starts it, * stops it, or rebuilds its indices. */ private executeSolrCommand = async (args: string[]): Promise<void> => { const { exec, mainLog } = this.sessionMonitor; const action = args[0]; - if (action === "index") { - exec("npx ts-node ./updateSearch.ts", { cwd: pathFromRoot("./src/server") }); + if (action === 'index') { + exec('npx ts-node ./updateSearch.ts', { cwd: pathFromRoot('./src/server') }); } else { - const command = `${onWindows ? "solr.cmd" : "solr"} ${args[0] === "start" ? "start" : "stop -p 8983"}`; - await exec(command, { cwd: "./solr-8.3.1/bin" }); + const command = `${onWindows ? 'solr.cmd' : 'solr'} ${args[0] === 'start' ? 'start' : 'stop -p 8983'}`; + await exec(command, { cwd: './solr-8.3.1/bin' }); try { - await get("http://localhost:8983"); - mainLog(green("successfully connected to 8983 after running solr initialization")); + await get('http://localhost:8983'); + mainLog(green('successfully connected to 8983 after running solr initialization')); } catch { - mainLog(red("unable to connect at 8983 after running solr initialization")); + mainLog(red('unable to connect at 8983 after running solr initialization')); } } - } + }; /** * Broadcast to all clients that their connection @@ -153,16 +148,16 @@ export class DashSessionAgent extends AppliedSessionAgent { private notifyClient: ExitHandler = reason => { const { _socket } = WebSocket; if (_socket) { - const message = typeof reason === "boolean" ? (reason ? "exit" : "temporary") : "crash"; + const message = typeof reason === 'boolean' ? (reason ? 'exit' : 'temporary') : 'crash'; Utils.Emit(_socket, MessageStore.ConnectionTerminated, message); } - } + }; /** * Performs a backup of the database, saved to the desktop subdirectory. * This should work as is only on our specific release server. */ - private backup = async (): Promise<void> => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop }); + private backup = async (): Promise<void> => this.sessionMonitor.exec('backup.bat', { cwd: this.releaseDesktop }); /** * Compress either a brand new backup or the most recent backup and send it @@ -175,15 +170,17 @@ export class DashSessionAgent extends AppliedSessionAgent { try { // if desired, complete an immediate backup to send await this.backup(); - mainLog("backup complete"); + mainLog('backup complete'); const backupsDirectory = `${this.releaseDesktop}/backups`; // sort all backups by their modified time, and choose the most recent one - const target = readdirSync(backupsDirectory).map(filename => ({ - modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs, - filename - })).sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename; + const target = readdirSync(backupsDirectory) + .map(filename => ({ + modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs, + filename, + })) + .sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename; mainLog(`targeting ${target}...`); // create a zip file and to it, write the contents of the backup directory @@ -202,28 +199,25 @@ export class DashSessionAgent extends AppliedSessionAgent { to, subject: `Remote debug: compressed backup of ${target}...`, content: this.generateDebugInstructions(zipName, target), - attachments: [{ filename: zipName, path: zipPath }] + attachments: [{ filename: zipName, path: zipPath }], }); - // since this is intended to be a zero-footprint operation, clean up + // since this is intended to be a zero-footprint operation, clean up // by unlinking both the backup generated earlier in the function and the compressed zip file. // to generate a persistent backup, just run backup. unlinkSync(zipPath); rimraf.sync(targetPath); // indicate success or failure - mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`); + mainLog(`${error === null ? green('successfully dispatched') : red('failed to dispatch')} ${zipName} to ${cyan(to)}`); error && mainLog(red(error.message)); } catch (error: any) { - mainLog(red("unable to dispatch zipped backup...")); + mainLog(red('unable to dispatch zipped backup...')); mainLog(red(error.message)); } } - } export namespace DashSessionAgent { - - export const notificationRecipient = "browndashptc@gmail.com"; - + export const notificationRecipient = 'browndashptc@gmail.com'; } diff --git a/src/server/DashSession/Session/agents/applied_session_agent.ts b/src/server/DashSession/Session/agents/applied_session_agent.ts index 8339a06dc..2037e93e5 100644 --- a/src/server/DashSession/Session/agents/applied_session_agent.ts +++ b/src/server/DashSession/Session/agents/applied_session_agent.ts @@ -1,7 +1,8 @@ -import { isMaster } from "cluster"; +import * as _cluster from "cluster"; import { Monitor } from "./monitor"; import { ServerWorker } from "./server_worker"; -import { Utilities } from "../utilities/utilities"; +const cluster = _cluster as any; +const isMaster = cluster.isPrimary; export type ExitHandler = (reason: Error | boolean) => void | Promise<void>; @@ -15,13 +16,13 @@ export abstract class AppliedSessionAgent { private launched = false; public killSession = (reason: string, graceful = true, errorCode = 0) => { - const target = isMaster ? this.sessionMonitor : this.serverWorker; + const target = cluster.default.isPrimary ? this.sessionMonitor : this.serverWorker; target.killSession(reason, graceful, errorCode); } private sessionMonitorRef: Monitor | undefined; public get sessionMonitor(): Monitor { - if (!isMaster) { + if (!cluster.default.isPrimary) { this.serverWorker.emit("kill", { graceful: false, reason: "Cannot access the session monitor directly from the server worker thread.", diff --git a/src/server/DashSession/Session/agents/monitor.ts b/src/server/DashSession/Session/agents/monitor.ts index 9cb5ab576..a6fde4356 100644 --- a/src/server/DashSession/Session/agents/monitor.ts +++ b/src/server/DashSession/Session/agents/monitor.ts @@ -1,15 +1,21 @@ -import { ExitHandler } from "./applied_session_agent"; -import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config"; -import Repl, { ReplAction } from "../utilities/repl"; -import { isWorker, setupMaster, on, Worker, fork } from "cluster"; -import { manage, MessageHandler, ErrorLike } from "./promisified_ipc_manager"; -import { red, cyan, white, yellow, blue } from "colors"; -import { exec, ExecOptions } from "child_process"; -import { validate, ValidationError } from "jsonschema"; -import { Utilities } from "../utilities/utilities"; -import { readFileSync } from "fs"; -import IPCMessageReceiver from "./process_message_router"; -import { ServerWorker } from "./server_worker"; +import { ExitHandler } from './applied_session_agent'; +import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from '../utilities/session_config'; +import Repl, { ReplAction } from '../utilities/repl'; +import * as _cluster from 'cluster'; +import { Worker } from 'cluster'; +import { manage, MessageHandler, ErrorLike } from './promisified_ipc_manager'; +import { red, cyan, white, yellow, blue } from 'colors'; +import { exec, ExecOptions } from 'child_process'; +import { validate, ValidationError } from 'jsonschema'; +import { Utilities } from '../utilities/utilities'; +import { readFileSync } from 'fs'; +import IPCMessageReceiver from './process_message_router'; +import { ServerWorker } from './server_worker'; +const cluster = _cluster as any; +const isWorker = cluster.isWorker; +const setupMaster = cluster.setupPrimary; +const on = cluster.on; +const fork = cluster.fork; /** * Validates and reads the configuration file, accordingly builds a child process factory @@ -26,14 +32,14 @@ export class Monitor extends IPCMessageReceiver { public static Create() { if (isWorker) { - ServerWorker.IPCManager.emit("kill", { - reason: "cannot create a monitor on the worker process.", + ServerWorker.IPCManager.emit('kill', { + reason: 'cannot create a monitor on the worker process.', graceful: false, - errorCode: 1 + errorCode: 1, }); process.exit(1); } else if (++Monitor.count > 1) { - console.error(red("cannot create more than one monitor.")); + console.error(red('cannot create more than one monitor.')); process.exit(1); } else { return new Monitor(); @@ -42,7 +48,7 @@ export class Monitor extends IPCMessageReceiver { private constructor() { super(); - console.log(this.timestamp(), cyan("initializing session...")); + console.log(this.timestamp(), cyan('initializing session...')); this.configureInternalHandlers(); this.config = this.loadAndValidateConfiguration(); this.initializeClusterFunctions(); @@ -53,8 +59,8 @@ export class Monitor extends IPCMessageReceiver { // handle exceptions in the master thread - there shouldn't be many of these // the IPC (inter process communication) channel closed exception can't seem // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }): void => { - if (message !== "Channel closed") { + process.on('uncaughtException', ({ message, stack }): void => { + if (message !== 'Channel closed') { this.mainLog(red(message)); if (stack) { this.mainLog(`uncaught exception\n${red(stack)}`); @@ -62,36 +68,36 @@ export class Monitor extends IPCMessageReceiver { } }); - this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode)); - this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); - } + this.on('kill', ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode)); + this.on('lifecycle', ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`)); + }; private initializeClusterFunctions = () => { // determines whether or not we see the compilation / initialization / runtime output of each child server process - const output = this.config.showServerOutput ? "inherit" : "ignore"; - setupMaster({ stdio: ["ignore", output, output, "ipc"] }); + const output = this.config.showServerOutput ? 'inherit' : 'ignore'; + setupMaster({ stdio: ['ignore', output, output, 'ipc'] }); // a helpful cluster event called on the master thread each time a child process exits - on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; + on('exit', ({ process: { pid } }: { process: { pid: any } }, code: any, signal: any) => { + const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? '' : `, having encountered signal ${signal}`}.`; this.mainLog(cyan(prompt)); // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one this.spawn(); }); - } + }; public finalize = (sessionKey: string): void => { if (this.finalized) { - throw new Error("Session monitor is already finalized"); + throw new Error('Session monitor is already finalized'); } this.finalized = true; this.key = sessionKey; this.spawn(); - } + }; public readonly coreHooks = Object.freeze({ onCrashDetected: (listener: MessageHandler<{ error: ErrorLike }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener), }); /** @@ -101,12 +107,12 @@ export class Monitor extends IPCMessageReceiver { * requests to complete) or immediately. */ public killSession = async (reason: string, graceful = true, errorCode = 0) => { - this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); - this.mainLog(`session exit reason: ${(red(reason))}`); + this.mainLog(cyan(`exiting session ${graceful ? 'clean' : 'immediate'}ly`)); + this.mainLog(`session exit reason: ${red(reason)}`); await this.executeExitHandlers(true); await this.killActiveWorker(graceful, true); process.exit(errorCode); - } + }; /** * Execute the list of functions registered to be called @@ -120,27 +126,27 @@ export class Monitor extends IPCMessageReceiver { */ public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { this.repl.registerCommand(basename, argPatterns, action); - } + }; public exec = (command: string, options?: ExecOptions) => { return new Promise<void>(resolve => { - exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => { + exec(command, { ...options, encoding: 'utf8' }, (error, stdout, stderr) => { if (error) { this.execLog(red(`unable to execute ${white(command)}`)); - error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`))); + error.message.split('\n').forEach(line => line.length && this.execLog(red(`(error) ${line}`))); } else { let outLines: string[], errorLines: string[]; - if ((outLines = stdout.split("\n").filter(line => line.length)).length) { + if ((outLines = stdout.split('\n').filter(line => line.length)).length) { outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`))); } - if ((errorLines = stderr.split("\n").filter(line => line.length)).length) { + if ((errorLines = stderr.split('\n').filter(line => line.length)).length) { errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`))); } } resolve(); }); }); - } + }; /** * Generates a blue UTC string associated with the time @@ -153,14 +159,14 @@ export class Monitor extends IPCMessageReceiver { */ public mainLog = (...optionalParams: any[]) => { console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams); - } + }; /** * A formatted, identified and timestamped log in color for non- */ private execLog = (...optionalParams: any[]) => { console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams); - } + }; /** * Reads in configuration .json file only once, in the master thread @@ -169,28 +175,28 @@ export class Monitor extends IPCMessageReceiver { private loadAndValidateConfiguration = (): Configuration => { let config: Configuration | undefined; try { - console.log(this.timestamp(), cyan("validating configuration...")); + console.log(this.timestamp(), cyan('validating configuration...')); config = JSON.parse(readFileSync('./session.config.json', 'utf8')); const options = { throwError: true, - allowUnknownAttributes: false + allowUnknownAttributes: false, }; // ensure all necessary and no excess information is specified by the configuration file validate(config, configurationSchema, options); config = Utilities.preciseAssign({}, defaultConfig, config); } catch (error: any) { if (error instanceof ValidationError) { - console.log(red("\nSession configuration failed.")); - console.log("The given session.config.json configuration file is invalid."); + console.log(red('\nSession configuration failed.')); + console.log('The given session.config.json configuration file is invalid.'); console.log(`${error.instance}: ${error.stack}`); process.exit(0); - } else if (error.code === "ENOENT" && error.path === "./session.config.json") { - console.log(cyan("Loading default session parameters...")); - console.log("Consider including a session.config.json configuration file in your project root for customization."); + } else if (error.code === 'ENOENT' && error.path === './session.config.json') { + console.log(cyan('Loading default session parameters...')); + console.log('Consider including a session.config.json configuration file in your project root for customization.'); config = Utilities.preciseAssign({}, defaultConfig); } else { - console.log(red("\nSession configuration failed.")); - console.log("The following unknown error occurred during configuration."); + console.log(red('\nSession configuration failed.')); + console.log('The following unknown error occurred during configuration.'); console.log(error.stack); process.exit(0); } @@ -203,7 +209,7 @@ export class Monitor extends IPCMessageReceiver { }); return config!; } - } + }; /** * Builds the repl that allows the following commands to be typed into stdin of the master thread. @@ -213,24 +219,24 @@ export class Monitor extends IPCMessageReceiver { const boolean = /true|false/; const number = /\d+/; const letters = /[a-zA-Z]+/; - repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0)); - repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean")); - repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true")); - repl.registerCommand("set", [/polling/, number, boolean], args => { + repl.registerCommand('exit', [/clean|force/], args => this.killSession('manual exit requested by repl', args[0] === 'clean', 0)); + repl.registerCommand('restart', [/clean|force/], args => this.killActiveWorker(args[0] === 'clean')); + repl.registerCommand('set', [letters, 'port', number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === 'true')); + repl.registerCommand('set', [/polling/, number, boolean], args => { const newPollingIntervalSeconds = Math.floor(Number(args[1])); if (newPollingIntervalSeconds < 0) { - this.mainLog(red("the polling interval must be a non-negative integer")); + this.mainLog(red('the polling interval must be a non-negative integer')); } else { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; - if (args[2] === "true") { - Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }); + if (args[2] === 'true') { + Monitor.IPCManager.emit('updatePollingInterval', { newPollingIntervalSeconds }); } } } }); return repl; - } + }; private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason))); @@ -240,13 +246,13 @@ export class Monitor extends IPCMessageReceiver { private killActiveWorker = async (graceful = true, isSessionEnd = false): Promise<void> => { if (this.activeWorker && !this.activeWorker.isDead()) { if (graceful) { - Monitor.IPCManager.emit("manualExit", { isSessionEnd }); + Monitor.IPCManager.emit('manualExit', { isSessionEnd }); } else { await ServerWorker.IPCManager.destroy(); this.activeWorker.process.kill(); } } - } + }; /** * Allows the caller to set the port at which the target (be it the server, @@ -255,7 +261,7 @@ export class Monitor extends IPCMessageReceiver { * at the port. Otherwise, the updated port won't be used until / unless the child * dies on its own and triggers a restart. */ - private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => { + private setPort = (port: 'server' | 'socket' | string, value: number, immediateRestart: boolean): void => { if (value > 1023 && value < 65536) { this.config.ports[port] = value; if (immediateRestart) { @@ -264,7 +270,7 @@ export class Monitor extends IPCMessageReceiver { } else { this.mainLog(red(`${port} is an invalid port number`)); } - } + }; /** * Kills the current active worker and proceeds to spawn a new worker, @@ -272,27 +278,29 @@ export class Monitor extends IPCMessageReceiver { */ private spawn = async (): Promise<void> => { await this.killActiveWorker(); - const { config: { polling, ports }, key } = this; + const { + config: { polling, ports }, + key, + } = this; this.activeWorker = fork({ pollingRoute: polling.route, pollingFailureTolerance: polling.failureTolerance, serverPort: ports.server, socketPort: ports.socket, pollingIntervalSeconds: polling.intervalSeconds, - session_key: key + session_key: key, }); - Monitor.IPCManager = manage(this.activeWorker.process, this.handlers); + if (this.activeWorker) { + Monitor.IPCManager = manage(this.activeWorker.process, this.handlers); + } this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - } - + }; } export namespace Monitor { - export enum IntrinsicEvents { - KeyGenerated = "key_generated", - CrashDetected = "crash_detected", - ServerRunning = "server_running" + KeyGenerated = 'key_generated', + CrashDetected = 'crash_detected', + ServerRunning = 'server_running', } - -}
\ No newline at end of file +} diff --git a/src/server/DashSession/Session/agents/promisified_ipc_manager.ts b/src/server/DashSession/Session/agents/promisified_ipc_manager.ts index f6c8de521..76e218977 100644 --- a/src/server/DashSession/Session/agents/promisified_ipc_manager.ts +++ b/src/server/DashSession/Session/agents/promisified_ipc_manager.ts @@ -1,9 +1,9 @@ -import { Utilities } from "../utilities/utilities"; -import { ChildProcess } from "child_process"; +import { Utilities } from '../utilities/utilities'; +import { ChildProcess } from 'child_process'; /** * Convenience constructor - * @param target the process / worker to which to attach the specialized listeners + * @param target the process / worker to which to attach the specialized listeners */ export function manage(target: IPCTarget, handlers?: HandlerMap) { return new PromisifiedIPCManager(target, handlers); @@ -18,26 +18,30 @@ export type HandlerMap = { [name: string]: MessageHandler[] }; /** * This will always literally be a child process. But, though setting * up a manager in the parent will indeed see the target as the ChildProcess, - * setting up a manager in the child will just see itself as a regular NodeJS.Process. + * setting up a manager in the child will just see itself as a regular NodeJS.Process. */ export type IPCTarget = NodeJS.Process | ChildProcess; /** - * Specifies a general message format for this API + * Specifies a general message format for this API */ export type Message<T = any> = { name: string; args?: T; }; -export type MessageHandler<T = any> = (args: T) => (any | Promise<any>); +export type MessageHandler<T = any> = (args: T) => any | Promise<any>; /** * When a message is emitted, it is embedded with private metadata * to facilitate the resolution of promises, etc. */ -interface InternalMessage extends Message { metadata: Metadata; } -interface Metadata { isResponse: boolean; id: string; } -type InternalMessageHandler = (message: InternalMessage) => (any | Promise<any>); +interface InternalMessage extends Message { + metadata: Metadata; +} +interface Metadata { + isResponse: boolean; + id: string; +} /** * Allows for the transmission of the error's key features over IPC. @@ -56,12 +60,12 @@ export interface Response<T = any> { error?: ErrorLike; } -const destroyEvent = "__destroy__"; +const destroyEvent = '__destroy__'; /** * This is a wrapper utility class that allows the caller process * to emit an event and return a promise that resolves when it and all - * other processes listening to its emission of this event have completed. + * other processes listening to its emission of this event have completed. */ export class PromisifiedIPCManager { private readonly target: IPCTarget; @@ -75,7 +79,7 @@ export class PromisifiedIPCManager { this.target = target; if (handlers) { handlers[destroyEvent] = [this.destroyHelper]; - this.target.addListener("message", this.generateInternalHandler(handlers)); + this.target.addListener('message', this.generateInternalHandler(handlers)); } } @@ -86,26 +90,27 @@ export class PromisifiedIPCManager { */ public emit = async <T = any>(name: string, args?: any): Promise<Response<T>> => { if (this.isDestroyed) { - const error = { name: "FailedDispatch", message: "Cannot use a destroyed IPC manager to emit a message." }; + const error = { name: 'FailedDispatch', message: 'Cannot use a destroyed IPC manager to emit a message.' }; return { error }; } return new Promise<Response<T>>(resolve => { const messageId = Utilities.guid(); + type InternalMessageHandler = (message: any /* MessageListener*/) => any | Promise<any>; const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args }) => { if (isResponse && id === messageId) { - this.target.removeListener("message", responseHandler); + this.target.removeListener('message', responseHandler); resolve(args); } }; - this.target.addListener("message", responseHandler); + this.target.addListener('message', responseHandler); const message = { name, args, metadata: { id: messageId, isResponse: false } }; if (!(this.target.send && this.target.send(message))) { - const error: ErrorLike = { name: "FailedDispatch", message: "Either the target's send method was undefined or the act of sending failed." }; + const error: ErrorLike = { name: 'FailedDispatch', message: "Either the target's send method was undefined or the act of sending failed." }; resolve({ error }); - this.target.removeListener("message", responseHandler); + this.target.removeListener('message', responseHandler); } }); - } + }; /** * Invoked from either the parent or the child process, this allows @@ -122,7 +127,7 @@ export class PromisifiedIPCManager { } resolve(); }); - } + }; /** * Dispatches the dummy responses and sets the isDestroyed flag to true. @@ -131,12 +136,12 @@ export class PromisifiedIPCManager { const { pendingMessages } = this; this.isDestroyed = true; Object.keys(pendingMessages).forEach(id => { - const error: ErrorLike = { name: "ManagerDestroyed", message: "The IPC manager was destroyed before the response could be returned." }; + const error: ErrorLike = { name: 'ManagerDestroyed', message: 'The IPC manager was destroyed before the response could be returned.' }; const message: InternalMessage = { name: pendingMessages[id], args: { error }, metadata: { id, isResponse: true } }; this.target.send?.(message); }); this.pendingMessages = {}; - } + }; /** * This routine receives a uniquely identified message. If the message is itself a response, @@ -145,29 +150,30 @@ export class PromisifiedIPCManager { * which will ultimately invoke the responseHandler of the original emission and resolve the * sender's promise. */ - private generateInternalHandler = (handlers: HandlerMap): MessageHandler => async (message: InternalMessage) => { - const { name, args, metadata } = message; - if (name && metadata && !metadata.isResponse) { - const { id } = metadata; - this.pendingMessages[id] = name; - let error: Error | undefined; - let results: any[] | undefined; - try { - const registered = handlers[name]; - if (registered) { - results = await Promise.all(registered.map(handler => handler(args))); + private generateInternalHandler = + (handlers: HandlerMap): MessageHandler => + async (message: InternalMessage) => { + const { name, args, metadata } = message; + if (name && metadata && !metadata.isResponse) { + const { id } = metadata; + this.pendingMessages[id] = name; + let error: Error | undefined; + let results: any[] | undefined; + try { + const registered = handlers[name]; + if (registered) { + results = await Promise.all(registered.map(handler => handler(args))); + } + } catch (e: any) { + error = e; + } + if (!this.isDestroyed && this.target.send) { + const metadata = { id, isResponse: true }; + const response: Response = { results, error }; + const message = { name, args: response, metadata }; + delete this.pendingMessages[id]; + this.target.send(message); } - } catch (e: any) { - error = e; - } - if (!this.isDestroyed && this.target.send) { - const metadata = { id, isResponse: true }; - const response: Response = { results, error }; - const message = { name, args: response, metadata }; - delete this.pendingMessages[id]; - this.target.send(message); } - } - } - -}
\ No newline at end of file + }; +} diff --git a/src/server/DashSession/Session/agents/server_worker.ts b/src/server/DashSession/Session/agents/server_worker.ts index 634b0113d..d8b3ee80b 100644 --- a/src/server/DashSession/Session/agents/server_worker.ts +++ b/src/server/DashSession/Session/agents/server_worker.ts @@ -1,4 +1,4 @@ -import { isMaster } from "cluster"; +import cluster from "cluster"; import { green, red, white, yellow } from "colors"; import { get } from "request-promise"; import { ExitHandler } from "./applied_session_agent"; @@ -22,7 +22,7 @@ export class ServerWorker extends IPCMessageReceiver { private serverPort: number; private isInitialized = false; public static Create(work: Function) { - if (isMaster) { + if (cluster.isPrimary) { console.error(red("cannot create a worker on the monitor process.")); process.exit(1); } else if (++ServerWorker.count > 1) { |