aboutsummaryrefslogtreecommitdiff
path: root/src/server/DashSession
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/DashSession')
-rw-r--r--src/server/DashSession/DashSessionAgent.ts136
-rw-r--r--src/server/DashSession/Session/agents/applied_session_agent.ts9
-rw-r--r--src/server/DashSession/Session/agents/monitor.ts156
-rw-r--r--src/server/DashSession/Session/agents/promisified_ipc_manager.ts96
-rw-r--r--src/server/DashSession/Session/agents/server_worker.ts4
5 files changed, 205 insertions, 196 deletions
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts
index 1a5934d8f..1ef7a131d 100644
--- a/src/server/DashSession/DashSessionAgent.ts
+++ b/src/server/DashSession/DashSessionAgent.ts
@@ -1,18 +1,18 @@
-import { Email, pathFromRoot } from "../ActionUtilities";
-import { red, yellow, green, cyan } from "colors";
-import { get } from "request-promise";
-import { Utils } from "../../Utils";
-import { WebSocket } from "../websocket";
-import { MessageStore } from "../Message";
-import { launchServer, onWindows } from "..";
-import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } from "fs";
-import * as Archiver from "archiver";
-import { resolve } from "path";
-import rimraf = require("rimraf");
-import { AppliedSessionAgent, ExitHandler } from "./Session/agents/applied_session_agent";
-import { ServerWorker } from "./Session/agents/server_worker";
-import { Monitor } from "./Session/agents/monitor";
-import { MessageHandler, ErrorLike } from "./Session/agents/promisified_ipc_manager";
+import { Email, pathFromRoot } from '../ActionUtilities';
+import { red, yellow, green, cyan } from 'colors';
+import { get } from 'request-promise';
+import { Utils } from '../../Utils';
+import { WebSocket } from '../websocket';
+import { MessageStore } from '../Message';
+import { launchServer, onWindows } from '..';
+import { readdirSync, statSync, createWriteStream, readFileSync, unlinkSync } from 'fs';
+import * as Archiver from 'archiver';
+import { resolve } from 'path';
+import { rimraf } from 'rimraf';
+import { AppliedSessionAgent, ExitHandler } from './Session/agents/applied_session_agent';
+import { ServerWorker } from './Session/agents/server_worker';
+import { Monitor } from './Session/agents/monitor';
+import { MessageHandler, ErrorLike } from './Session/agents/promisified_ipc_manager';
/**
* If we're the monitor (master) thread, we should launch the monitor logic for the session.
@@ -20,9 +20,8 @@ import { MessageHandler, ErrorLike } from "./Session/agents/promisified_ipc_mana
* our job should be to run the server.
*/
export class DashSessionAgent extends AppliedSessionAgent {
-
- private readonly signature = "-Dash Server Session Manager";
- private readonly releaseDesktop = pathFromRoot("../../Desktop");
+ private readonly signature = '-Dash Server Session Manager';
+ private readonly releaseDesktop = pathFromRoot('../../Desktop');
/**
* The core method invoked when the single master thread is initialized.
@@ -31,13 +30,13 @@ export class DashSessionAgent extends AppliedSessionAgent {
protected async initializeMonitor(monitor: Monitor): Promise<string> {
const sessionKey = Utils.GenerateGuid();
await this.dispatchSessionPassword(sessionKey);
- monitor.addReplCommand("pull", [], () => monitor.exec("git pull"));
- monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand);
- monitor.addReplCommand("backup", [], this.backup);
- monitor.addReplCommand("debug", [/\S+\@\S+/], async ([to]) => this.dispatchZippedDebugBackup(to));
- monitor.on("backup", this.backup);
- monitor.on("debug", async ({ to }) => this.dispatchZippedDebugBackup(to));
- monitor.on("delete", WebSocket.doDelete);
+ monitor.addReplCommand('pull', [], () => monitor.exec('git pull'));
+ monitor.addReplCommand('solr', [/start|stop|index/], this.executeSolrCommand);
+ monitor.addReplCommand('backup', [], this.backup);
+ monitor.addReplCommand('debug', [/\S+\@\S+/], async ([to]) => this.dispatchZippedDebugBackup(to));
+ monitor.on('backup', this.backup);
+ monitor.on('debug', async ({ to }) => this.dispatchZippedDebugBackup(to));
+ monitor.on('delete', WebSocket.doDelete);
monitor.coreHooks.onCrashDetected(this.dispatchCrashReport);
return sessionKey;
}
@@ -58,13 +57,13 @@ export class DashSessionAgent extends AppliedSessionAgent {
private _remoteDebugInstructions: string | undefined;
private generateDebugInstructions = (zipName: string, target: string): string => {
if (!this._remoteDebugInstructions) {
- this._remoteDebugInstructions = readFileSync(resolve(__dirname, "./templates/remote_debug_instructions.txt"), { encoding: "utf8" });
+ this._remoteDebugInstructions = readFileSync(resolve(__dirname, './templates/remote_debug_instructions.txt'), { encoding: 'utf8' });
}
return this._remoteDebugInstructions
.replace(/__zipname__/, zipName)
.replace(/__target__/, target)
.replace(/__signature__/, this.signature);
- }
+ };
/**
* Prepares the body of the email with information regarding a crash event.
@@ -72,12 +71,12 @@ export class DashSessionAgent extends AppliedSessionAgent {
private _crashInstructions: string | undefined;
private generateCrashInstructions({ name, message, stack }: ErrorLike): string {
if (!this._crashInstructions) {
- this._crashInstructions = readFileSync(resolve(__dirname, "./templates/crash_instructions.txt"), { encoding: "utf8" });
+ this._crashInstructions = readFileSync(resolve(__dirname, './templates/crash_instructions.txt'), { encoding: 'utf8' });
}
return this._crashInstructions
- .replace(/__name__/, name || "[no error name found]")
- .replace(/__message__/, message || "[no error message found]")
- .replace(/__stack__/, stack || "[no error stack found]")
+ .replace(/__name__/, name || '[no error name found]')
+ .replace(/__message__/, message || '[no error message found]')
+ .replace(/__stack__/, stack || '[no error stack found]')
.replace(/__signature__/, this.signature);
}
@@ -88,23 +87,19 @@ export class DashSessionAgent extends AppliedSessionAgent {
private dispatchSessionPassword = async (sessionKey: string): Promise<void> => {
const { mainLog } = this.sessionMonitor;
const { notificationRecipient } = DashSessionAgent;
- mainLog(green("dispatching session key..."));
+ mainLog(green('dispatching session key...'));
const error = await Email.dispatch({
to: notificationRecipient,
- subject: "Dash Release Session Admin Authentication Key",
- content: [
- `Here's the key for this session (started @ ${new Date().toUTCString()}):`,
- sessionKey,
- this.signature
- ].join("\n\n")
+ subject: 'Dash Release Session Admin Authentication Key',
+ content: [`Here's the key for this session (started @ ${new Date().toUTCString()}):`, sessionKey, this.signature].join('\n\n'),
});
if (error) {
this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`));
- mainLog(red("distribution of session key experienced errors"));
+ mainLog(red('distribution of session key experienced errors'));
} else {
- mainLog(green("successfully distributed session key to recipients"));
+ mainLog(green('successfully distributed session key to recipients'));
}
- }
+ };
/**
* This sends an email with the generated crash report.
@@ -114,37 +109,37 @@ export class DashSessionAgent extends AppliedSessionAgent {
const { notificationRecipient } = DashSessionAgent;
const error = await Email.dispatch({
to: notificationRecipient,
- subject: "Dash Web Server Crash",
- content: this.generateCrashInstructions(crashCause)
+ subject: 'Dash Web Server Crash',
+ content: this.generateCrashInstructions(crashCause),
});
if (error) {
this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} ${yellow(`(${error.message})`)}`));
- mainLog(red("distribution of crash notification experienced errors"));
+ mainLog(red('distribution of crash notification experienced errors'));
} else {
- mainLog(green("successfully distributed crash notification to recipients"));
+ mainLog(green('successfully distributed crash notification to recipients'));
}
- }
+ };
/**
- * Logic for interfacing with Solr. Either starts it,
+ * Logic for interfacing with Solr. Either starts it,
* stops it, or rebuilds its indices.
*/
private executeSolrCommand = async (args: string[]): Promise<void> => {
const { exec, mainLog } = this.sessionMonitor;
const action = args[0];
- if (action === "index") {
- exec("npx ts-node ./updateSearch.ts", { cwd: pathFromRoot("./src/server") });
+ if (action === 'index') {
+ exec('npx ts-node ./updateSearch.ts', { cwd: pathFromRoot('./src/server') });
} else {
- const command = `${onWindows ? "solr.cmd" : "solr"} ${args[0] === "start" ? "start" : "stop -p 8983"}`;
- await exec(command, { cwd: "./solr-8.3.1/bin" });
+ const command = `${onWindows ? 'solr.cmd' : 'solr'} ${args[0] === 'start' ? 'start' : 'stop -p 8983'}`;
+ await exec(command, { cwd: './solr-8.3.1/bin' });
try {
- await get("http://localhost:8983");
- mainLog(green("successfully connected to 8983 after running solr initialization"));
+ await get('http://localhost:8983');
+ mainLog(green('successfully connected to 8983 after running solr initialization'));
} catch {
- mainLog(red("unable to connect at 8983 after running solr initialization"));
+ mainLog(red('unable to connect at 8983 after running solr initialization'));
}
}
- }
+ };
/**
* Broadcast to all clients that their connection
@@ -153,16 +148,16 @@ export class DashSessionAgent extends AppliedSessionAgent {
private notifyClient: ExitHandler = reason => {
const { _socket } = WebSocket;
if (_socket) {
- const message = typeof reason === "boolean" ? (reason ? "exit" : "temporary") : "crash";
+ const message = typeof reason === 'boolean' ? (reason ? 'exit' : 'temporary') : 'crash';
Utils.Emit(_socket, MessageStore.ConnectionTerminated, message);
}
- }
+ };
/**
* Performs a backup of the database, saved to the desktop subdirectory.
* This should work as is only on our specific release server.
*/
- private backup = async (): Promise<void> => this.sessionMonitor.exec("backup.bat", { cwd: this.releaseDesktop });
+ private backup = async (): Promise<void> => this.sessionMonitor.exec('backup.bat', { cwd: this.releaseDesktop });
/**
* Compress either a brand new backup or the most recent backup and send it
@@ -175,15 +170,17 @@ export class DashSessionAgent extends AppliedSessionAgent {
try {
// if desired, complete an immediate backup to send
await this.backup();
- mainLog("backup complete");
+ mainLog('backup complete');
const backupsDirectory = `${this.releaseDesktop}/backups`;
// sort all backups by their modified time, and choose the most recent one
- const target = readdirSync(backupsDirectory).map(filename => ({
- modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs,
- filename
- })).sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename;
+ const target = readdirSync(backupsDirectory)
+ .map(filename => ({
+ modifiedTime: statSync(`${backupsDirectory}/${filename}`).mtimeMs,
+ filename,
+ }))
+ .sort((a, b) => b.modifiedTime - a.modifiedTime)[0].filename;
mainLog(`targeting ${target}...`);
// create a zip file and to it, write the contents of the backup directory
@@ -202,28 +199,25 @@ export class DashSessionAgent extends AppliedSessionAgent {
to,
subject: `Remote debug: compressed backup of ${target}...`,
content: this.generateDebugInstructions(zipName, target),
- attachments: [{ filename: zipName, path: zipPath }]
+ attachments: [{ filename: zipName, path: zipPath }],
});
- // since this is intended to be a zero-footprint operation, clean up
+ // since this is intended to be a zero-footprint operation, clean up
// by unlinking both the backup generated earlier in the function and the compressed zip file.
// to generate a persistent backup, just run backup.
unlinkSync(zipPath);
rimraf.sync(targetPath);
// indicate success or failure
- mainLog(`${error === null ? green("successfully dispatched") : red("failed to dispatch")} ${zipName} to ${cyan(to)}`);
+ mainLog(`${error === null ? green('successfully dispatched') : red('failed to dispatch')} ${zipName} to ${cyan(to)}`);
error && mainLog(red(error.message));
} catch (error: any) {
- mainLog(red("unable to dispatch zipped backup..."));
+ mainLog(red('unable to dispatch zipped backup...'));
mainLog(red(error.message));
}
}
-
}
export namespace DashSessionAgent {
-
- export const notificationRecipient = "browndashptc@gmail.com";
-
+ export const notificationRecipient = 'browndashptc@gmail.com';
}
diff --git a/src/server/DashSession/Session/agents/applied_session_agent.ts b/src/server/DashSession/Session/agents/applied_session_agent.ts
index 8339a06dc..2037e93e5 100644
--- a/src/server/DashSession/Session/agents/applied_session_agent.ts
+++ b/src/server/DashSession/Session/agents/applied_session_agent.ts
@@ -1,7 +1,8 @@
-import { isMaster } from "cluster";
+import * as _cluster from "cluster";
import { Monitor } from "./monitor";
import { ServerWorker } from "./server_worker";
-import { Utilities } from "../utilities/utilities";
+const cluster = _cluster as any;
+const isMaster = cluster.isPrimary;
export type ExitHandler = (reason: Error | boolean) => void | Promise<void>;
@@ -15,13 +16,13 @@ export abstract class AppliedSessionAgent {
private launched = false;
public killSession = (reason: string, graceful = true, errorCode = 0) => {
- const target = isMaster ? this.sessionMonitor : this.serverWorker;
+ const target = cluster.default.isPrimary ? this.sessionMonitor : this.serverWorker;
target.killSession(reason, graceful, errorCode);
}
private sessionMonitorRef: Monitor | undefined;
public get sessionMonitor(): Monitor {
- if (!isMaster) {
+ if (!cluster.default.isPrimary) {
this.serverWorker.emit("kill", {
graceful: false,
reason: "Cannot access the session monitor directly from the server worker thread.",
diff --git a/src/server/DashSession/Session/agents/monitor.ts b/src/server/DashSession/Session/agents/monitor.ts
index 9cb5ab576..a6fde4356 100644
--- a/src/server/DashSession/Session/agents/monitor.ts
+++ b/src/server/DashSession/Session/agents/monitor.ts
@@ -1,15 +1,21 @@
-import { ExitHandler } from "./applied_session_agent";
-import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
-import Repl, { ReplAction } from "../utilities/repl";
-import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { manage, MessageHandler, ErrorLike } from "./promisified_ipc_manager";
-import { red, cyan, white, yellow, blue } from "colors";
-import { exec, ExecOptions } from "child_process";
-import { validate, ValidationError } from "jsonschema";
-import { Utilities } from "../utilities/utilities";
-import { readFileSync } from "fs";
-import IPCMessageReceiver from "./process_message_router";
-import { ServerWorker } from "./server_worker";
+import { ExitHandler } from './applied_session_agent';
+import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from '../utilities/session_config';
+import Repl, { ReplAction } from '../utilities/repl';
+import * as _cluster from 'cluster';
+import { Worker } from 'cluster';
+import { manage, MessageHandler, ErrorLike } from './promisified_ipc_manager';
+import { red, cyan, white, yellow, blue } from 'colors';
+import { exec, ExecOptions } from 'child_process';
+import { validate, ValidationError } from 'jsonschema';
+import { Utilities } from '../utilities/utilities';
+import { readFileSync } from 'fs';
+import IPCMessageReceiver from './process_message_router';
+import { ServerWorker } from './server_worker';
+const cluster = _cluster as any;
+const isWorker = cluster.isWorker;
+const setupMaster = cluster.setupPrimary;
+const on = cluster.on;
+const fork = cluster.fork;
/**
* Validates and reads the configuration file, accordingly builds a child process factory
@@ -26,14 +32,14 @@ export class Monitor extends IPCMessageReceiver {
public static Create() {
if (isWorker) {
- ServerWorker.IPCManager.emit("kill", {
- reason: "cannot create a monitor on the worker process.",
+ ServerWorker.IPCManager.emit('kill', {
+ reason: 'cannot create a monitor on the worker process.',
graceful: false,
- errorCode: 1
+ errorCode: 1,
});
process.exit(1);
} else if (++Monitor.count > 1) {
- console.error(red("cannot create more than one monitor."));
+ console.error(red('cannot create more than one monitor.'));
process.exit(1);
} else {
return new Monitor();
@@ -42,7 +48,7 @@ export class Monitor extends IPCMessageReceiver {
private constructor() {
super();
- console.log(this.timestamp(), cyan("initializing session..."));
+ console.log(this.timestamp(), cyan('initializing session...'));
this.configureInternalHandlers();
this.config = this.loadAndValidateConfiguration();
this.initializeClusterFunctions();
@@ -53,8 +59,8 @@ export class Monitor extends IPCMessageReceiver {
// handle exceptions in the master thread - there shouldn't be many of these
// the IPC (inter process communication) channel closed exception can't seem
// to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }): void => {
- if (message !== "Channel closed") {
+ process.on('uncaughtException', ({ message, stack }): void => {
+ if (message !== 'Channel closed') {
this.mainLog(red(message));
if (stack) {
this.mainLog(`uncaught exception\n${red(stack)}`);
@@ -62,36 +68,36 @@ export class Monitor extends IPCMessageReceiver {
}
});
- this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode));
- this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
- }
+ this.on('kill', ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode));
+ this.on('lifecycle', ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
+ };
private initializeClusterFunctions = () => {
// determines whether or not we see the compilation / initialization / runtime output of each child server process
- const output = this.config.showServerOutput ? "inherit" : "ignore";
- setupMaster({ stdio: ["ignore", output, output, "ipc"] });
+ const output = this.config.showServerOutput ? 'inherit' : 'ignore';
+ setupMaster({ stdio: ['ignore', output, output, 'ipc'] });
// a helpful cluster event called on the master thread each time a child process exits
- on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ on('exit', ({ process: { pid } }: { process: { pid: any } }, code: any, signal: any) => {
+ const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? '' : `, having encountered signal ${signal}`}.`;
this.mainLog(cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
this.spawn();
});
- }
+ };
public finalize = (sessionKey: string): void => {
if (this.finalized) {
- throw new Error("Session monitor is already finalized");
+ throw new Error('Session monitor is already finalized');
}
this.finalized = true;
this.key = sessionKey;
this.spawn();
- }
+ };
public readonly coreHooks = Object.freeze({
onCrashDetected: (listener: MessageHandler<{ error: ErrorLike }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
- onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener)
+ onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener),
});
/**
@@ -101,12 +107,12 @@ export class Monitor extends IPCMessageReceiver {
* requests to complete) or immediately.
*/
public killSession = async (reason: string, graceful = true, errorCode = 0) => {
- this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
- this.mainLog(`session exit reason: ${(red(reason))}`);
+ this.mainLog(cyan(`exiting session ${graceful ? 'clean' : 'immediate'}ly`));
+ this.mainLog(`session exit reason: ${red(reason)}`);
await this.executeExitHandlers(true);
await this.killActiveWorker(graceful, true);
process.exit(errorCode);
- }
+ };
/**
* Execute the list of functions registered to be called
@@ -120,27 +126,27 @@ export class Monitor extends IPCMessageReceiver {
*/
public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => {
this.repl.registerCommand(basename, argPatterns, action);
- }
+ };
public exec = (command: string, options?: ExecOptions) => {
return new Promise<void>(resolve => {
- exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => {
+ exec(command, { ...options, encoding: 'utf8' }, (error, stdout, stderr) => {
if (error) {
this.execLog(red(`unable to execute ${white(command)}`));
- error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
+ error.message.split('\n').forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
} else {
let outLines: string[], errorLines: string[];
- if ((outLines = stdout.split("\n").filter(line => line.length)).length) {
+ if ((outLines = stdout.split('\n').filter(line => line.length)).length) {
outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`)));
}
- if ((errorLines = stderr.split("\n").filter(line => line.length)).length) {
+ if ((errorLines = stderr.split('\n').filter(line => line.length)).length) {
errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`)));
}
}
resolve();
});
});
- }
+ };
/**
* Generates a blue UTC string associated with the time
@@ -153,14 +159,14 @@ export class Monitor extends IPCMessageReceiver {
*/
public mainLog = (...optionalParams: any[]) => {
console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams);
- }
+ };
/**
* A formatted, identified and timestamped log in color for non-
*/
private execLog = (...optionalParams: any[]) => {
console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams);
- }
+ };
/**
* Reads in configuration .json file only once, in the master thread
@@ -169,28 +175,28 @@ export class Monitor extends IPCMessageReceiver {
private loadAndValidateConfiguration = (): Configuration => {
let config: Configuration | undefined;
try {
- console.log(this.timestamp(), cyan("validating configuration..."));
+ console.log(this.timestamp(), cyan('validating configuration...'));
config = JSON.parse(readFileSync('./session.config.json', 'utf8'));
const options = {
throwError: true,
- allowUnknownAttributes: false
+ allowUnknownAttributes: false,
};
// ensure all necessary and no excess information is specified by the configuration file
validate(config, configurationSchema, options);
config = Utilities.preciseAssign({}, defaultConfig, config);
} catch (error: any) {
if (error instanceof ValidationError) {
- console.log(red("\nSession configuration failed."));
- console.log("The given session.config.json configuration file is invalid.");
+ console.log(red('\nSession configuration failed.'));
+ console.log('The given session.config.json configuration file is invalid.');
console.log(`${error.instance}: ${error.stack}`);
process.exit(0);
- } else if (error.code === "ENOENT" && error.path === "./session.config.json") {
- console.log(cyan("Loading default session parameters..."));
- console.log("Consider including a session.config.json configuration file in your project root for customization.");
+ } else if (error.code === 'ENOENT' && error.path === './session.config.json') {
+ console.log(cyan('Loading default session parameters...'));
+ console.log('Consider including a session.config.json configuration file in your project root for customization.');
config = Utilities.preciseAssign({}, defaultConfig);
} else {
- console.log(red("\nSession configuration failed."));
- console.log("The following unknown error occurred during configuration.");
+ console.log(red('\nSession configuration failed.'));
+ console.log('The following unknown error occurred during configuration.');
console.log(error.stack);
process.exit(0);
}
@@ -203,7 +209,7 @@ export class Monitor extends IPCMessageReceiver {
});
return config!;
}
- }
+ };
/**
* Builds the repl that allows the following commands to be typed into stdin of the master thread.
@@ -213,24 +219,24 @@ export class Monitor extends IPCMessageReceiver {
const boolean = /true|false/;
const number = /\d+/;
const letters = /[a-zA-Z]+/;
- repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0));
- repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean"));
- repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true"));
- repl.registerCommand("set", [/polling/, number, boolean], args => {
+ repl.registerCommand('exit', [/clean|force/], args => this.killSession('manual exit requested by repl', args[0] === 'clean', 0));
+ repl.registerCommand('restart', [/clean|force/], args => this.killActiveWorker(args[0] === 'clean'));
+ repl.registerCommand('set', [letters, 'port', number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === 'true'));
+ repl.registerCommand('set', [/polling/, number, boolean], args => {
const newPollingIntervalSeconds = Math.floor(Number(args[1]));
if (newPollingIntervalSeconds < 0) {
- this.mainLog(red("the polling interval must be a non-negative integer"));
+ this.mainLog(red('the polling interval must be a non-negative integer'));
} else {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
- if (args[2] === "true") {
- Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds });
+ if (args[2] === 'true') {
+ Monitor.IPCManager.emit('updatePollingInterval', { newPollingIntervalSeconds });
}
}
}
});
return repl;
- }
+ };
private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));
@@ -240,13 +246,13 @@ export class Monitor extends IPCMessageReceiver {
private killActiveWorker = async (graceful = true, isSessionEnd = false): Promise<void> => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
- Monitor.IPCManager.emit("manualExit", { isSessionEnd });
+ Monitor.IPCManager.emit('manualExit', { isSessionEnd });
} else {
await ServerWorker.IPCManager.destroy();
this.activeWorker.process.kill();
}
}
- }
+ };
/**
* Allows the caller to set the port at which the target (be it the server,
@@ -255,7 +261,7 @@ export class Monitor extends IPCMessageReceiver {
* at the port. Otherwise, the updated port won't be used until / unless the child
* dies on its own and triggers a restart.
*/
- private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => {
+ private setPort = (port: 'server' | 'socket' | string, value: number, immediateRestart: boolean): void => {
if (value > 1023 && value < 65536) {
this.config.ports[port] = value;
if (immediateRestart) {
@@ -264,7 +270,7 @@ export class Monitor extends IPCMessageReceiver {
} else {
this.mainLog(red(`${port} is an invalid port number`));
}
- }
+ };
/**
* Kills the current active worker and proceeds to spawn a new worker,
@@ -272,27 +278,29 @@ export class Monitor extends IPCMessageReceiver {
*/
private spawn = async (): Promise<void> => {
await this.killActiveWorker();
- const { config: { polling, ports }, key } = this;
+ const {
+ config: { polling, ports },
+ key,
+ } = this;
this.activeWorker = fork({
pollingRoute: polling.route,
pollingFailureTolerance: polling.failureTolerance,
serverPort: ports.server,
socketPort: ports.socket,
pollingIntervalSeconds: polling.intervalSeconds,
- session_key: key
+ session_key: key,
});
- Monitor.IPCManager = manage(this.activeWorker.process, this.handlers);
+ if (this.activeWorker) {
+ Monitor.IPCManager = manage(this.activeWorker.process, this.handlers);
+ }
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- }
-
+ };
}
export namespace Monitor {
-
export enum IntrinsicEvents {
- KeyGenerated = "key_generated",
- CrashDetected = "crash_detected",
- ServerRunning = "server_running"
+ KeyGenerated = 'key_generated',
+ CrashDetected = 'crash_detected',
+ ServerRunning = 'server_running',
}
-
-} \ No newline at end of file
+}
diff --git a/src/server/DashSession/Session/agents/promisified_ipc_manager.ts b/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
index f6c8de521..76e218977 100644
--- a/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
+++ b/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
@@ -1,9 +1,9 @@
-import { Utilities } from "../utilities/utilities";
-import { ChildProcess } from "child_process";
+import { Utilities } from '../utilities/utilities';
+import { ChildProcess } from 'child_process';
/**
* Convenience constructor
- * @param target the process / worker to which to attach the specialized listeners
+ * @param target the process / worker to which to attach the specialized listeners
*/
export function manage(target: IPCTarget, handlers?: HandlerMap) {
return new PromisifiedIPCManager(target, handlers);
@@ -18,26 +18,30 @@ export type HandlerMap = { [name: string]: MessageHandler[] };
/**
* This will always literally be a child process. But, though setting
* up a manager in the parent will indeed see the target as the ChildProcess,
- * setting up a manager in the child will just see itself as a regular NodeJS.Process.
+ * setting up a manager in the child will just see itself as a regular NodeJS.Process.
*/
export type IPCTarget = NodeJS.Process | ChildProcess;
/**
- * Specifies a general message format for this API
+ * Specifies a general message format for this API
*/
export type Message<T = any> = {
name: string;
args?: T;
};
-export type MessageHandler<T = any> = (args: T) => (any | Promise<any>);
+export type MessageHandler<T = any> = (args: T) => any | Promise<any>;
/**
* When a message is emitted, it is embedded with private metadata
* to facilitate the resolution of promises, etc.
*/
-interface InternalMessage extends Message { metadata: Metadata; }
-interface Metadata { isResponse: boolean; id: string; }
-type InternalMessageHandler = (message: InternalMessage) => (any | Promise<any>);
+interface InternalMessage extends Message {
+ metadata: Metadata;
+}
+interface Metadata {
+ isResponse: boolean;
+ id: string;
+}
/**
* Allows for the transmission of the error's key features over IPC.
@@ -56,12 +60,12 @@ export interface Response<T = any> {
error?: ErrorLike;
}
-const destroyEvent = "__destroy__";
+const destroyEvent = '__destroy__';
/**
* This is a wrapper utility class that allows the caller process
* to emit an event and return a promise that resolves when it and all
- * other processes listening to its emission of this event have completed.
+ * other processes listening to its emission of this event have completed.
*/
export class PromisifiedIPCManager {
private readonly target: IPCTarget;
@@ -75,7 +79,7 @@ export class PromisifiedIPCManager {
this.target = target;
if (handlers) {
handlers[destroyEvent] = [this.destroyHelper];
- this.target.addListener("message", this.generateInternalHandler(handlers));
+ this.target.addListener('message', this.generateInternalHandler(handlers));
}
}
@@ -86,26 +90,27 @@ export class PromisifiedIPCManager {
*/
public emit = async <T = any>(name: string, args?: any): Promise<Response<T>> => {
if (this.isDestroyed) {
- const error = { name: "FailedDispatch", message: "Cannot use a destroyed IPC manager to emit a message." };
+ const error = { name: 'FailedDispatch', message: 'Cannot use a destroyed IPC manager to emit a message.' };
return { error };
}
return new Promise<Response<T>>(resolve => {
const messageId = Utilities.guid();
+ type InternalMessageHandler = (message: any /* MessageListener*/) => any | Promise<any>;
const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args }) => {
if (isResponse && id === messageId) {
- this.target.removeListener("message", responseHandler);
+ this.target.removeListener('message', responseHandler);
resolve(args);
}
};
- this.target.addListener("message", responseHandler);
+ this.target.addListener('message', responseHandler);
const message = { name, args, metadata: { id: messageId, isResponse: false } };
if (!(this.target.send && this.target.send(message))) {
- const error: ErrorLike = { name: "FailedDispatch", message: "Either the target's send method was undefined or the act of sending failed." };
+ const error: ErrorLike = { name: 'FailedDispatch', message: "Either the target's send method was undefined or the act of sending failed." };
resolve({ error });
- this.target.removeListener("message", responseHandler);
+ this.target.removeListener('message', responseHandler);
}
});
- }
+ };
/**
* Invoked from either the parent or the child process, this allows
@@ -122,7 +127,7 @@ export class PromisifiedIPCManager {
}
resolve();
});
- }
+ };
/**
* Dispatches the dummy responses and sets the isDestroyed flag to true.
@@ -131,12 +136,12 @@ export class PromisifiedIPCManager {
const { pendingMessages } = this;
this.isDestroyed = true;
Object.keys(pendingMessages).forEach(id => {
- const error: ErrorLike = { name: "ManagerDestroyed", message: "The IPC manager was destroyed before the response could be returned." };
+ const error: ErrorLike = { name: 'ManagerDestroyed', message: 'The IPC manager was destroyed before the response could be returned.' };
const message: InternalMessage = { name: pendingMessages[id], args: { error }, metadata: { id, isResponse: true } };
this.target.send?.(message);
});
this.pendingMessages = {};
- }
+ };
/**
* This routine receives a uniquely identified message. If the message is itself a response,
@@ -145,29 +150,30 @@ export class PromisifiedIPCManager {
* which will ultimately invoke the responseHandler of the original emission and resolve the
* sender's promise.
*/
- private generateInternalHandler = (handlers: HandlerMap): MessageHandler => async (message: InternalMessage) => {
- const { name, args, metadata } = message;
- if (name && metadata && !metadata.isResponse) {
- const { id } = metadata;
- this.pendingMessages[id] = name;
- let error: Error | undefined;
- let results: any[] | undefined;
- try {
- const registered = handlers[name];
- if (registered) {
- results = await Promise.all(registered.map(handler => handler(args)));
+ private generateInternalHandler =
+ (handlers: HandlerMap): MessageHandler =>
+ async (message: InternalMessage) => {
+ const { name, args, metadata } = message;
+ if (name && metadata && !metadata.isResponse) {
+ const { id } = metadata;
+ this.pendingMessages[id] = name;
+ let error: Error | undefined;
+ let results: any[] | undefined;
+ try {
+ const registered = handlers[name];
+ if (registered) {
+ results = await Promise.all(registered.map(handler => handler(args)));
+ }
+ } catch (e: any) {
+ error = e;
+ }
+ if (!this.isDestroyed && this.target.send) {
+ const metadata = { id, isResponse: true };
+ const response: Response = { results, error };
+ const message = { name, args: response, metadata };
+ delete this.pendingMessages[id];
+ this.target.send(message);
}
- } catch (e: any) {
- error = e;
- }
- if (!this.isDestroyed && this.target.send) {
- const metadata = { id, isResponse: true };
- const response: Response = { results, error };
- const message = { name, args: response, metadata };
- delete this.pendingMessages[id];
- this.target.send(message);
}
- }
- }
-
-} \ No newline at end of file
+ };
+}
diff --git a/src/server/DashSession/Session/agents/server_worker.ts b/src/server/DashSession/Session/agents/server_worker.ts
index 634b0113d..d8b3ee80b 100644
--- a/src/server/DashSession/Session/agents/server_worker.ts
+++ b/src/server/DashSession/Session/agents/server_worker.ts
@@ -1,4 +1,4 @@
-import { isMaster } from "cluster";
+import cluster from "cluster";
import { green, red, white, yellow } from "colors";
import { get } from "request-promise";
import { ExitHandler } from "./applied_session_agent";
@@ -22,7 +22,7 @@ export class ServerWorker extends IPCMessageReceiver {
private serverPort: number;
private isInitialized = false;
public static Create(work: Function) {
- if (isMaster) {
+ if (cluster.isPrimary) {
console.error(red("cannot create a worker on the monitor process."));
process.exit(1);
} else if (++ServerWorker.count > 1) {