diff options
Diffstat (limited to 'src/server/Session')
-rw-r--r-- | src/server/Session/session.ts | 125 |
1 files changed, 91 insertions, 34 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts index 2a483fbab..b22b6404d 100644 --- a/src/server/Session/session.ts +++ b/src/server/Session/session.ts @@ -1,5 +1,5 @@ import { red, cyan, green, yellow, magenta, blue } from "colors"; -import { on, fork, setupMaster, Worker } from "cluster"; +import { on, fork, setupMaster, Worker, isMaster } from "cluster"; import { get } from "request-promise"; import { Utils } from "../../Utils"; import Repl, { ReplAction } from "../repl"; @@ -20,6 +20,51 @@ import { configurationSchema } from "./session_config_schema"; */ export namespace Session { + export abstract class AppliedSessionAgent { + + private launched = false; + + protected sessionMonitorRef: Session.Monitor | undefined; + public get sessionMonitor(): Session.Monitor { + if (!isMaster) { + throw new Error("Cannot access the session monitor directly from the server worker thread"); + } + return this.sessionMonitorRef!; + } + public set sessionMonitor(monitor: Session.Monitor) { + if (!isMaster) { + throw new Error("Cannot set the session monitor directly from the server worker thread"); + } + this.sessionMonitorRef = monitor; + } + + protected serverWorkerRef: Session.ServerWorker | undefined; + public get serverWorker(): Session.ServerWorker { + if (isMaster) { + throw new Error("Cannot access the server worker directly from the session monitor thread"); + } + return this.serverWorkerRef!; + } + public set serverWorker(worker: Session.ServerWorker) { + if (isMaster) { + throw new Error("Cannot set the server worker directly from the session monitor thread"); + } + this.serverWorkerRef = worker; + } + + public async launch(): Promise<void> { + if (!this.launched) { + this.launched = true; + await this.launchImplementation(); + } else { + throw new Error("Cannot launch a session thread more than once per process."); + } + } + + protected abstract async launchImplementation(): Promise<void>; + + } + interface Configuration { showServerOutput: boolean; masterIdentifier: string; @@ -41,12 +86,21 @@ export namespace Session { pollingFailureTolerance: 1 }; - interface MasterExtensions { + export interface Monitor { + log: (...optionalParams: any[]) => void; + restartServer: () => void; + setPort: (port: "server" | "socket" | string, value: number, immediateRestart: boolean) => void; + killSession: (graceful?: boolean) => never; addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void; addChildMessageHandler: (message: string, handler: ActionHandler) => void; } - export interface NotifierHooks { + export interface ServerWorker { + killSession: () => void; + addExitHandler: (handler: ExitHandler) => void; + } + + export interface MonitorNotifierHooks { key?: (key: string, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>; crash?: (error: Error, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>; } @@ -56,14 +110,8 @@ export namespace Session { args: any; } - export interface SessionHooks { - masterLog: (...optionalParams: any[]) => void; - killSession: (graceful?: boolean) => never; - restartServer: () => void; - } - - export type ExitHandler = (error: Error) => void | Promise<void>; - export type ActionHandler = (action: SessionAction, hooks: SessionHooks) => void | Promise<void>; + export type ExitHandler = (reason: Error | null) => void | Promise<void>; + export type ActionHandler = (action: SessionAction) => void | Promise<void>; export interface EmailTemplate { subject: string; body: string; @@ -125,7 +173,7 @@ export namespace Session { * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ - export async function initializeMonitorThread(notifiers?: NotifierHooks): Promise<MasterExtensions> { + export async function initializeMonitorThread(notifiers?: MonitorNotifierHooks): Promise<Monitor> { console.log(timestamp(), cyan("initializing session...")); let activeWorker: Worker; const childMessageHandlers: { [message: string]: ActionHandler } = {}; @@ -143,26 +191,26 @@ export namespace Session { } = configuration; let { pollingIntervalSeconds } = configuration; - const masterLog = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams); + const log = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams); // this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone // to kill the server via the /kill/:key route let key: string | undefined; if (notifiers && notifiers.key) { key = Utils.GenerateGuid(); - const success = await notifiers.key(key, masterLog); + const success = await notifiers.key(key, log); const statement = success ? green("distributed session key to recipients") : red("distribution of session key failed"); - masterLog(statement); + log(statement); } // handle exceptions in the master thread - there shouldn't be many of these // the IPC (inter process communication) channel closed exception can't seem // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }) => { + process.on("uncaughtException", ({ message, stack }): void => { if (message !== "Channel closed") { - masterLog(red(message)); + log(red(message)); if (stack) { - masterLog(`uncaught exception\n${red(stack)}`); + log(`uncaught exception\n${red(stack)}`); } } }); @@ -183,26 +231,26 @@ export namespace Session { return false; }; - const restartServer = () => { + const restartServer = (): void => { // indicate to the worker that we are 'expecting' this restart activeWorker.send({ setResponsiveness: false }); tryKillActiveWorker(true); }; - const killSession = (graceful = true) => { - masterLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); + const killSession = (graceful = true): never => { + log(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); tryKillActiveWorker(graceful); process.exit(0); }; - const setPort = (port: string, value: number, immediateRestart: boolean) => { + const setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => { if (value > 1023 && value < 65536) { ports[port] = value; if (immediateRestart) { restartServer(); } } else { - masterLog(red(`${port} is an invalid port number`)); + log(red(`${port} is an invalid port number`)); } }; @@ -218,7 +266,7 @@ export namespace Session { pollingIntervalSeconds, session_key: key }); - masterLog(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`)); + log(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`)); // an IPC message handler that executes actions on the master thread when prompted by the active worker activeWorker.on("message", async ({ lifecycle, action }) => { if (action) { @@ -226,14 +274,14 @@ export namespace Session { console.log(timestamp(), `${workerIdentifier} action requested (${cyan(message)})`); switch (message) { case "kill": - masterLog(red("an authorized user has manually ended the server session")); + log(red("an authorized user has manually ended the server session")); killSession(); case "notify_crash": if (notifiers && notifiers.crash) { const { error } = args; - const success = await notifiers.crash(error, masterLog); + const success = await notifiers.crash(error, log); const statement = success ? green("distributed crash notification to recipients") : red("distribution of crash notification failed"); - masterLog(statement); + log(statement); } case "set_port": const { port, value, immediateRestart } = args; @@ -241,7 +289,7 @@ export namespace Session { default: const handler = childMessageHandlers[message]; if (handler) { - handler({ message, args }, { restartServer, killSession, masterLog }); + handler({ message, args }); } } } else if (lifecycle) { @@ -253,7 +301,7 @@ export namespace Session { // a helpful cluster event called on the master thread each time a child process exits on("exit", ({ process: { pid } }, code, signal) => { const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - masterLog(cyan(prompt)); + log(cyan(prompt)); // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one spawn(); }); @@ -269,7 +317,7 @@ export namespace Session { repl.registerCommand("set", [/polling/, number, boolean], args => { const newPollingIntervalSeconds = Math.floor(Number(args[2])); if (newPollingIntervalSeconds < 0) { - masterLog(red("the polling interval must be a non-negative integer")); + log(red("the polling interval must be a non-negative integer")); } else { if (newPollingIntervalSeconds !== pollingIntervalSeconds) { pollingIntervalSeconds = newPollingIntervalSeconds; @@ -285,7 +333,11 @@ export namespace Session { // returned to allow the caller to add custom commands return { addReplCommand: repl.registerCommand, - addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; } + addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; }, + restartServer, + killSession, + setPort, + log }; } @@ -295,7 +347,7 @@ export namespace Session { * email if the server encounters an uncaught exception or if the server cannot be reached. * @param work the function specifying the work to be done by each worker thread */ - export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> { + export async function initializeWorkerThread(work: Function): Promise<ServerWorker> { let shouldServerBeResponsive = false; const exitHandlers: ExitHandler[] = []; let pollingFailureCount = 0; @@ -315,6 +367,8 @@ export namespace Session { } }); + const executeExitHandlers = async (reason: Error | null) => Promise.all(exitHandlers.map(handler => handler(reason))); + // called whenever the process has a reason to terminate, either through an uncaught exception // in the process (potentially inconsistent state) or the server cannot be reached const activeExit = async (error: Error): Promise<void> => { @@ -326,7 +380,7 @@ export namespace Session { args: { error } } }); - await Promise.all(exitHandlers.map(handler => handler(error))); + await executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); lifecycleNotification(red(error.message)); @@ -375,7 +429,10 @@ export namespace Session { work(); pollServer(); // begin polling - return (handler: ExitHandler) => exitHandlers.push(handler); + return { + addExitHandler: (handler: ExitHandler) => exitHandlers.push(handler), + killSession: () => process.send!({ action: { message: "kill" } }) + }; } }
\ No newline at end of file |