aboutsummaryrefslogtreecommitdiff
path: root/src/server/DashSession/Session/agents/monitor.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/DashSession/Session/agents/monitor.ts')
-rw-r--r--src/server/DashSession/Session/agents/monitor.ts156
1 files changed, 82 insertions, 74 deletions
diff --git a/src/server/DashSession/Session/agents/monitor.ts b/src/server/DashSession/Session/agents/monitor.ts
index 9cb5ab576..a6fde4356 100644
--- a/src/server/DashSession/Session/agents/monitor.ts
+++ b/src/server/DashSession/Session/agents/monitor.ts
@@ -1,15 +1,21 @@
-import { ExitHandler } from "./applied_session_agent";
-import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
-import Repl, { ReplAction } from "../utilities/repl";
-import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { manage, MessageHandler, ErrorLike } from "./promisified_ipc_manager";
-import { red, cyan, white, yellow, blue } from "colors";
-import { exec, ExecOptions } from "child_process";
-import { validate, ValidationError } from "jsonschema";
-import { Utilities } from "../utilities/utilities";
-import { readFileSync } from "fs";
-import IPCMessageReceiver from "./process_message_router";
-import { ServerWorker } from "./server_worker";
+import { ExitHandler } from './applied_session_agent';
+import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from '../utilities/session_config';
+import Repl, { ReplAction } from '../utilities/repl';
+import * as _cluster from 'cluster';
+import { Worker } from 'cluster';
+import { manage, MessageHandler, ErrorLike } from './promisified_ipc_manager';
+import { red, cyan, white, yellow, blue } from 'colors';
+import { exec, ExecOptions } from 'child_process';
+import { validate, ValidationError } from 'jsonschema';
+import { Utilities } from '../utilities/utilities';
+import { readFileSync } from 'fs';
+import IPCMessageReceiver from './process_message_router';
+import { ServerWorker } from './server_worker';
+const cluster = _cluster as any;
+const isWorker = cluster.isWorker;
+const setupMaster = cluster.setupPrimary;
+const on = cluster.on;
+const fork = cluster.fork;
/**
* Validates and reads the configuration file, accordingly builds a child process factory
@@ -26,14 +32,14 @@ export class Monitor extends IPCMessageReceiver {
public static Create() {
if (isWorker) {
- ServerWorker.IPCManager.emit("kill", {
- reason: "cannot create a monitor on the worker process.",
+ ServerWorker.IPCManager.emit('kill', {
+ reason: 'cannot create a monitor on the worker process.',
graceful: false,
- errorCode: 1
+ errorCode: 1,
});
process.exit(1);
} else if (++Monitor.count > 1) {
- console.error(red("cannot create more than one monitor."));
+ console.error(red('cannot create more than one monitor.'));
process.exit(1);
} else {
return new Monitor();
@@ -42,7 +48,7 @@ export class Monitor extends IPCMessageReceiver {
private constructor() {
super();
- console.log(this.timestamp(), cyan("initializing session..."));
+ console.log(this.timestamp(), cyan('initializing session...'));
this.configureInternalHandlers();
this.config = this.loadAndValidateConfiguration();
this.initializeClusterFunctions();
@@ -53,8 +59,8 @@ export class Monitor extends IPCMessageReceiver {
// handle exceptions in the master thread - there shouldn't be many of these
// the IPC (inter process communication) channel closed exception can't seem
// to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }): void => {
- if (message !== "Channel closed") {
+ process.on('uncaughtException', ({ message, stack }): void => {
+ if (message !== 'Channel closed') {
this.mainLog(red(message));
if (stack) {
this.mainLog(`uncaught exception\n${red(stack)}`);
@@ -62,36 +68,36 @@ export class Monitor extends IPCMessageReceiver {
}
});
- this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode));
- this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
- }
+ this.on('kill', ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode));
+ this.on('lifecycle', ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
+ };
private initializeClusterFunctions = () => {
// determines whether or not we see the compilation / initialization / runtime output of each child server process
- const output = this.config.showServerOutput ? "inherit" : "ignore";
- setupMaster({ stdio: ["ignore", output, output, "ipc"] });
+ const output = this.config.showServerOutput ? 'inherit' : 'ignore';
+ setupMaster({ stdio: ['ignore', output, output, 'ipc'] });
// a helpful cluster event called on the master thread each time a child process exits
- on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ on('exit', ({ process: { pid } }: { process: { pid: any } }, code: any, signal: any) => {
+ const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? '' : `, having encountered signal ${signal}`}.`;
this.mainLog(cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
this.spawn();
});
- }
+ };
public finalize = (sessionKey: string): void => {
if (this.finalized) {
- throw new Error("Session monitor is already finalized");
+ throw new Error('Session monitor is already finalized');
}
this.finalized = true;
this.key = sessionKey;
this.spawn();
- }
+ };
public readonly coreHooks = Object.freeze({
onCrashDetected: (listener: MessageHandler<{ error: ErrorLike }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
- onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener)
+ onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener),
});
/**
@@ -101,12 +107,12 @@ export class Monitor extends IPCMessageReceiver {
* requests to complete) or immediately.
*/
public killSession = async (reason: string, graceful = true, errorCode = 0) => {
- this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
- this.mainLog(`session exit reason: ${(red(reason))}`);
+ this.mainLog(cyan(`exiting session ${graceful ? 'clean' : 'immediate'}ly`));
+ this.mainLog(`session exit reason: ${red(reason)}`);
await this.executeExitHandlers(true);
await this.killActiveWorker(graceful, true);
process.exit(errorCode);
- }
+ };
/**
* Execute the list of functions registered to be called
@@ -120,27 +126,27 @@ export class Monitor extends IPCMessageReceiver {
*/
public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => {
this.repl.registerCommand(basename, argPatterns, action);
- }
+ };
public exec = (command: string, options?: ExecOptions) => {
return new Promise<void>(resolve => {
- exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => {
+ exec(command, { ...options, encoding: 'utf8' }, (error, stdout, stderr) => {
if (error) {
this.execLog(red(`unable to execute ${white(command)}`));
- error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
+ error.message.split('\n').forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
} else {
let outLines: string[], errorLines: string[];
- if ((outLines = stdout.split("\n").filter(line => line.length)).length) {
+ if ((outLines = stdout.split('\n').filter(line => line.length)).length) {
outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`)));
}
- if ((errorLines = stderr.split("\n").filter(line => line.length)).length) {
+ if ((errorLines = stderr.split('\n').filter(line => line.length)).length) {
errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`)));
}
}
resolve();
});
});
- }
+ };
/**
* Generates a blue UTC string associated with the time
@@ -153,14 +159,14 @@ export class Monitor extends IPCMessageReceiver {
*/
public mainLog = (...optionalParams: any[]) => {
console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams);
- }
+ };
/**
* A formatted, identified and timestamped log in color for non-
*/
private execLog = (...optionalParams: any[]) => {
console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams);
- }
+ };
/**
* Reads in configuration .json file only once, in the master thread
@@ -169,28 +175,28 @@ export class Monitor extends IPCMessageReceiver {
private loadAndValidateConfiguration = (): Configuration => {
let config: Configuration | undefined;
try {
- console.log(this.timestamp(), cyan("validating configuration..."));
+ console.log(this.timestamp(), cyan('validating configuration...'));
config = JSON.parse(readFileSync('./session.config.json', 'utf8'));
const options = {
throwError: true,
- allowUnknownAttributes: false
+ allowUnknownAttributes: false,
};
// ensure all necessary and no excess information is specified by the configuration file
validate(config, configurationSchema, options);
config = Utilities.preciseAssign({}, defaultConfig, config);
} catch (error: any) {
if (error instanceof ValidationError) {
- console.log(red("\nSession configuration failed."));
- console.log("The given session.config.json configuration file is invalid.");
+ console.log(red('\nSession configuration failed.'));
+ console.log('The given session.config.json configuration file is invalid.');
console.log(`${error.instance}: ${error.stack}`);
process.exit(0);
- } else if (error.code === "ENOENT" && error.path === "./session.config.json") {
- console.log(cyan("Loading default session parameters..."));
- console.log("Consider including a session.config.json configuration file in your project root for customization.");
+ } else if (error.code === 'ENOENT' && error.path === './session.config.json') {
+ console.log(cyan('Loading default session parameters...'));
+ console.log('Consider including a session.config.json configuration file in your project root for customization.');
config = Utilities.preciseAssign({}, defaultConfig);
} else {
- console.log(red("\nSession configuration failed."));
- console.log("The following unknown error occurred during configuration.");
+ console.log(red('\nSession configuration failed.'));
+ console.log('The following unknown error occurred during configuration.');
console.log(error.stack);
process.exit(0);
}
@@ -203,7 +209,7 @@ export class Monitor extends IPCMessageReceiver {
});
return config!;
}
- }
+ };
/**
* Builds the repl that allows the following commands to be typed into stdin of the master thread.
@@ -213,24 +219,24 @@ export class Monitor extends IPCMessageReceiver {
const boolean = /true|false/;
const number = /\d+/;
const letters = /[a-zA-Z]+/;
- repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0));
- repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean"));
- repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true"));
- repl.registerCommand("set", [/polling/, number, boolean], args => {
+ repl.registerCommand('exit', [/clean|force/], args => this.killSession('manual exit requested by repl', args[0] === 'clean', 0));
+ repl.registerCommand('restart', [/clean|force/], args => this.killActiveWorker(args[0] === 'clean'));
+ repl.registerCommand('set', [letters, 'port', number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === 'true'));
+ repl.registerCommand('set', [/polling/, number, boolean], args => {
const newPollingIntervalSeconds = Math.floor(Number(args[1]));
if (newPollingIntervalSeconds < 0) {
- this.mainLog(red("the polling interval must be a non-negative integer"));
+ this.mainLog(red('the polling interval must be a non-negative integer'));
} else {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
- if (args[2] === "true") {
- Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds });
+ if (args[2] === 'true') {
+ Monitor.IPCManager.emit('updatePollingInterval', { newPollingIntervalSeconds });
}
}
}
});
return repl;
- }
+ };
private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));
@@ -240,13 +246,13 @@ export class Monitor extends IPCMessageReceiver {
private killActiveWorker = async (graceful = true, isSessionEnd = false): Promise<void> => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
- Monitor.IPCManager.emit("manualExit", { isSessionEnd });
+ Monitor.IPCManager.emit('manualExit', { isSessionEnd });
} else {
await ServerWorker.IPCManager.destroy();
this.activeWorker.process.kill();
}
}
- }
+ };
/**
* Allows the caller to set the port at which the target (be it the server,
@@ -255,7 +261,7 @@ export class Monitor extends IPCMessageReceiver {
* at the port. Otherwise, the updated port won't be used until / unless the child
* dies on its own and triggers a restart.
*/
- private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => {
+ private setPort = (port: 'server' | 'socket' | string, value: number, immediateRestart: boolean): void => {
if (value > 1023 && value < 65536) {
this.config.ports[port] = value;
if (immediateRestart) {
@@ -264,7 +270,7 @@ export class Monitor extends IPCMessageReceiver {
} else {
this.mainLog(red(`${port} is an invalid port number`));
}
- }
+ };
/**
* Kills the current active worker and proceeds to spawn a new worker,
@@ -272,27 +278,29 @@ export class Monitor extends IPCMessageReceiver {
*/
private spawn = async (): Promise<void> => {
await this.killActiveWorker();
- const { config: { polling, ports }, key } = this;
+ const {
+ config: { polling, ports },
+ key,
+ } = this;
this.activeWorker = fork({
pollingRoute: polling.route,
pollingFailureTolerance: polling.failureTolerance,
serverPort: ports.server,
socketPort: ports.socket,
pollingIntervalSeconds: polling.intervalSeconds,
- session_key: key
+ session_key: key,
});
- Monitor.IPCManager = manage(this.activeWorker.process, this.handlers);
+ if (this.activeWorker) {
+ Monitor.IPCManager = manage(this.activeWorker.process, this.handlers);
+ }
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- }
-
+ };
}
export namespace Monitor {
-
export enum IntrinsicEvents {
- KeyGenerated = "key_generated",
- CrashDetected = "crash_detected",
- ServerRunning = "server_running"
+ KeyGenerated = 'key_generated',
+ CrashDetected = 'crash_detected',
+ ServerRunning = 'server_running',
}
-
-} \ No newline at end of file
+}