diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-05 14:50:54 -0800 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-05 14:50:54 -0800 |
commit | d4e7e354ec9209f117054ead9970ea9f180dd17b (patch) | |
tree | f36a17978ef9595f5ee1f4255299b2a51fcda73a | |
parent | ba1a85c4833820bc228779aa9187315d8b711268 (diff) |
port config, customizers
-rw-r--r-- | session.config.json | 6 | ||||
-rw-r--r-- | src/server/Session/session.ts | 131 | ||||
-rw-r--r-- | src/server/Session/session_config_schema.ts | 13 | ||||
-rw-r--r-- | src/server/index.ts | 42 | ||||
-rw-r--r-- | src/server/repl.ts | 27 |
5 files changed, 145 insertions, 74 deletions
diff --git a/session.config.json b/session.config.json index d8f86d239..ebe17763e 100644 --- a/session.config.json +++ b/session.config.json @@ -3,8 +3,10 @@ "recipients": [ "samuel_wilkins@brown.edu" ], - "serverPort": 1050, - "socketPort": 4321, + "ports": { + "server": 1050, + "socket": 4321 + }, "heartbeatRoute": "/serverHeartbeat", "pollingIntervalSeconds": 15, "masterIdentifier": "__master__", diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts index 789a40c42..61b8bcf16 100644 --- a/src/server/Session/session.ts +++ b/src/server/Session/session.ts @@ -1,12 +1,10 @@ import { red, cyan, green, yellow, magenta } from "colors"; -import { isMaster, on, fork, setupMaster, Worker } from "cluster"; +import { on, fork, setupMaster, Worker } from "cluster"; import { execSync } from "child_process"; import { get } from "request-promise"; -import { WebSocket } from "../Websocket/Websocket"; import { Utils } from "../../Utils"; -import { MessageStore } from "../Message"; import { Email } from "../ActionUtilities"; -import Repl from "../repl"; +import Repl, { ReplAction } from "../repl"; import { readFileSync } from "fs"; import { validate, ValidationError } from "jsonschema"; import { configurationSchema } from "./session_config_schema"; @@ -26,26 +24,35 @@ const onWindows = process.platform === "win32"; */ export namespace Session { + interface MasterCustomizer { + addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void; + addChildMessageHandler: (message: string, handler: ActionHandler) => void; + } + + export interface SessionAction { + message: string; + args: any; + } + + export type ExitHandler = (error: Error) => void | Promise<void>; + export type ActionHandler = (action: SessionAction) => void | Promise<void>; + export interface EmailTemplate { + subject: string; + body: string; + } + export type CrashEmailGenerator = (error: Error) => EmailTemplate | Promise<EmailTemplate>; + /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ - export async function initializeMonitorThread(): Promise<Repl> { + export async function initializeMonitorThread(crashEmailGenerator?: CrashEmailGenerator): Promise<MasterCustomizer> { let activeWorker: Worker; + const childMessageHandlers: { [message: string]: (action: SessionAction, args: any) => void } = {}; // read in configuration .json file only once, in the master thread // pass down any variables the pertinent to the child processes as environment variables - const { - masterIdentifier, - workerIdentifier, - recipients, - signature, - heartbeatRoute, - serverPort, - socketPort, - showServerOutput, - pollingIntervalSeconds - } = function loadConfiguration(): any { + const configuration = function loadConfiguration(): any { try { const configuration = JSON.parse(readFileSync('./session.config.json', 'utf8')); const options = { @@ -54,8 +61,8 @@ export namespace Session { }; // ensure all necessary and no excess information is specified by the configuration file validate(configuration, configurationSchema, options); - configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`; - configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`; + configuration.masterIdentifier = yellow(configuration.masterIdentifier + ":"); + configuration.workerIdentifier = magenta(configuration.workerIdentifier + ":"); return configuration; } catch (error) { console.log(red("\nSession configuration failed.")); @@ -73,6 +80,17 @@ export namespace Session { } }(); + const { + masterIdentifier, + workerIdentifier, + recipients, + ports, + signature, + heartbeatRoute, + showServerOutput, + pollingIntervalSeconds + } = configuration; + // this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone // to kill the server via the /kill/:key route const key = Utils.GenerateGuid(); @@ -92,7 +110,7 @@ export namespace Session { if (message !== "Channel closed") { console.log(masterIdentifier, red(message)); if (stack) { - console.log(masterIdentifier, `\n${red(stack)}`); + console.log(masterIdentifier, `uncaught exception\n${red(stack)}`); } } }); @@ -113,39 +131,56 @@ export namespace Session { return false; }; + const restart = () => { + // indicate to the worker that we are 'expecting' this restart + activeWorker.send({ setListening: false }); + tryKillActiveWorker(); + }; + + const setPort = (port: string, value: number, immediateRestart: boolean) => { + ports[port] = value; + if (immediateRestart) { + restart(); + } + }; + // kills the current active worker and proceeds to spawn a new worker, // feeding in configuration information as environment variables const spawn = (): void => { tryKillActiveWorker(); activeWorker = fork({ heartbeatRoute, - serverPort, - socketPort, + serverPort: ports.server, + socketPort: ports.socket, pollingIntervalSeconds, session_key: key }); console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`); // an IPC message handler that executes actions on the master thread when prompted by the active worker - activeWorker.on("message", ({ lifecycle, action }) => { + activeWorker.on("message", async ({ lifecycle, action }) => { if (action) { - const { message, args } = action; + const { message, args } = action as SessionAction; console.log(`${workerIdentifier} action requested (${cyan(message)})`); switch (message) { case "kill": - console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route")); + console.log(masterIdentifier, red("An authorized user has manually ended the server session")); tryKillActiveWorker(false); process.exit(0); case "notify_crash": - const { error: { name, message, stack } } = args; - const content = [ - "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", - `name:\n${name}`, - `message:\n${message}`, - `stack:\n${stack}`, - "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", - signature - ].join("\n\n"); - Email.dispatchAll(recipients, "Dash Web Server Crash", content); + if (crashEmailGenerator) { + const { error } = args; + const { subject, body } = await crashEmailGenerator(error); + const content = `${body}\n\n${signature}`; + Email.dispatchAll(recipients, subject, content); + } + case "set_port": + const { port, value, immediateRestart } = args; + setPort(port, value, immediateRestart); + default: + const handler = childMessageHandlers[message]; + if (handler) { + handler(action, args); + } } } else if (lifecycle) { console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`); @@ -155,7 +190,7 @@ export namespace Session { // a helpful cluster event called on the master thread each time a child process exits on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; + const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; console.log(masterIdentifier, cyan(prompt)); // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one spawn(); @@ -164,17 +199,18 @@ export namespace Session { // builds the repl that allows the following commands to be typed into stdin of the master thread const repl = new Repl({ identifier: masterIdentifier }); repl.registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node")); - repl.registerCommand("restart", [], () => { - // indicate to the worker that we are 'expecting' this restart - activeWorker.send({ setListening: false }); - tryKillActiveWorker(); + repl.registerCommand("restart", [], restart); + repl.registerCommand("set", [/[a-zA-Z]+/g, "port", /\d+/g, /true|false/g], args => { + setPort(args[0], Number(args[2]), args[3] === "true"); }); - // finally, set things in motion by spawning off the first child (server) process spawn(); // returned to allow the caller to add custom commands - return repl; + return { + addReplCommand: repl.registerCommand, + addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; } + }; } /** @@ -183,8 +219,9 @@ export namespace Session { * email if the server encounters an uncaught exception or if the server cannot be reached. * @param work the function specifying the work to be done by each worker thread */ - export async function initializeWorkerThread(work: Function): Promise<void> { + export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> { let listening = false; + const exitHandlers: ExitHandler[] = []; // notify master thread (which will log update in the console) of initialization via IPC process.send?.({ lifecycle: green("initializing...") }); @@ -194,7 +231,7 @@ export namespace Session { // called whenever the process has a reason to terminate, either through an uncaught exception // in the process (potentially inconsistent state) or the server cannot be reached - const activeExit = (error: Error): void => { + const activeExit = async (error: Error): Promise<void> => { if (!listening) { return; } @@ -206,11 +243,7 @@ export namespace Session { args: { error } } }); - const { _socket } = WebSocket; - // notifies all client users of a crash event - if (_socket) { - Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual"); - } + await Promise.all(exitHandlers.map(handler => handler(error))); // notify master thread (which will log update in the console) of crash event via IPC process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) }); process.send?.({ lifecycle: red(error.message) }); @@ -253,6 +286,8 @@ export namespace Session { work(); checkHeartbeat(); // begin polling + + return (handler: ExitHandler) => exitHandlers.push(handler); } }
\ No newline at end of file diff --git a/src/server/Session/session_config_schema.ts b/src/server/Session/session_config_schema.ts index 03009a351..34d1ad523 100644 --- a/src/server/Session/session_config_schema.ts +++ b/src/server/Session/session_config_schema.ts @@ -3,7 +3,7 @@ import { Schema } from "jsonschema"; const emailPattern = /^(([a-zA-Z0-9_.-])+@([a-zA-Z0-9_.-])+\.([a-zA-Z])+([a-zA-Z])+)?$/g; const localPortPattern = /\/[a-zA-Z]+/g; -const properties = { +const properties: { [name: string]: Schema } = { recipients: { type: "array", items: { @@ -12,8 +12,15 @@ const properties = { }, minLength: 1 }, - serverPort: { type: "number" }, - socketPort: { type: "number" }, + ports: { + type: "object", + properties: { + server: { type: "number" }, + socket: { type: "number" } + }, + required: ["server"], + additionalProperties: true + }, heartbeatRoute: { type: "string", pattern: localPortPattern diff --git a/src/server/index.ts b/src/server/index.ts index 4400687d8..0fee41bd8 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -26,6 +26,8 @@ import { yellow } from "colors"; import { Session } from "./Session/session"; import { isMaster } from "cluster"; import { execSync } from "child_process"; +import { Utils } from "../Utils"; +import { MessageStore } from "./Message"; export const publicDirectory = path.resolve(__dirname, "public"); export const filesDirectory = path.resolve(publicDirectory, "files"); @@ -132,17 +134,31 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }: /** * Thread dependent session initialization */ -if (isMaster) { - Session.initializeMonitorThread().then(({ registerCommand }) => { - registerCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] })); - }); -} else { - Session.initializeWorkerThread(async () => { - await log_execution({ - startMessage: "\nstarting execution of preliminary functions", - endMessage: "completed preliminary functions\n", - action: preliminaryFunctions +(async function launch() { + if (isMaster) { + const emailGenerator = (error: Error) => { + const subject = "Dash Web Server Crash"; + const { name, message, stack } = error; + const body = [ + "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", + `name:\n${name}`, + `message:\n${message}`, + `stack:\n${stack}`, + "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", + ].join("\n\n"); + return { subject, body }; + }; + const customizer = await Session.initializeMonitorThread(emailGenerator); + customizer.addReplCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] })); + } else { + const addExitHandler = await Session.initializeWorkerThread(async () => { + await log_execution({ + startMessage: "\nstarting execution of preliminary functions", + endMessage: "completed preliminary functions\n", + action: preliminaryFunctions + }); + await initializeServer(routeSetter); }); - await initializeServer(routeSetter); - }); -}
\ No newline at end of file + addExitHandler(() => Utils.Emit(WebSocket._socket, MessageStore.ConnectionTerminated, "Manual")); + } +})();
\ No newline at end of file diff --git a/src/server/repl.ts b/src/server/repl.ts index ec525582b..a47d4aad4 100644 --- a/src/server/repl.ts +++ b/src/server/repl.ts @@ -1,30 +1,33 @@ import { createInterface, Interface } from "readline"; -import { red } from "colors"; +import { red, green, white } from "colors"; export interface Configuration { identifier: string; onInvalid?: (culprit?: string) => string | string; + onValid?: (success?: string) => string | string; isCaseSensitive?: boolean; } -type Action = (parsedArgs: IterableIterator<string>) => any | Promise<any>; +export type ReplAction = (parsedArgs: Array<string>) => any | Promise<any>; export interface Registration { argPatterns: RegExp[]; - action: Action; + action: ReplAction; } export default class Repl { private identifier: string; - private onInvalid: ((culprit?: string) => string) | string; + private onInvalid: (culprit?: string) => string | string; + private onValid: (success: string) => string | string; private isCaseSensitive: boolean; private commandMap = new Map<string, Registration[]>(); public interface: Interface; private busy = false; private keys: string | undefined; - constructor({ identifier: prompt, onInvalid, isCaseSensitive }: Configuration) { + constructor({ identifier: prompt, onInvalid, onValid, isCaseSensitive }: Configuration) { this.identifier = prompt; this.onInvalid = onInvalid || this.usage; + this.onValid = onValid || this.success; this.isCaseSensitive = isCaseSensitive ?? true; this.interface = createInterface(process.stdin, process.stdout).on('line', this.considerInput); } @@ -43,7 +46,9 @@ export default class Repl { return `${this.identifier} commands: { ${members.sort().join(", ")} }`; } - public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: Action) => { + private success = (command: string) => `${this.identifier} completed execution of ${white(command)}`; + + public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => { const existing = this.commandMap.get(basename); const converted = argPatterns.map(input => input instanceof RegExp ? input : new RegExp(input)); const registration = { argPatterns: converted, action }; @@ -59,7 +64,13 @@ export default class Repl { this.busy = false; } + private valid = (command: string) => { + console.log(green(typeof this.onValid === "string" ? this.onValid : this.onValid(command))); + this.busy = false; + } + private considerInput = async (line: string) => { + console.log("raw", line); if (this.busy) { console.log(red("Busy")); return; @@ -91,8 +102,8 @@ export default class Repl { matched = true; } if (!length || matched) { - await action(parsed[Symbol.iterator]()); - this.busy = false; + await action(parsed); + this.valid(`${command} ${parsed.join(" ")}`); return; } } |