diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-04 11:12:45 -0800 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-04 11:12:45 -0800 |
commit | 19b62446a1f05048c6fa940ea4cd7a94021d4ab1 (patch) | |
tree | 629f8eefd3af2f9c4bf51fcf4749452f32150ecb /src | |
parent | 9a3df8031b55a16ad55cef6975ff8bf4f681f14e (diff) |
cleaner refactor, improved use of configuration
Diffstat (limited to 'src')
-rw-r--r-- | src/server/ActionUtilities.ts | 4 | ||||
-rw-r--r-- | src/server/Session/session.ts | 348 | ||||
-rw-r--r-- | src/server/Session/session_config_schema.ts | 44 | ||||
-rw-r--r-- | src/server/index.ts | 11 |
4 files changed, 246 insertions, 161 deletions
diff --git a/src/server/ActionUtilities.ts b/src/server/ActionUtilities.ts index 950fba093..3125f8683 100644 --- a/src/server/ActionUtilities.ts +++ b/src/server/ActionUtilities.ts @@ -118,6 +118,10 @@ export namespace Email { } }); + export async function dispatchAll(recipients: string[], subject: string, content: string) { + return Promise.all(recipients.map((recipient: string) => Email.dispatch(recipient, subject, content))); + } + export async function dispatch(recipient: string, subject: string, content: string): Promise<boolean> { const mailOptions = { to: recipient, diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts index 1ff4ce4de..2ff4ef0d2 100644 --- a/src/server/Session/session.ts +++ b/src/server/Session/session.ts @@ -13,162 +13,232 @@ import { configurationSchema } from "./session_config_schema"; const onWindows = process.platform === "win32"; +/** + * This namespace relies on NodeJS's cluster module, which allows a parent (master) process to share + * code with its children (workers). A simple `isMaster` flag indicates who is trying to access + * the code, and thus determines the functionality that actually gets invoked (checked by the caller, not internally). + * + * Think of the master thread as a factory, and the workers as the helpers that actually run the server. + * + * So, when we run `npm start`, given the appropriate check, initializeMaster() is called in the parent process + * This will spawn off its own child process (by default, mirrors the execution path of its parent), + * in which initializeWorker() is invoked. + */ export namespace Session { - export let key: string; - let activeWorker: Worker; - let listening = false; - - function loadConfiguration() { - try { - const raw = readFileSync('./session.config.json', 'utf8'); - const configuration = JSON.parse(raw); - const options = { - throwError: true, - allowUnknownAttributes: false - }; - validate(configuration, configurationSchema, options); - configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`; - configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`; - return configuration; - } catch (error) { - console.log(red("\nSession configuration failed.")); - if (error instanceof ValidationError) { - console.log("The given session.config.json configuration file is invalid."); - console.log(`${error.instance}: ${error.stack}`); - } else if (error.code === "ENOENT" && error.path === "./session.config.json") { - console.log("Please include a session.config.json configuration file in your project root."); - } else { - console.log(error.stack); + /** + * Validates and reads the configuration file, accordingly builds a child process factory + * and spawns off an initial process that will respawn as predecessors die. + */ + export async function initializeMaster() { + let activeWorker: Worker; + + // read in configuration .json file only once, in the master thread + // pass down any variables the pertinent to the child processes as environment variables + const { + masterIdentifier, + workerIdentifier, + recipients, + signature, + heartbeat, + showServerOutput, + pollingIntervalSeconds + } = function loadConfiguration() { + try { + const raw = readFileSync('./session.config.json', 'utf8'); + const configuration = JSON.parse(raw); + const options = { + throwError: true, + allowUnknownAttributes: false + }; + // ensure all necessary and no excess information is specified by the configuration file + validate(configuration, configurationSchema, options); + configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`; + configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`; + return configuration; + } catch (error) { + console.log(red("\nSession configuration failed.")); + if (error instanceof ValidationError) { + console.log("The given session.config.json configuration file is invalid."); + console.log(`${error.instance}: ${error.stack}`); + } else if (error.code === "ENOENT" && error.path === "./session.config.json") { + console.log("Please include a session.config.json configuration file in your project root."); + } else { + console.log("The following unknown error occurred during configuration."); + console.log(error.stack); + } + console.log(); + process.exit(0); } - console.log(); - process.exit(0); - } - } + }(); - export async function email(recipients: string[], subject: string, content: string) { - return Promise.all(recipients.map((recipient: string) => Email.dispatch(recipient, subject, content))); - } + // this sends a random guid to the configuration's recipients, allowing them alone + // to kill the server via the /kill/:password route + const key = Utils.GenerateGuid(); + const timestamp = new Date().toUTCString(); + const content = `The key for this session (started @ ${timestamp}) is ${key}.\n\n${signature}`; + await Email.dispatchAll(recipients, "Server Termination Key", content); - function tryKillActiveWorker() { - if (activeWorker && !activeWorker.isDead()) { - activeWorker.process.kill(); - return true; - } - return false; - } + console.log(masterIdentifier, "distributed session key to recipients"); - async function activeExit(error: Error) { - if (!listening) { - return; - } - listening = false; - process.send?.({ - action: { - message: "notify_crash", - args: { error } + // handle exceptions in the master thread - there shouldn't be many of these + // the IPC (inter process communication) channel closed exception can't seem to be caught in a try catch, and is inconsequential, so it is ignored + process.on("uncaughtException", ({ message, stack }) => { + if (message !== "Channel closed") { + console.log(masterIdentifier, red(message)); + if (stack) { + console.log(masterIdentifier, `\n${red(stack)}`); + } } }); - const { _socket } = WebSocket; - if (_socket) { - Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual"); - } - process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) }); - process.send?.({ lifecycle: red(error.message) }); - process.exit(1); - } - export async function initialize(work: Function) { - if (isMaster) { - const { - masterIdentifier, - workerIdentifier, - recipients, - signature, + // determines whether or not we see the compilation / initialization / runtime output of each child server process + setupMaster({ silent: !showServerOutput }); + + // attempts to kills the active worker ungracefully + const tryKillActiveWorker = () => { + if (activeWorker && !activeWorker.isDead()) { + activeWorker.process.kill(); + return true; + } + return false; + }; + + // kills the current active worker and proceeds to spawn a new worker, feeding in configuration information as environment variables + const spawn = () => { + tryKillActiveWorker(); + activeWorker = fork({ heartbeat, - silentChildren - } = loadConfiguration(); - await (async function distributeKey() { - key = Utils.GenerateGuid(); - const timestamp = new Date().toUTCString(); - const content = `The key for this session (started @ ${timestamp}) is ${key}.\n\n${signature}`; - return email(recipients, "Server Termination Key", content); - })(); - console.log(masterIdentifier, "distributed session key to recipients"); - process.on("uncaughtException", ({ message, stack }) => { - if (message !== "Channel closed") { - console.log(masterIdentifier, red(message)); - if (stack) { - console.log(masterIdentifier, `\n${red(stack)}`); + pollingIntervalSeconds, + session_key: key + }); + console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`); + // an IPC message handler that executes actions on the master thread when prompted by the active worker + activeWorker.on("message", ({ lifecycle, action }) => { + if (action) { + const { message, args } = action; + console.log(`${workerIdentifier} action requested (${cyan(message)})`); + switch (message) { + case "kill": + console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route")); + tryKillActiveWorker(); + process.exit(0); + case "notify_crash": + const { error: { name, message, stack } } = args; + const content = [ + "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", + `name:\n${name}`, + `message:\n${message}`, + `stack:\n${stack}`, + "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", + signature + ].join("\n\n"); + Email.dispatchAll(recipients, "Dash Web Server Crash", content); } + } else if (lifecycle) { + console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`); } }); - setupMaster({ silent: silentChildren }); - const spawn = () => { - tryKillActiveWorker(); - activeWorker = fork({ heartbeat, session_key: key }); - console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`); - activeWorker.on("message", ({ lifecycle, action }) => { - if (action) { - const { message, args } = action; - console.log(`${workerIdentifier} action requested (${cyan(message)})`); - switch (message) { - case "kill": - console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route")); - tryKillActiveWorker(); - process.exit(0); - case "notify_crash": - const { error: { name, message, stack } } = args; - email(recipients, "Dash Web Server Crash", [ - "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", - `name:\n${name}`, - `message:\n${message}`, - `stack:\n${stack}`, - "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", - signature - ].join("\n\n")); - } - } else if (lifecycle) { - console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`); - } - }); - }; + }; + + // a helpful cluster event called on the master thread each time a child process exits + on("exit", ({ process: { pid } }, code, signal) => { + const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; + console.log(masterIdentifier, cyan(prompt)); + // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one spawn(); - on("exit", ({ process: { pid } }, code, signal) => { - const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - console.log(masterIdentifier, cyan(prompt)); - spawn(); - }); - const { registerCommand } = new Repl({ identifier: masterIdentifier }); - registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node")); - registerCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] })); - registerCommand("restart", [], () => { - listening = false; - tryKillActiveWorker(); + }); + + // builds the repl that allows the following commands to be typed into stdin of the master thread + const { registerCommand } = new Repl({ identifier: masterIdentifier }); + registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node")); + registerCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] })); + registerCommand("restart", [], () => { + // indicate to the worker that we are 'expecting' this restart + activeWorker.send({ setListening: false }); + tryKillActiveWorker(); + }); + + // finally, set things in motion by spawning off the first child (server) process + spawn(); + } + + /** + * Effectively, each worker repairs the connection to the server by reintroducing a consistent state + * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification + * email if the server encounters an uncaught exception or if the server cannot be reached. + * @param work the function specifying the work to be done by each worker thread + */ + export async function initializeWorker(work: Function) { + let listening = false; + + // notify master thread (which will log update in the console) of initialization via IPC + process.send?.({ lifecycle: green("initializing...") }); + + // updates the local value of listening to the value sent from master + process.on("message", ({ setListening }) => listening = setListening); + + // called whenever the process has a reason to terminate, either through an uncaught exception + // in the process (potentially inconsistent state) or the server cannot be reached + const activeExit = (error: Error) => { + if (!listening) { + return; + } + listening = false; + // communicates via IPC to the master thread that it should dispatch a crash notification email + process.send?.({ + action: { + message: "notify_crash", + args: { error } + } }); - } else { - process.send?.({ lifecycle: green("initializing...") }); - process.on('uncaughtException', activeExit); - const checkHeartbeat = async () => { - await new Promise<void>(resolve => { - setTimeout(async () => { - try { - await get(process.env.heartbeat!); - if (!listening) { - process.send?.({ lifecycle: green("listening...") }); - } - listening = true; - resolve(); - } catch (error) { - await activeExit(error); + const { _socket } = WebSocket; + // notifies all client users of a crash event + if (_socket) { + Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual"); + } + // notify master thread (which will log update in the console) of crash event via IPC + process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) }); + process.send?.({ lifecycle: red(error.message) }); + process.exit(1); + }; + + // one reason to exit, as the process might be in an inconsistent state after such an exception + process.on('uncaughtException', activeExit); + + const { pollingIntervalSeconds, heartbeat } = process.env; + + // this monitors the health of the server by submitting a get request to whatever port / route specified + // by the configuration every n seconds, where n is also given by the configuration. + const checkHeartbeat = async () => { + await new Promise<void>(resolve => { + setTimeout(async () => { + try { + await get(heartbeat!); + if (!listening) { + // notify master thread (which will log update in the console) via IPC that the server is up and running + process.send?.({ lifecycle: green("listening...") }); } - }, 1000 * 15); - }); - checkHeartbeat(); - }; - work(); + listening = true; + resolve(); + } catch (error) { + // if we expect the server to be unavailable, i.e. during compilation, + // the listening variable is false, activeExit will return early and the child + // process will continue + activeExit(error); + } + }, 1000 * Number(pollingIntervalSeconds)); + }); + // controlled, asynchronous infinite recursion achieves a persistent poll that does not submit a new request until the previous has completed checkHeartbeat(); - } + }; + + // the actual work of the process, may be asynchronous + // for Dash, this is the code that launches the server + work(); + + // begin polling + checkHeartbeat(); } }
\ No newline at end of file diff --git a/src/server/Session/session_config_schema.ts b/src/server/Session/session_config_schema.ts index 25d95c243..a5010055a 100644 --- a/src/server/Session/session_config_schema.ts +++ b/src/server/Session/session_config_schema.ts @@ -1,25 +1,31 @@ import { Schema } from "jsonschema"; -export const configurationSchema: Schema = { - id: "/Configuration", - type: "object", - properties: { - recipients: { - type: "array", - items: { - type: "string", - pattern: /[^\@]+\@[^\@]+/g - }, - minLength: 1 - }, - heartbeat: { +const emailPattern = /^(([a-zA-Z0-9_.-])+@([a-zA-Z0-9_.-])+\.([a-zA-Z])+([a-zA-Z])+)?$/g; +const localPortPattern = /http\:\/\/localhost:\d+\/[a-zA-Z]+/g; + +const properties = { + recipients: { + type: "array", + items: { type: "string", - pattern: /http\:\/\/localhost:\d+\/[a-zA-Z]+/g + pattern: emailPattern }, - signature: { type: "string" }, - masterIdentifier: { type: "string", minLength: 1 }, - workerIdentifier: { type: "string", minLength: 1 }, - silentChildren: { type: "boolean" } + minLength: 1 }, - required: ["heartbeat", "recipients", "signature", "masterIdentifier", "workerIdentifier", "silentChildren"] + heartbeat: { + type: "string", + pattern: localPortPattern + }, + signature: { type: "string" }, + masterIdentifier: { type: "string", minLength: 1 }, + workerIdentifier: { type: "string", minLength: 1 }, + showServerOutput: { type: "boolean" }, + pollingIntervalSeconds: { type: "number", minimum: 1, maximum: 86400 } +}; + +export const configurationSchema: Schema = { + id: "/configuration", + type: "object", + properties, + required: Object.keys(properties) };
\ No newline at end of file diff --git a/src/server/index.ts b/src/server/index.ts index 381496c20..0a5b4afae 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -24,6 +24,13 @@ import GooglePhotosManager from "./ApiManagers/GooglePhotosManager"; import { Logger } from "./ProcessFactory"; import { yellow } from "colors"; import { Session } from "./Session/session"; +import { isMaster } from "cluster"; + +if (isMaster) { + Session.initializeMaster(); +} else { + Session.initializeWorker(launch); +} export const publicDirectory = path.resolve(__dirname, "public"); export const filesDirectory = path.resolve(publicDirectory, "files"); @@ -133,6 +140,4 @@ async function launch() { action: preliminaryFunctions }); await initializeServer({ serverPort: 1050, routeSetter }); -} - -Session.initialize(launch);
\ No newline at end of file +}
\ No newline at end of file |