aboutsummaryrefslogtreecommitdiff
path: root/src/server/session/agents
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/session/agents')
-rw-r--r--src/server/session/agents/message_router.ts4
-rw-r--r--src/server/session/agents/monitor.ts100
-rw-r--r--src/server/session/agents/server_worker.ts8
3 files changed, 59 insertions, 53 deletions
diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts
index 5848e27ab..707f771d9 100644
--- a/src/server/session/agents/message_router.ts
+++ b/src/server/session/agents/message_router.ts
@@ -8,7 +8,7 @@ export default abstract class MessageRouter {
* Add a listener at this message. When the monitor process
* receives a message, it will invoke all registered functions.
*/
- public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => {
+ public on = (name: string, handler: MessageHandler, exclusive = false) => {
const handlers = this.onMessage[name];
if (exclusive || !handlers) {
this.onMessage[name] = [handler];
@@ -20,7 +20,7 @@ export default abstract class MessageRouter {
/**
* Unregister a given listener at this message.
*/
- public removeMessageListener = (name: string, handler: MessageHandler) => {
+ public off = (name: string, handler: MessageHandler) => {
const handlers = this.onMessage[name];
if (handlers) {
const index = handlers.indexOf(handler);
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index 5f4543606..ccba8199e 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc";
+import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
import { validate, ValidationError } from "jsonschema";
@@ -40,8 +40,54 @@ export class Monitor extends MessageRouter {
}
}
- public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener);
- public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener);
+ private constructor(sessionKey: string) {
+ super();
+ this.config = this.loadAndValidateConfiguration();
+ this.initialize(sessionKey);
+ this.repl = this.initializeRepl();
+ }
+
+ private initialize = (sessionKey: string) => {
+ console.log(this.timestamp(), cyan("initializing session..."));
+ this.key = sessionKey;
+
+ // determines whether or not we see the compilation / initialization / runtime output of each child server process
+ const output = this.config.showServerOutput ? "inherit" : "ignore";
+ setupMaster({ stdio: ["ignore", output, output, "ipc"] });
+
+ // handle exceptions in the master thread - there shouldn't be many of these
+ // the IPC (inter process communication) channel closed exception can't seem
+ // to be caught in a try catch, and is inconsequential, so it is ignored
+ process.on("uncaughtException", ({ message, stack }): void => {
+ if (message !== "Channel closed") {
+ this.mainLog(red(message));
+ if (stack) {
+ this.mainLog(`uncaught exception\n${red(stack)}`);
+ }
+ }
+ });
+
+ // a helpful cluster event called on the master thread each time a child process exits
+ on("exit", ({ process: { pid } }, code, signal) => {
+ const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ this.mainLog(cyan(prompt));
+ // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
+ this.spawn();
+ });
+ }
+
+ public finalize = (): void => {
+ if (this.finalized) {
+ throw new Error("Session monitor is already finalized");
+ }
+ this.finalized = true;
+ this.spawn();
+ }
+
+ public readonly hooks = Object.freeze({
+ crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
+ serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener)
+ });
/**
* Kill this session and its active child
@@ -91,48 +137,6 @@ export class Monitor extends MessageRouter {
});
}
- private constructor(sessionKey: string) {
- super();
- console.log(this.timestamp(), cyan("initializing session..."));
- this.key = sessionKey;
- this.config = this.loadAndValidateConfiguration();
-
- // determines whether or not we see the compilation / initialization / runtime output of each child server process
- const output = this.config.showServerOutput ? "inherit" : "ignore";
- setupMaster({ stdio: ["ignore", output, output, "ipc"] });
-
- // handle exceptions in the master thread - there shouldn't be many of these
- // the IPC (inter process communication) channel closed exception can't seem
- // to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }): void => {
- if (message !== "Channel closed") {
- this.mainLog(red(message));
- if (stack) {
- this.mainLog(`uncaught exception\n${red(stack)}`);
- }
- }
- });
-
- // a helpful cluster event called on the master thread each time a child process exits
- on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
- this.mainLog(cyan(prompt));
- // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
- this.spawn();
- });
-
- this.repl = this.initializeRepl();
- Monitor.IPCManager.setRouter(this.route);
- }
-
- public finalize = (): void => {
- if (this.finalized) {
- throw new Error("Session monitor is already finalized");
- }
- this.finalized = true;
- this.spawn();
- }
-
/**
* Generates a blue UTC string associated with the time
* of invocation.
@@ -282,8 +286,10 @@ export class Monitor extends MessageRouter {
Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
- this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+ this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
+ this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+
+ Monitor.IPCManager.setRouter(this.route);
}
}
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index 2c77cfb29..50abe398d 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,6 +1,6 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
-import { PromisifiedIPCManager } from "../utilities/ipc";
+import { PromisifiedIPCManager, Message } from "../utilities/ipc";
import MessageRouter from "./message_router";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
@@ -80,11 +80,11 @@ export class ServerWorker extends MessageRouter {
private configureProcess = () => {
ServerWorker.IPCManager.setRouter(this.route);
// updates the local values of variables to the those sent from master
- this.addMessageListener("updatePollingInterval", ({ args }) => {
+ this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => {
this.pollingIntervalSeconds = args.newPollingIntervalSeconds;
return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10));
});
- this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
+ this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => {
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
});
@@ -135,7 +135,7 @@ export class ServerWorker extends MessageRouter {
if (!this.shouldServerBeResponsive) {
// notify monitor thread that the server is up and running
this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
- this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized });
+ this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { isFirstTime: !this.isInitialized });
this.isInitialized = true;
}
this.shouldServerBeResponsive = true;