aboutsummaryrefslogtreecommitdiff
path: root/src/server/session/agents
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/session/agents')
-rw-r--r--src/server/session/agents/applied_session_agent.ts56
-rw-r--r--src/server/session/agents/monitor.ts367
-rw-r--r--src/server/session/agents/server_worker.ts165
3 files changed, 0 insertions, 588 deletions
diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts
deleted file mode 100644
index 53293d3bf..000000000
--- a/src/server/session/agents/applied_session_agent.ts
+++ /dev/null
@@ -1,56 +0,0 @@
-import { isMaster } from "cluster";
-import { Monitor } from "./monitor";
-import { ServerWorker } from "./server_worker";
-
-export type ExitHandler = (reason: Error | boolean) => void | Promise<void>;
-
-export abstract class AppliedSessionAgent {
-
- // the following two methods allow the developer to create a custom
- // session and use the built in customization options for each thread
- protected abstract async initializeMonitor(monitor: Monitor): Promise<void>;
- protected abstract async initializeServerWorker(): Promise<ServerWorker>;
-
- private launched = false;
-
- public killSession = (reason: string, graceful = true, errorCode = 0) => {
- const target = isMaster ? this.sessionMonitor : this.serverWorker;
- target.killSession(reason, graceful, errorCode);
- }
-
- private sessionMonitorRef: Monitor | undefined;
- public get sessionMonitor(): Monitor {
- if (!isMaster) {
- this.serverWorker.sendMonitorAction("kill", {
- graceful: false,
- reason: "Cannot access the session monitor directly from the server worker thread.",
- errorCode: 1
- });
- throw new Error();
- }
- return this.sessionMonitorRef!;
- }
-
- private serverWorkerRef: ServerWorker | undefined;
- public get serverWorker(): ServerWorker {
- if (isMaster) {
- throw new Error("Cannot access the server worker directly from the session monitor thread");
- }
- return this.serverWorkerRef!;
- }
-
- public async launch(): Promise<void> {
- if (!this.launched) {
- this.launched = true;
- if (isMaster) {
- await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create());
- this.sessionMonitorRef.finalize();
- } else {
- this.serverWorkerRef = await this.initializeServerWorker();
- }
- } else {
- throw new Error("Cannot launch a session thread more than once per process.");
- }
- }
-
-} \ No newline at end of file
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
deleted file mode 100644
index e1709f5e6..000000000
--- a/src/server/session/agents/monitor.ts
+++ /dev/null
@@ -1,367 +0,0 @@
-import { ExitHandler } from "./applied_session_agent";
-import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
-import Repl, { ReplAction } from "../utilities/repl";
-import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { IPC } from "../utilities/ipc";
-import { red, cyan, white, yellow, blue, green } from "colors";
-import { exec, ExecOptions } from "child_process";
-import { Utils } from "../../../Utils";
-import { validate, ValidationError } from "jsonschema";
-import { Utilities } from "../utilities/utilities";
-import { readFileSync } from "fs";
-import { EventEmitter } from "events";
-
-/**
- * Validates and reads the configuration file, accordingly builds a child process factory
- * and spawns off an initial process that will respawn as predecessors die.
- */
-export class Monitor extends EventEmitter {
-
- private static count = 0;
- private finalized = false;
- private exitHandlers: ExitHandler[] = [];
- private readonly config: Configuration;
- private onMessage: { [message: string]: Monitor.ServerMessageHandler[] | undefined } = {};
- private activeWorker: Worker | undefined;
- private key: string | undefined;
- private repl: Repl;
-
- public static Create() {
- if (isWorker) {
- IPC.dispatchMessage(process, {
- action: {
- message: "kill",
- args: {
- reason: "cannot create a monitor on the worker process.",
- graceful: false,
- errorCode: 1
- }
- }
- });
- process.exit(1);
- } else if (++Monitor.count > 1) {
- console.error(red("cannot create more than one monitor."));
- process.exit(1);
- } else {
- return new Monitor();
- }
- }
-
- /**
- * Kill this session and its active child
- * server process, either gracefully (may wait
- * indefinitely, but at least allows active networking
- * requests to complete) or immediately.
- */
- public killSession = async (reason: string, graceful = true, errorCode = 0) => {
- this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
- this.mainLog(`session exit reason: ${(red(reason))}`);
- await this.executeExitHandlers(true);
- this.killActiveWorker(graceful, true);
- process.exit(errorCode);
- }
-
- /**
- * Execute the list of functions registered to be called
- * whenever the process exits.
- */
- public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler);
-
- /**
- * Extend the default repl by adding in custom commands
- * that can invoke application logic external to this module
- */
- public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => {
- this.repl.registerCommand(basename, argPatterns, action);
- }
-
- public exec = (command: string, options?: ExecOptions) => {
- return new Promise<void>(resolve => {
- exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => {
- if (error) {
- this.execLog(red(`unable to execute ${white(command)}`));
- error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
- } else {
- let outLines: string[], errorLines: string[];
- if ((outLines = stdout.split("\n").filter(line => line.length)).length) {
- outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`)));
- }
- if ((errorLines = stderr.split("\n").filter(line => line.length)).length) {
- errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`)));
- }
- }
- resolve();
- });
- });
- }
-
- /**
- * Add a listener at this message. When the monitor process
- * receives a message, it will invoke all registered functions.
- */
- public addServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
- const handlers = this.onMessage[message];
- if (handlers) {
- handlers.push(handler);
- } else {
- this.onMessage[message] = [handler];
- }
- }
-
- /**
- * Unregister a given listener at this message.
- */
- public removeServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
- const handlers = this.onMessage[message];
- if (handlers) {
- const index = handlers.indexOf(handler);
- if (index > -1) {
- handlers.splice(index, 1);
- }
- }
- }
-
- /**
- * Unregister all listeners at this message.
- */
- public clearServerMessageListeners = (message: string) => this.onMessage[message] = undefined;
-
- private constructor() {
- super();
-
- console.log(this.timestamp(), cyan("initializing session..."));
- this.config = this.loadAndValidateConfiguration();
-
- // determines whether or not we see the compilation / initialization / runtime output of each child server process
- const output = this.config.showServerOutput ? "inherit" : "ignore";
- setupMaster({ stdio: ["ignore", output, output, "ipc"] });
-
- // handle exceptions in the master thread - there shouldn't be many of these
- // the IPC (inter process communication) channel closed exception can't seem
- // to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }): void => {
- if (message !== "Channel closed") {
- this.mainLog(red(message));
- if (stack) {
- this.mainLog(`uncaught exception\n${red(stack)}`);
- }
- }
- });
-
- // a helpful cluster event called on the master thread each time a child process exits
- on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
- this.mainLog(cyan(prompt));
- // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
- this.spawn();
- });
-
- this.repl = this.initializeRepl();
- }
-
- public finalize = (): void => {
- if (this.finalized) {
- throw new Error("Session monitor is already finalized");
- }
- this.finalized = true;
- this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid());
- this.spawn();
- }
-
- /**
- * Generates a blue UTC string associated with the time
- * of invocation.
- */
- private timestamp = () => blue(`[${new Date().toUTCString()}]`);
-
- /**
- * A formatted, identified and timestamped log in color
- */
- public mainLog = (...optionalParams: any[]) => {
- console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams);
- }
-
- /**
- * A formatted, identified and timestamped log in color for non-
- */
- private execLog = (...optionalParams: any[]) => {
- console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams);
- }
-
- /**
- * Reads in configuration .json file only once, in the master thread
- * and pass down any variables the pertinent to the child processes as environment variables.
- */
- private loadAndValidateConfiguration = (): Configuration => {
- let config: Configuration;
- try {
- console.log(this.timestamp(), cyan("validating configuration..."));
- config = JSON.parse(readFileSync('./session.config.json', 'utf8'));
- const options = {
- throwError: true,
- allowUnknownAttributes: false
- };
- // ensure all necessary and no excess information is specified by the configuration file
- validate(config, configurationSchema, options);
- config = Utilities.preciseAssign({}, defaultConfig, config);
- } catch (error) {
- if (error instanceof ValidationError) {
- console.log(red("\nSession configuration failed."));
- console.log("The given session.config.json configuration file is invalid.");
- console.log(`${error.instance}: ${error.stack}`);
- process.exit(0);
- } else if (error.code === "ENOENT" && error.path === "./session.config.json") {
- console.log(cyan("Loading default session parameters..."));
- console.log("Consider including a session.config.json configuration file in your project root for customization.");
- config = Utilities.preciseAssign({}, defaultConfig);
- } else {
- console.log(red("\nSession configuration failed."));
- console.log("The following unknown error occurred during configuration.");
- console.log(error.stack);
- process.exit(0);
- }
- } finally {
- const { identifiers } = config!;
- Object.keys(identifiers).forEach(key => {
- const resolved = key as keyof Identifiers;
- const { text, color } = identifiers[resolved];
- identifiers[resolved].text = (colorMapping.get(color) || white)(`${text}:`);
- });
- return config!;
- }
- }
-
- /**
- * Builds the repl that allows the following commands to be typed into stdin of the master thread.
- */
- private initializeRepl = (): Repl => {
- const repl = new Repl({ identifier: () => `${this.timestamp()} ${this.config.identifiers.master.text}` });
- const boolean = /true|false/;
- const number = /\d+/;
- const letters = /[a-zA-Z]+/;
- repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0));
- repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean"));
- repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true"));
- repl.registerCommand("set", [/polling/, number, boolean], async args => {
- const newPollingIntervalSeconds = Math.floor(Number(args[1]));
- if (newPollingIntervalSeconds < 0) {
- this.mainLog(red("the polling interval must be a non-negative integer"));
- } else {
- if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
- this.config.polling.intervalSeconds = newPollingIntervalSeconds;
- if (args[2] === "true") {
- return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true);
- }
- }
- }
- });
- return repl;
- }
-
- private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));
-
- /**
- * Attempts to kill the active worker gracefully, unless otherwise specified.
- */
- private killActiveWorker = (graceful = true, isSessionEnd = false): void => {
- if (this.activeWorker && !this.activeWorker.isDead()) {
- if (graceful) {
- IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } });
- } else {
- this.activeWorker.process.kill();
- }
- }
- }
-
- /**
- * Allows the caller to set the port at which the target (be it the server,
- * the websocket, some other custom port) is listening. If an immediate restart
- * is specified, this monitor will kill the active child and re-launch the server
- * at the port. Otherwise, the updated port won't be used until / unless the child
- * dies on its own and triggers a restart.
- */
- private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => {
- if (value > 1023 && value < 65536) {
- this.config.ports[port] = value;
- if (immediateRestart) {
- this.killActiveWorker();
- }
- } else {
- this.mainLog(red(`${port} is an invalid port number`));
- }
- }
-
- /**
- * Kills the current active worker and proceeds to spawn a new worker,
- * feeding in configuration information as environment variables.
- */
- private spawn = (): void => {
- const {
- polling: {
- route,
- failureTolerance,
- intervalSeconds
- },
- ports
- } = this.config;
- this.killActiveWorker();
- this.activeWorker = fork({
- pollingRoute: route,
- pollingFailureTolerance: failureTolerance,
- serverPort: ports.server,
- socketPort: ports.socket,
- pollingIntervalSeconds: intervalSeconds,
- session_key: this.key,
- ipc_suffix: IPC.suffix
- });
- this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- // an IPC message handler that executes actions on the master thread when prompted by the active worker
- IPC.addMessagesHandler(this.activeWorker!, async ({ lifecycle, action }) => {
- if (action) {
- const { message, args } = action as Monitor.Action;
- console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`);
- switch (message) {
- case "kill":
- const { reason, graceful, errorCode } = args;
- this.killSession(reason, graceful, errorCode);
- break;
- case "notify_crash":
- this.emit(Monitor.IntrinsicEvents.CrashDetected, args.error);
- break;
- case Monitor.IntrinsicEvents.ServerRunning:
- this.emit(Monitor.IntrinsicEvents.ServerRunning, args.firstTime);
- break;
- case "set_port":
- const { port, value, immediateRestart } = args;
- this.setPort(port, value, immediateRestart);
- break;
- }
- const handlers = this.onMessage[message];
- if (handlers) {
- handlers.forEach(handler => handler({ message, args }));
- }
- }
- if (lifecycle) {
- console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${lifecycle})`);
- }
- });
- }
-
-}
-
-export namespace Monitor {
-
- export interface Action {
- message: string;
- args: any;
- }
-
- export type ServerMessageHandler = (action: Action) => void | Promise<void>;
-
- export enum IntrinsicEvents {
- KeyGenerated = "key_generated",
- CrashDetected = "crash_detected",
- ServerRunning = "server_running"
- }
-
-} \ No newline at end of file
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
deleted file mode 100644
index e9fdaf923..000000000
--- a/src/server/session/agents/server_worker.ts
+++ /dev/null
@@ -1,165 +0,0 @@
-import { ExitHandler } from "./applied_session_agent";
-import { isMaster } from "cluster";
-import { IPC } from "../utilities/ipc";
-import { red, green, white, yellow } from "colors";
-import { get } from "request-promise";
-import { Monitor } from "./monitor";
-
-/**
- * Effectively, each worker repairs the connection to the server by reintroducing a consistent state
- * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification
- * email if the server encounters an uncaught exception or if the server cannot be reached.
- */
-export class ServerWorker {
-
- private static count = 0;
- private shouldServerBeResponsive = false;
- private exitHandlers: ExitHandler[] = [];
- private pollingFailureCount = 0;
- private pollingIntervalSeconds: number;
- private pollingFailureTolerance: number;
- private pollTarget: string;
- private serverPort: number;
- private isInitialized = false;
-
- public static Create(work: Function) {
- if (isMaster) {
- console.error(red("cannot create a worker on the monitor process."));
- process.exit(1);
- } else if (++ServerWorker.count > 1) {
- IPC.dispatchMessage(process, {
- action: {
- message: "kill", args: {
- reason: "cannot create more than one worker on a given worker process.",
- graceful: false,
- errorCode: 1
- }
- }
- });
- process.exit(1);
- } else {
- return new ServerWorker(work);
- }
- }
-
- /**
- * Allows developers to invoke application specific logic
- * by hooking into the exiting of the server process.
- */
- public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler);
-
- /**
- * Kill the session monitor (parent process) from this
- * server worker (child process). This will also kill
- * this process (child process).
- */
- public killSession = (reason: string, graceful = true, errorCode = 0) => this.sendMonitorAction("kill", { reason, graceful, errorCode });
-
- /**
- * A convenience wrapper to tell the session monitor (parent process)
- * to carry out the action with the specified message and arguments.
- */
- public sendMonitorAction = (message: string, args?: any, expectResponse = false) => IPC.dispatchMessage(process, { action: { message, args } }, expectResponse);
-
- private constructor(work: Function) {
- this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
-
- const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
- this.serverPort = Number(serverPort);
- this.pollingIntervalSeconds = Number(pollingIntervalSeconds);
- this.pollingFailureTolerance = Number(pollingFailureTolerance);
- this.pollTarget = `http://localhost:${serverPort}${pollingRoute}`;
-
- this.configureProcess();
- work();
- this.pollServer();
- }
-
- /**
- * Set up message and uncaught exception handlers for this
- * server process.
- */
- private configureProcess = () => {
- // updates the local values of variables to the those sent from master
- IPC.addMessagesHandler(process, async ({ newPollingIntervalSeconds, manualExit }) => {
- if (newPollingIntervalSeconds !== undefined) {
- this.pollingIntervalSeconds = newPollingIntervalSeconds;
- }
- if (manualExit !== undefined) {
- const { isSessionEnd } = manualExit;
- await this.executeExitHandlers(isSessionEnd);
- process.exit(0);
- }
- });
-
- // one reason to exit, as the process might be in an inconsistent state after such an exception
- process.on('uncaughtException', this.proactiveUnplannedExit);
- process.on('unhandledRejection', reason => {
- const appropriateError = reason instanceof Error ? reason : new Error(`unhandled rejection: ${reason}`);
- this.proactiveUnplannedExit(appropriateError);
- });
- }
-
- /**
- * Execute the list of functions registered to be called
- * whenever the process exits.
- */
- private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));
-
- /**
- * Notify master thread (which will log update in the console) of initialization via IPC.
- */
- public lifecycleNotification = (event: string) => IPC.dispatchMessage(process, { lifecycle: event });
-
- /**
- * Called whenever the process has a reason to terminate, either through an uncaught exception
- * in the process (potentially inconsistent state) or the server cannot be reached.
- */
- private proactiveUnplannedExit = async (error: Error): Promise<void> => {
- this.shouldServerBeResponsive = false;
- // communicates via IPC to the master thread that it should dispatch a crash notification email
- this.sendMonitorAction("notify_crash", { error });
- await this.executeExitHandlers(error);
- // notify master thread (which will log update in the console) of crash event via IPC
- this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
- this.lifecycleNotification(red(error.message));
- process.exit(1);
- }
-
- /**
- * This monitors the health of the server by submitting a get request to whatever port / route specified
- * by the configuration every n seconds, where n is also given by the configuration.
- */
- private pollServer = async (): Promise<void> => {
- await new Promise<void>(resolve => {
- setTimeout(async () => {
- try {
- await get(this.pollTarget);
- if (!this.shouldServerBeResponsive) {
- // notify monitor thread that the server is up and running
- this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
- this.sendMonitorAction(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized });
- this.isInitialized = true;
- }
- this.shouldServerBeResponsive = true;
- } catch (error) {
- // if we expect the server to be unavailable, i.e. during compilation,
- // the listening variable is false, activeExit will return early and the child
- // process will continue
- if (this.shouldServerBeResponsive) {
- if (++this.pollingFailureCount > this.pollingFailureTolerance) {
- this.proactiveUnplannedExit(error);
- } else {
- this.lifecycleNotification(yellow(`the server has encountered ${this.pollingFailureCount} of ${this.pollingFailureTolerance} tolerable failures`));
- }
- }
- } finally {
- resolve();
- }
- }, 1000 * this.pollingIntervalSeconds);
- });
- // controlled, asynchronous infinite recursion achieves a persistent poll that does not submit a new request until the previous has completed
- this.pollServer();
- }
-
-} \ No newline at end of file