diff options
Diffstat (limited to 'src/server/Session/session.ts')
-rw-r--r-- | src/server/Session/session.ts | 131 |
1 files changed, 83 insertions, 48 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts index 789a40c42..61b8bcf16 100644 --- a/src/server/Session/session.ts +++ b/src/server/Session/session.ts @@ -1,12 +1,10 @@ import { red, cyan, green, yellow, magenta } from "colors"; -import { isMaster, on, fork, setupMaster, Worker } from "cluster"; +import { on, fork, setupMaster, Worker } from "cluster"; import { execSync } from "child_process"; import { get } from "request-promise"; -import { WebSocket } from "../Websocket/Websocket"; import { Utils } from "../../Utils"; -import { MessageStore } from "../Message"; import { Email } from "../ActionUtilities"; -import Repl from "../repl"; +import Repl, { ReplAction } from "../repl"; import { readFileSync } from "fs"; import { validate, ValidationError } from "jsonschema"; import { configurationSchema } from "./session_config_schema"; @@ -26,26 +24,35 @@ const onWindows = process.platform === "win32"; */ export namespace Session { + interface MasterCustomizer { + addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void; + addChildMessageHandler: (message: string, handler: ActionHandler) => void; + } + + export interface SessionAction { + message: string; + args: any; + } + + export type ExitHandler = (error: Error) => void | Promise<void>; + export type ActionHandler = (action: SessionAction) => void | Promise<void>; + export interface EmailTemplate { + subject: string; + body: string; + } + export type CrashEmailGenerator = (error: Error) => EmailTemplate | Promise<EmailTemplate>; + /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ - export async function initializeMonitorThread(): Promise<Repl> { + export async function initializeMonitorThread(crashEmailGenerator?: CrashEmailGenerator): Promise<MasterCustomizer> { let activeWorker: Worker; + const childMessageHandlers: { [message: string]: (action: SessionAction, args: any) => void } = {}; // read in configuration .json file only once, in the master thread // pass down any variables the pertinent to the child processes as environment variables - const { - masterIdentifier, - workerIdentifier, - recipients, - signature, - heartbeatRoute, - serverPort, - socketPort, - showServerOutput, - pollingIntervalSeconds - } = function loadConfiguration(): any { + const configuration = function loadConfiguration(): any { try { const configuration = JSON.parse(readFileSync('./session.config.json', 'utf8')); const options = { @@ -54,8 +61,8 @@ export namespace Session { }; // ensure all necessary and no excess information is specified by the configuration file validate(configuration, configurationSchema, options); - configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`; - configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`; + configuration.masterIdentifier = yellow(configuration.masterIdentifier + ":"); + configuration.workerIdentifier = magenta(configuration.workerIdentifier + ":"); return configuration; } catch (error) { console.log(red("\nSession configuration failed.")); @@ -73,6 +80,17 @@ export namespace Session { } }(); + const { + masterIdentifier, + workerIdentifier, + recipients, + ports, + signature, + heartbeatRoute, + showServerOutput, + pollingIntervalSeconds + } = configuration; + // this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone // to kill the server via the /kill/:key route const key = Utils.GenerateGuid(); @@ -92,7 +110,7 @@ export namespace Session { if (message !== "Channel closed") { console.log(masterIdentifier, red(message)); if (stack) { - console.log(masterIdentifier, `\n${red(stack)}`); + console.log(masterIdentifier, `uncaught exception\n${red(stack)}`); } } }); @@ -113,39 +131,56 @@ export namespace Session { return false; }; + const restart = () => { + // indicate to the worker that we are 'expecting' this restart + activeWorker.send({ setListening: false }); + tryKillActiveWorker(); + }; + + const setPort = (port: string, value: number, immediateRestart: boolean) => { + ports[port] = value; + if (immediateRestart) { + restart(); + } + }; + // kills the current active worker and proceeds to spawn a new worker, // feeding in configuration information as environment variables const spawn = (): void => { tryKillActiveWorker(); activeWorker = fork({ heartbeatRoute, - serverPort, - socketPort, + serverPort: ports.server, + socketPort: ports.socket, pollingIntervalSeconds, session_key: key }); console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`); // an IPC message handler that executes actions on the master thread when prompted by the active worker - activeWorker.on("message", ({ lifecycle, action }) => { + activeWorker.on("message", async ({ lifecycle, action }) => { if (action) { - const { message, args } = action; + const { message, args } = action as SessionAction; console.log(`${workerIdentifier} action requested (${cyan(message)})`); switch (message) { case "kill": - console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route")); + console.log(masterIdentifier, red("An authorized user has manually ended the server session")); tryKillActiveWorker(false); process.exit(0); case "notify_crash": - const { error: { name, message, stack } } = args; - const content = [ - "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", - `name:\n${name}`, - `message:\n${message}`, - `stack:\n${stack}`, - "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", - signature - ].join("\n\n"); - Email.dispatchAll(recipients, "Dash Web Server Crash", content); + if (crashEmailGenerator) { + const { error } = args; + const { subject, body } = await crashEmailGenerator(error); + const content = `${body}\n\n${signature}`; + Email.dispatchAll(recipients, subject, content); + } + case "set_port": + const { port, value, immediateRestart } = args; + setPort(port, value, immediateRestart); + default: + const handler = childMessageHandlers[message]; + if (handler) { + handler(action, args); + } } } else if (lifecycle) { console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`); @@ -155,7 +190,7 @@ export namespace Session { // a helpful cluster event called on the master thread each time a child process exits on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; + const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; console.log(masterIdentifier, cyan(prompt)); // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one spawn(); @@ -164,17 +199,18 @@ export namespace Session { // builds the repl that allows the following commands to be typed into stdin of the master thread const repl = new Repl({ identifier: masterIdentifier }); repl.registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node")); - repl.registerCommand("restart", [], () => { - // indicate to the worker that we are 'expecting' this restart - activeWorker.send({ setListening: false }); - tryKillActiveWorker(); + repl.registerCommand("restart", [], restart); + repl.registerCommand("set", [/[a-zA-Z]+/g, "port", /\d+/g, /true|false/g], args => { + setPort(args[0], Number(args[2]), args[3] === "true"); }); - // finally, set things in motion by spawning off the first child (server) process spawn(); // returned to allow the caller to add custom commands - return repl; + return { + addReplCommand: repl.registerCommand, + addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; } + }; } /** @@ -183,8 +219,9 @@ export namespace Session { * email if the server encounters an uncaught exception or if the server cannot be reached. * @param work the function specifying the work to be done by each worker thread */ - export async function initializeWorkerThread(work: Function): Promise<void> { + export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> { let listening = false; + const exitHandlers: ExitHandler[] = []; // notify master thread (which will log update in the console) of initialization via IPC process.send?.({ lifecycle: green("initializing...") }); @@ -194,7 +231,7 @@ export namespace Session { // called whenever the process has a reason to terminate, either through an uncaught exception // in the process (potentially inconsistent state) or the server cannot be reached - const activeExit = (error: Error): void => { + const activeExit = async (error: Error): Promise<void> => { if (!listening) { return; } @@ -206,11 +243,7 @@ export namespace Session { args: { error } } }); - const { _socket } = WebSocket; - // notifies all client users of a crash event - if (_socket) { - Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual"); - } + await Promise.all(exitHandlers.map(handler => handler(error))); // notify master thread (which will log update in the console) of crash event via IPC process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) }); process.send?.({ lifecycle: red(error.message) }); @@ -253,6 +286,8 @@ export namespace Session { work(); checkHeartbeat(); // begin polling + + return (handler: ExitHandler) => exitHandlers.push(handler); } }
\ No newline at end of file |