diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-08 02:42:42 -0500 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-08 02:42:42 -0500 |
commit | 1bc1ed8c5c31a0d3126d27434c690b8d744971ba (patch) | |
tree | 9f8a0ab39647edbd69247a47640d3ac4dfc2553c /src | |
parent | 7d6dd8470384d345771ed67c88c9b15e2b333a6b (diff) |
final session changes
Diffstat (limited to 'src')
-rw-r--r-- | src/server/DashSession.ts | 61 | ||||
-rw-r--r-- | src/server/Session/session.ts | 125 | ||||
-rw-r--r-- | src/server/index.ts | 57 |
3 files changed, 157 insertions, 86 deletions
diff --git a/src/server/DashSession.ts b/src/server/DashSession.ts new file mode 100644 index 000000000..9c36fa17f --- /dev/null +++ b/src/server/DashSession.ts @@ -0,0 +1,61 @@ +import { Session } from "./Session/session"; +import { Email } from "./ActionUtilities"; +import { red, yellow } from "colors"; +import { SolrManager } from "./ApiManagers/SearchManager"; +import { execSync } from "child_process"; +import { isMaster } from "cluster"; +import { Utils } from "../Utils"; +import { WebSocket } from "./Websocket/Websocket"; +import { MessageStore } from "./Message"; +import { launchServer } from "."; + +const notificationRecipients = ["samuel_wilkins@brown.edu"]; +const signature = "-Dash Server Session Manager"; + +const monitorHooks: Session.MonitorNotifierHooks = { + key: async (key, masterLog) => { + const content = `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${signature}`; + const failures = await Email.dispatchAll(notificationRecipients, "Server Termination Key", content); + if (failures) { + failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + return false; + } + return true; + }, + crash: async ({ name, message, stack }, masterLog) => { + const body = [ + "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", + `name:\n${name}`, + `message:\n${message}`, + `stack:\n${stack}`, + "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", + ].join("\n\n"); + const content = `${body}\n\n${signature}`; + const failures = await Email.dispatchAll(notificationRecipients, "Dash Web Server Crash", content); + if (failures) { + failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); + return false; + } + return true; + } +}; + +export class DashSessionAgent extends Session.AppliedSessionAgent { + + /** + * If we're the monitor (master) thread, we should launch the monitor logic for the session. + * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus + * our job should be to run the server. + */ + protected async launchImplementation() { + if (isMaster) { + this.sessionMonitor = await Session.initializeMonitorThread(monitorHooks); + this.sessionMonitor.addReplCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] })); + this.sessionMonitor.addReplCommand("solr", [/start|stop/g], args => SolrManager.SetRunning(args[0] === "start")); + } else { + this.serverWorker = await Session.initializeWorkerThread(launchServer); // server initialization delegated to worker + this.serverWorker.addExitHandler(() => Utils.Emit(WebSocket._socket, MessageStore.ConnectionTerminated, "Manual")); + } + } + +}
\ No newline at end of file diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts index 2a483fbab..b22b6404d 100644 --- a/src/server/Session/session.ts +++ b/src/server/Session/session.ts @@ -1,5 +1,5 @@ import { red, cyan, green, yellow, magenta, blue } from "colors"; -import { on, fork, setupMaster, Worker } from "cluster"; +import { on, fork, setupMaster, Worker, isMaster } from "cluster"; import { get } from "request-promise"; import { Utils } from "../../Utils"; import Repl, { ReplAction } from "../repl"; @@ -20,6 +20,51 @@ import { configurationSchema } from "./session_config_schema"; */ export namespace Session { + export abstract class AppliedSessionAgent { + + private launched = false; + + protected sessionMonitorRef: Session.Monitor | undefined; + public get sessionMonitor(): Session.Monitor { + if (!isMaster) { + throw new Error("Cannot access the session monitor directly from the server worker thread"); + } + return this.sessionMonitorRef!; + } + public set sessionMonitor(monitor: Session.Monitor) { + if (!isMaster) { + throw new Error("Cannot set the session monitor directly from the server worker thread"); + } + this.sessionMonitorRef = monitor; + } + + protected serverWorkerRef: Session.ServerWorker | undefined; + public get serverWorker(): Session.ServerWorker { + if (isMaster) { + throw new Error("Cannot access the server worker directly from the session monitor thread"); + } + return this.serverWorkerRef!; + } + public set serverWorker(worker: Session.ServerWorker) { + if (isMaster) { + throw new Error("Cannot set the server worker directly from the session monitor thread"); + } + this.serverWorkerRef = worker; + } + + public async launch(): Promise<void> { + if (!this.launched) { + this.launched = true; + await this.launchImplementation(); + } else { + throw new Error("Cannot launch a session thread more than once per process."); + } + } + + protected abstract async launchImplementation(): Promise<void>; + + } + interface Configuration { showServerOutput: boolean; masterIdentifier: string; @@ -41,12 +86,21 @@ export namespace Session { pollingFailureTolerance: 1 }; - interface MasterExtensions { + export interface Monitor { + log: (...optionalParams: any[]) => void; + restartServer: () => void; + setPort: (port: "server" | "socket" | string, value: number, immediateRestart: boolean) => void; + killSession: (graceful?: boolean) => never; addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void; addChildMessageHandler: (message: string, handler: ActionHandler) => void; } - export interface NotifierHooks { + export interface ServerWorker { + killSession: () => void; + addExitHandler: (handler: ExitHandler) => void; + } + + export interface MonitorNotifierHooks { key?: (key: string, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>; crash?: (error: Error, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>; } @@ -56,14 +110,8 @@ export namespace Session { args: any; } - export interface SessionHooks { - masterLog: (...optionalParams: any[]) => void; - killSession: (graceful?: boolean) => never; - restartServer: () => void; - } - - export type ExitHandler = (error: Error) => void | Promise<void>; - export type ActionHandler = (action: SessionAction, hooks: SessionHooks) => void | Promise<void>; + export type ExitHandler = (reason: Error | null) => void | Promise<void>; + export type ActionHandler = (action: SessionAction) => void | Promise<void>; export interface EmailTemplate { subject: string; body: string; @@ -125,7 +173,7 @@ export namespace Session { * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ - export async function initializeMonitorThread(notifiers?: NotifierHooks): Promise<MasterExtensions> { + export async function initializeMonitorThread(notifiers?: MonitorNotifierHooks): Promise<Monitor> { console.log(timestamp(), cyan("initializing session...")); let activeWorker: Worker; const childMessageHandlers: { [message: string]: ActionHandler } = {}; @@ -143,26 +191,26 @@ export namespace Session { } = configuration; let { pollingIntervalSeconds } = configuration; - const masterLog = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams); + const log = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams); // this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone // to kill the server via the /kill/:key route let key: string | undefined; if (notifiers && notifiers.key) { key = Utils.GenerateGuid(); - const success = await notifiers.key(key, masterLog); + const success = await notifiers.key(key, log); const statement = success ? green("distributed session key to recipients") : red("distribution of session key failed"); - masterLog(statement); + log(statement); } // handle exceptions in the master thread - there shouldn't be many of these // the IPC (inter process communication) channel closed exception can't seem // to be caught in a try catch, and is inconsequential, so it is ignored - process.on("uncaughtException", ({ message, stack }) => { + process.on("uncaughtException", ({ message, stack }): void => { if (message !== "Channel closed") { - masterLog(red(message)); + log(red(message)); if (stack) { - masterLog(`uncaught exception\n${red(stack)}`); + log(`uncaught exception\n${red(stack)}`); } } }); @@ -183,26 +231,26 @@ export namespace Session { return false; }; - const restartServer = () => { + const restartServer = (): void => { // indicate to the worker that we are 'expecting' this restart activeWorker.send({ setResponsiveness: false }); tryKillActiveWorker(true); }; - const killSession = (graceful = true) => { - masterLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); + const killSession = (graceful = true): never => { + log(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`)); tryKillActiveWorker(graceful); process.exit(0); }; - const setPort = (port: string, value: number, immediateRestart: boolean) => { + const setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => { if (value > 1023 && value < 65536) { ports[port] = value; if (immediateRestart) { restartServer(); } } else { - masterLog(red(`${port} is an invalid port number`)); + log(red(`${port} is an invalid port number`)); } }; @@ -218,7 +266,7 @@ export namespace Session { pollingIntervalSeconds, session_key: key }); - masterLog(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`)); + log(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`)); // an IPC message handler that executes actions on the master thread when prompted by the active worker activeWorker.on("message", async ({ lifecycle, action }) => { if (action) { @@ -226,14 +274,14 @@ export namespace Session { console.log(timestamp(), `${workerIdentifier} action requested (${cyan(message)})`); switch (message) { case "kill": - masterLog(red("an authorized user has manually ended the server session")); + log(red("an authorized user has manually ended the server session")); killSession(); case "notify_crash": if (notifiers && notifiers.crash) { const { error } = args; - const success = await notifiers.crash(error, masterLog); + const success = await notifiers.crash(error, log); const statement = success ? green("distributed crash notification to recipients") : red("distribution of crash notification failed"); - masterLog(statement); + log(statement); } case "set_port": const { port, value, immediateRestart } = args; @@ -241,7 +289,7 @@ export namespace Session { default: const handler = childMessageHandlers[message]; if (handler) { - handler({ message, args }, { restartServer, killSession, masterLog }); + handler({ message, args }); } } } else if (lifecycle) { @@ -253,7 +301,7 @@ export namespace Session { // a helpful cluster event called on the master thread each time a child process exits on("exit", ({ process: { pid } }, code, signal) => { const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`; - masterLog(cyan(prompt)); + log(cyan(prompt)); // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one spawn(); }); @@ -269,7 +317,7 @@ export namespace Session { repl.registerCommand("set", [/polling/, number, boolean], args => { const newPollingIntervalSeconds = Math.floor(Number(args[2])); if (newPollingIntervalSeconds < 0) { - masterLog(red("the polling interval must be a non-negative integer")); + log(red("the polling interval must be a non-negative integer")); } else { if (newPollingIntervalSeconds !== pollingIntervalSeconds) { pollingIntervalSeconds = newPollingIntervalSeconds; @@ -285,7 +333,11 @@ export namespace Session { // returned to allow the caller to add custom commands return { addReplCommand: repl.registerCommand, - addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; } + addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; }, + restartServer, + killSession, + setPort, + log }; } @@ -295,7 +347,7 @@ export namespace Session { * email if the server encounters an uncaught exception or if the server cannot be reached. * @param work the function specifying the work to be done by each worker thread */ - export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> { + export async function initializeWorkerThread(work: Function): Promise<ServerWorker> { let shouldServerBeResponsive = false; const exitHandlers: ExitHandler[] = []; let pollingFailureCount = 0; @@ -315,6 +367,8 @@ export namespace Session { } }); + const executeExitHandlers = async (reason: Error | null) => Promise.all(exitHandlers.map(handler => handler(reason))); + // called whenever the process has a reason to terminate, either through an uncaught exception // in the process (potentially inconsistent state) or the server cannot be reached const activeExit = async (error: Error): Promise<void> => { @@ -326,7 +380,7 @@ export namespace Session { args: { error } } }); - await Promise.all(exitHandlers.map(handler => handler(error))); + await executeExitHandlers(error); // notify master thread (which will log update in the console) of crash event via IPC lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`)); lifecycleNotification(red(error.message)); @@ -375,7 +429,10 @@ export namespace Session { work(); pollServer(); // begin polling - return (handler: ExitHandler) => exitHandlers.push(handler); + return { + addExitHandler: (handler: ExitHandler) => exitHandlers.push(handler), + killSession: () => process.send!({ action: { message: "kill" } }) + }; } }
\ No newline at end of file diff --git a/src/server/index.ts b/src/server/index.ts index 9bc3c7617..27e9a677b 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -3,7 +3,6 @@ import { GoogleApiServerUtils } from "./apis/google/GoogleApiServerUtils"; import * as mobileDetect from 'mobile-detect'; import * as path from 'path'; import { Database } from './database'; -const serverPort = 4321; import { DashUploadUtils } from './DashUploadUtils'; import RouteSubscriber from './RouteSubscriber'; import initializeServer from './server_initialization'; @@ -24,10 +23,7 @@ import GooglePhotosManager from "./ApiManagers/GooglePhotosManager"; import { Logger } from "./ProcessFactory"; import { yellow, red } from "colors"; import { Session } from "./Session/session"; -import { isMaster } from "cluster"; -import { execSync } from "child_process"; -import { Utils } from "../Utils"; -import { MessageStore } from "./Message"; +import { DashSessionAgent } from "./DashSession"; export const publicDirectory = path.resolve(__dirname, "public"); export const filesDirectory = path.resolve(publicDirectory, "files"); @@ -96,7 +92,7 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }: secureHandler: ({ req, res }) => { if (req.params.key === process.env.session_key) { res.send("<img src='https://media.giphy.com/media/NGIfqtcS81qi4/giphy.gif' style='width:100%;height:100%;'/>"); - process.send!({ action: { message: "kill" } }); + sessionAgent.serverWorker.killSession(); } else { res.redirect("/home"); } @@ -136,7 +132,7 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }: * however, this becomes the logic invoked by a single worker thread spawned by * the main monitor (master) thread. */ -async function launchServer() { +export async function launchServer() { await log_execution({ startMessage: "\nstarting execution of preliminary functions", endMessage: "completed preliminary functions\n", @@ -145,50 +141,7 @@ async function launchServer() { await initializeServer(routeSetter); } -/** - * If we're the monitor (master) thread, we should launch the monitor logic for the session. - * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus - * our job should be to run the server. - */ -async function launchMonitoredSession() { - if (isMaster) { - const notificationRecipients = ["samuel_wilkins@brown.edu"]; - const signature = "-Dash Server Session Manager"; - const extensions = await Session.initializeMonitorThread({ - key: async (key, masterLog) => { - const content = `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${signature}`; - const failures = await Email.dispatchAll(notificationRecipients, "Server Termination Key", content); - if (failures) { - failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); - return false; - } - return true; - }, - crash: async ({ name, message, stack }, masterLog) => { - const body = [ - "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:", - `name:\n${name}`, - `message:\n${message}`, - `stack:\n${stack}`, - "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.", - ].join("\n\n"); - const content = `${body}\n\n${signature}`; - const failures = await Email.dispatchAll(notificationRecipients, "Dash Web Server Crash", content); - if (failures) { - failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`))); - return false; - } - return true; - } - }); - extensions.addReplCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] })); - extensions.addReplCommand("solr", [/start|stop/g], args => SolrManager.SetRunning(args[0] === "start")); - } else { - const addExitHandler = await Session.initializeWorkerThread(launchServer); // server initialization delegated to worker - addExitHandler(() => Utils.Emit(WebSocket._socket, MessageStore.ConnectionTerminated, "Manual")); - } -} - +export const sessionAgent: Session.AppliedSessionAgent = new DashSessionAgent(); /** * If you're in development mode, you won't need to run a session. * The session spawns off new server processes each time an error is encountered, and doesn't @@ -196,7 +149,7 @@ async function launchMonitoredSession() { * So, the 'else' clause is exactly what we've always run when executing npm start. */ if (process.env.RELEASE) { - launchMonitoredSession(); + sessionAgent.launch(); } else { launchServer(); }
\ No newline at end of file |