aboutsummaryrefslogtreecommitdiff
path: root/src/server/Session/session.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/Session/session.ts')
-rw-r--r--src/server/Session/session.ts131
1 files changed, 83 insertions, 48 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts
index 789a40c42..61b8bcf16 100644
--- a/src/server/Session/session.ts
+++ b/src/server/Session/session.ts
@@ -1,12 +1,10 @@
import { red, cyan, green, yellow, magenta } from "colors";
-import { isMaster, on, fork, setupMaster, Worker } from "cluster";
+import { on, fork, setupMaster, Worker } from "cluster";
import { execSync } from "child_process";
import { get } from "request-promise";
-import { WebSocket } from "../Websocket/Websocket";
import { Utils } from "../../Utils";
-import { MessageStore } from "../Message";
import { Email } from "../ActionUtilities";
-import Repl from "../repl";
+import Repl, { ReplAction } from "../repl";
import { readFileSync } from "fs";
import { validate, ValidationError } from "jsonschema";
import { configurationSchema } from "./session_config_schema";
@@ -26,26 +24,35 @@ const onWindows = process.platform === "win32";
*/
export namespace Session {
+ interface MasterCustomizer {
+ addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void;
+ addChildMessageHandler: (message: string, handler: ActionHandler) => void;
+ }
+
+ export interface SessionAction {
+ message: string;
+ args: any;
+ }
+
+ export type ExitHandler = (error: Error) => void | Promise<void>;
+ export type ActionHandler = (action: SessionAction) => void | Promise<void>;
+ export interface EmailTemplate {
+ subject: string;
+ body: string;
+ }
+ export type CrashEmailGenerator = (error: Error) => EmailTemplate | Promise<EmailTemplate>;
+
/**
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
- export async function initializeMonitorThread(): Promise<Repl> {
+ export async function initializeMonitorThread(crashEmailGenerator?: CrashEmailGenerator): Promise<MasterCustomizer> {
let activeWorker: Worker;
+ const childMessageHandlers: { [message: string]: (action: SessionAction, args: any) => void } = {};
// read in configuration .json file only once, in the master thread
// pass down any variables the pertinent to the child processes as environment variables
- const {
- masterIdentifier,
- workerIdentifier,
- recipients,
- signature,
- heartbeatRoute,
- serverPort,
- socketPort,
- showServerOutput,
- pollingIntervalSeconds
- } = function loadConfiguration(): any {
+ const configuration = function loadConfiguration(): any {
try {
const configuration = JSON.parse(readFileSync('./session.config.json', 'utf8'));
const options = {
@@ -54,8 +61,8 @@ export namespace Session {
};
// ensure all necessary and no excess information is specified by the configuration file
validate(configuration, configurationSchema, options);
- configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`;
- configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`;
+ configuration.masterIdentifier = yellow(configuration.masterIdentifier + ":");
+ configuration.workerIdentifier = magenta(configuration.workerIdentifier + ":");
return configuration;
} catch (error) {
console.log(red("\nSession configuration failed."));
@@ -73,6 +80,17 @@ export namespace Session {
}
}();
+ const {
+ masterIdentifier,
+ workerIdentifier,
+ recipients,
+ ports,
+ signature,
+ heartbeatRoute,
+ showServerOutput,
+ pollingIntervalSeconds
+ } = configuration;
+
// this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone
// to kill the server via the /kill/:key route
const key = Utils.GenerateGuid();
@@ -92,7 +110,7 @@ export namespace Session {
if (message !== "Channel closed") {
console.log(masterIdentifier, red(message));
if (stack) {
- console.log(masterIdentifier, `\n${red(stack)}`);
+ console.log(masterIdentifier, `uncaught exception\n${red(stack)}`);
}
}
});
@@ -113,39 +131,56 @@ export namespace Session {
return false;
};
+ const restart = () => {
+ // indicate to the worker that we are 'expecting' this restart
+ activeWorker.send({ setListening: false });
+ tryKillActiveWorker();
+ };
+
+ const setPort = (port: string, value: number, immediateRestart: boolean) => {
+ ports[port] = value;
+ if (immediateRestart) {
+ restart();
+ }
+ };
+
// kills the current active worker and proceeds to spawn a new worker,
// feeding in configuration information as environment variables
const spawn = (): void => {
tryKillActiveWorker();
activeWorker = fork({
heartbeatRoute,
- serverPort,
- socketPort,
+ serverPort: ports.server,
+ socketPort: ports.socket,
pollingIntervalSeconds,
session_key: key
});
console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`);
// an IPC message handler that executes actions on the master thread when prompted by the active worker
- activeWorker.on("message", ({ lifecycle, action }) => {
+ activeWorker.on("message", async ({ lifecycle, action }) => {
if (action) {
- const { message, args } = action;
+ const { message, args } = action as SessionAction;
console.log(`${workerIdentifier} action requested (${cyan(message)})`);
switch (message) {
case "kill":
- console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route"));
+ console.log(masterIdentifier, red("An authorized user has manually ended the server session"));
tryKillActiveWorker(false);
process.exit(0);
case "notify_crash":
- const { error: { name, message, stack } } = args;
- const content = [
- "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
- `name:\n${name}`,
- `message:\n${message}`,
- `stack:\n${stack}`,
- "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
- signature
- ].join("\n\n");
- Email.dispatchAll(recipients, "Dash Web Server Crash", content);
+ if (crashEmailGenerator) {
+ const { error } = args;
+ const { subject, body } = await crashEmailGenerator(error);
+ const content = `${body}\n\n${signature}`;
+ Email.dispatchAll(recipients, subject, content);
+ }
+ case "set_port":
+ const { port, value, immediateRestart } = args;
+ setPort(port, value, immediateRestart);
+ default:
+ const handler = childMessageHandlers[message];
+ if (handler) {
+ handler(action, args);
+ }
}
} else if (lifecycle) {
console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`);
@@ -155,7 +190,7 @@ export namespace Session {
// a helpful cluster event called on the master thread each time a child process exits
on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
console.log(masterIdentifier, cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
spawn();
@@ -164,17 +199,18 @@ export namespace Session {
// builds the repl that allows the following commands to be typed into stdin of the master thread
const repl = new Repl({ identifier: masterIdentifier });
repl.registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node"));
- repl.registerCommand("restart", [], () => {
- // indicate to the worker that we are 'expecting' this restart
- activeWorker.send({ setListening: false });
- tryKillActiveWorker();
+ repl.registerCommand("restart", [], restart);
+ repl.registerCommand("set", [/[a-zA-Z]+/g, "port", /\d+/g, /true|false/g], args => {
+ setPort(args[0], Number(args[2]), args[3] === "true");
});
-
// finally, set things in motion by spawning off the first child (server) process
spawn();
// returned to allow the caller to add custom commands
- return repl;
+ return {
+ addReplCommand: repl.registerCommand,
+ addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; }
+ };
}
/**
@@ -183,8 +219,9 @@ export namespace Session {
* email if the server encounters an uncaught exception or if the server cannot be reached.
* @param work the function specifying the work to be done by each worker thread
*/
- export async function initializeWorkerThread(work: Function): Promise<void> {
+ export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> {
let listening = false;
+ const exitHandlers: ExitHandler[] = [];
// notify master thread (which will log update in the console) of initialization via IPC
process.send?.({ lifecycle: green("initializing...") });
@@ -194,7 +231,7 @@ export namespace Session {
// called whenever the process has a reason to terminate, either through an uncaught exception
// in the process (potentially inconsistent state) or the server cannot be reached
- const activeExit = (error: Error): void => {
+ const activeExit = async (error: Error): Promise<void> => {
if (!listening) {
return;
}
@@ -206,11 +243,7 @@ export namespace Session {
args: { error }
}
});
- const { _socket } = WebSocket;
- // notifies all client users of a crash event
- if (_socket) {
- Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual");
- }
+ await Promise.all(exitHandlers.map(handler => handler(error)));
// notify master thread (which will log update in the console) of crash event via IPC
process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) });
process.send?.({ lifecycle: red(error.message) });
@@ -253,6 +286,8 @@ export namespace Session {
work();
checkHeartbeat(); // begin polling
+
+ return (handler: ExitHandler) => exitHandlers.push(handler);
}
} \ No newline at end of file