aboutsummaryrefslogtreecommitdiff
path: root/src/server/Session
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/Session')
-rw-r--r--src/server/Session/session.ts125
1 files changed, 91 insertions, 34 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts
index 2a483fbab..b22b6404d 100644
--- a/src/server/Session/session.ts
+++ b/src/server/Session/session.ts
@@ -1,5 +1,5 @@
import { red, cyan, green, yellow, magenta, blue } from "colors";
-import { on, fork, setupMaster, Worker } from "cluster";
+import { on, fork, setupMaster, Worker, isMaster } from "cluster";
import { get } from "request-promise";
import { Utils } from "../../Utils";
import Repl, { ReplAction } from "../repl";
@@ -20,6 +20,51 @@ import { configurationSchema } from "./session_config_schema";
*/
export namespace Session {
+ export abstract class AppliedSessionAgent {
+
+ private launched = false;
+
+ protected sessionMonitorRef: Session.Monitor | undefined;
+ public get sessionMonitor(): Session.Monitor {
+ if (!isMaster) {
+ throw new Error("Cannot access the session monitor directly from the server worker thread");
+ }
+ return this.sessionMonitorRef!;
+ }
+ public set sessionMonitor(monitor: Session.Monitor) {
+ if (!isMaster) {
+ throw new Error("Cannot set the session monitor directly from the server worker thread");
+ }
+ this.sessionMonitorRef = monitor;
+ }
+
+ protected serverWorkerRef: Session.ServerWorker | undefined;
+ public get serverWorker(): Session.ServerWorker {
+ if (isMaster) {
+ throw new Error("Cannot access the server worker directly from the session monitor thread");
+ }
+ return this.serverWorkerRef!;
+ }
+ public set serverWorker(worker: Session.ServerWorker) {
+ if (isMaster) {
+ throw new Error("Cannot set the server worker directly from the session monitor thread");
+ }
+ this.serverWorkerRef = worker;
+ }
+
+ public async launch(): Promise<void> {
+ if (!this.launched) {
+ this.launched = true;
+ await this.launchImplementation();
+ } else {
+ throw new Error("Cannot launch a session thread more than once per process.");
+ }
+ }
+
+ protected abstract async launchImplementation(): Promise<void>;
+
+ }
+
interface Configuration {
showServerOutput: boolean;
masterIdentifier: string;
@@ -41,12 +86,21 @@ export namespace Session {
pollingFailureTolerance: 1
};
- interface MasterExtensions {
+ export interface Monitor {
+ log: (...optionalParams: any[]) => void;
+ restartServer: () => void;
+ setPort: (port: "server" | "socket" | string, value: number, immediateRestart: boolean) => void;
+ killSession: (graceful?: boolean) => never;
addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void;
addChildMessageHandler: (message: string, handler: ActionHandler) => void;
}
- export interface NotifierHooks {
+ export interface ServerWorker {
+ killSession: () => void;
+ addExitHandler: (handler: ExitHandler) => void;
+ }
+
+ export interface MonitorNotifierHooks {
key?: (key: string, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>;
crash?: (error: Error, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>;
}
@@ -56,14 +110,8 @@ export namespace Session {
args: any;
}
- export interface SessionHooks {
- masterLog: (...optionalParams: any[]) => void;
- killSession: (graceful?: boolean) => never;
- restartServer: () => void;
- }
-
- export type ExitHandler = (error: Error) => void | Promise<void>;
- export type ActionHandler = (action: SessionAction, hooks: SessionHooks) => void | Promise<void>;
+ export type ExitHandler = (reason: Error | null) => void | Promise<void>;
+ export type ActionHandler = (action: SessionAction) => void | Promise<void>;
export interface EmailTemplate {
subject: string;
body: string;
@@ -125,7 +173,7 @@ export namespace Session {
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
- export async function initializeMonitorThread(notifiers?: NotifierHooks): Promise<MasterExtensions> {
+ export async function initializeMonitorThread(notifiers?: MonitorNotifierHooks): Promise<Monitor> {
console.log(timestamp(), cyan("initializing session..."));
let activeWorker: Worker;
const childMessageHandlers: { [message: string]: ActionHandler } = {};
@@ -143,26 +191,26 @@ export namespace Session {
} = configuration;
let { pollingIntervalSeconds } = configuration;
- const masterLog = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams);
+ const log = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams);
// this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone
// to kill the server via the /kill/:key route
let key: string | undefined;
if (notifiers && notifiers.key) {
key = Utils.GenerateGuid();
- const success = await notifiers.key(key, masterLog);
+ const success = await notifiers.key(key, log);
const statement = success ? green("distributed session key to recipients") : red("distribution of session key failed");
- masterLog(statement);
+ log(statement);
}
// handle exceptions in the master thread - there shouldn't be many of these
// the IPC (inter process communication) channel closed exception can't seem
// to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }) => {
+ process.on("uncaughtException", ({ message, stack }): void => {
if (message !== "Channel closed") {
- masterLog(red(message));
+ log(red(message));
if (stack) {
- masterLog(`uncaught exception\n${red(stack)}`);
+ log(`uncaught exception\n${red(stack)}`);
}
}
});
@@ -183,26 +231,26 @@ export namespace Session {
return false;
};
- const restartServer = () => {
+ const restartServer = (): void => {
// indicate to the worker that we are 'expecting' this restart
activeWorker.send({ setResponsiveness: false });
tryKillActiveWorker(true);
};
- const killSession = (graceful = true) => {
- masterLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
+ const killSession = (graceful = true): never => {
+ log(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
tryKillActiveWorker(graceful);
process.exit(0);
};
- const setPort = (port: string, value: number, immediateRestart: boolean) => {
+ const setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => {
if (value > 1023 && value < 65536) {
ports[port] = value;
if (immediateRestart) {
restartServer();
}
} else {
- masterLog(red(`${port} is an invalid port number`));
+ log(red(`${port} is an invalid port number`));
}
};
@@ -218,7 +266,7 @@ export namespace Session {
pollingIntervalSeconds,
session_key: key
});
- masterLog(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`));
+ log(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`));
// an IPC message handler that executes actions on the master thread when prompted by the active worker
activeWorker.on("message", async ({ lifecycle, action }) => {
if (action) {
@@ -226,14 +274,14 @@ export namespace Session {
console.log(timestamp(), `${workerIdentifier} action requested (${cyan(message)})`);
switch (message) {
case "kill":
- masterLog(red("an authorized user has manually ended the server session"));
+ log(red("an authorized user has manually ended the server session"));
killSession();
case "notify_crash":
if (notifiers && notifiers.crash) {
const { error } = args;
- const success = await notifiers.crash(error, masterLog);
+ const success = await notifiers.crash(error, log);
const statement = success ? green("distributed crash notification to recipients") : red("distribution of crash notification failed");
- masterLog(statement);
+ log(statement);
}
case "set_port":
const { port, value, immediateRestart } = args;
@@ -241,7 +289,7 @@ export namespace Session {
default:
const handler = childMessageHandlers[message];
if (handler) {
- handler({ message, args }, { restartServer, killSession, masterLog });
+ handler({ message, args });
}
}
} else if (lifecycle) {
@@ -253,7 +301,7 @@ export namespace Session {
// a helpful cluster event called on the master thread each time a child process exits
on("exit", ({ process: { pid } }, code, signal) => {
const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
- masterLog(cyan(prompt));
+ log(cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
spawn();
});
@@ -269,7 +317,7 @@ export namespace Session {
repl.registerCommand("set", [/polling/, number, boolean], args => {
const newPollingIntervalSeconds = Math.floor(Number(args[2]));
if (newPollingIntervalSeconds < 0) {
- masterLog(red("the polling interval must be a non-negative integer"));
+ log(red("the polling interval must be a non-negative integer"));
} else {
if (newPollingIntervalSeconds !== pollingIntervalSeconds) {
pollingIntervalSeconds = newPollingIntervalSeconds;
@@ -285,7 +333,11 @@ export namespace Session {
// returned to allow the caller to add custom commands
return {
addReplCommand: repl.registerCommand,
- addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; }
+ addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; },
+ restartServer,
+ killSession,
+ setPort,
+ log
};
}
@@ -295,7 +347,7 @@ export namespace Session {
* email if the server encounters an uncaught exception or if the server cannot be reached.
* @param work the function specifying the work to be done by each worker thread
*/
- export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> {
+ export async function initializeWorkerThread(work: Function): Promise<ServerWorker> {
let shouldServerBeResponsive = false;
const exitHandlers: ExitHandler[] = [];
let pollingFailureCount = 0;
@@ -315,6 +367,8 @@ export namespace Session {
}
});
+ const executeExitHandlers = async (reason: Error | null) => Promise.all(exitHandlers.map(handler => handler(reason)));
+
// called whenever the process has a reason to terminate, either through an uncaught exception
// in the process (potentially inconsistent state) or the server cannot be reached
const activeExit = async (error: Error): Promise<void> => {
@@ -326,7 +380,7 @@ export namespace Session {
args: { error }
}
});
- await Promise.all(exitHandlers.map(handler => handler(error)));
+ await executeExitHandlers(error);
// notify master thread (which will log update in the console) of crash event via IPC
lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
lifecycleNotification(red(error.message));
@@ -375,7 +429,10 @@ export namespace Session {
work();
pollServer(); // begin polling
- return (handler: ExitHandler) => exitHandlers.push(handler);
+ return {
+ addExitHandler: (handler: ExitHandler) => exitHandlers.push(handler),
+ killSession: () => process.send!({ action: { message: "kill" } })
+ };
}
} \ No newline at end of file