aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/session/agents/monitor.ts86
-rw-r--r--src/server/session/agents/server_worker.ts38
-rw-r--r--src/server/session/utilities/ipc.ts88
3 files changed, 95 insertions, 117 deletions
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index cd09c9e41..18fa6df24 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix } from "../utilities/ipc";
+import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
import { Utils } from "../../../Utils";
@@ -16,28 +16,21 @@ import { EventEmitter } from "events";
* and spawns off an initial process that will respawn as predecessors die.
*/
export class Monitor extends EventEmitter {
- private static readonly localIPCManager = new PromisifiedIPCManager(process);
- private static childIPCManager: PromisifiedIPCManager;
+ private static IPCManager: PromisifiedIPCManager;
private static count = 0;
private finalized = false;
private exitHandlers: ExitHandler[] = [];
private readonly config: Configuration;
- private onMessage: { [message: string]: Monitor.ServerMessageHandler[] | undefined } = {};
private activeWorker: Worker | undefined;
private key: string | undefined;
private repl: Repl;
public static Create() {
if (isWorker) {
- this.localIPCManager.emit({
- action: {
- message: "kill",
- args: {
- reason: "cannot create a monitor on the worker process.",
- graceful: false,
- errorCode: 1
- }
- }
+ IPC(process).emit("kill", {
+ reason: "cannot create a monitor on the worker process.",
+ graceful: false,
+ errorCode: 1
});
process.exit(1);
} else if (++Monitor.count > 1) {
@@ -100,37 +93,6 @@ export class Monitor extends EventEmitter {
});
}
- /**
- * Add a listener at this message. When the monitor process
- * receives a message, it will invoke all registered functions.
- */
- public addServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
- const handlers = this.onMessage[message];
- if (handlers) {
- handlers.push(handler);
- } else {
- this.onMessage[message] = [handler];
- }
- }
-
- /**
- * Unregister a given listener at this message.
- */
- public removeServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
- const handlers = this.onMessage[message];
- if (handlers) {
- const index = handlers.indexOf(handler);
- if (index > -1) {
- handlers.splice(index, 1);
- }
- }
- }
-
- /**
- * Unregister all listeners at this message.
- */
- public clearServerMessageListeners = (message: string) => this.onMessage[message] = undefined;
-
private constructor() {
super();
@@ -255,7 +217,7 @@ export class Monitor extends EventEmitter {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
if (args[2] === "true") {
- return Monitor.localIPCManager.emit({ newPollingIntervalSeconds }, true);
+ return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true);
}
}
}
@@ -271,7 +233,7 @@ export class Monitor extends EventEmitter {
private killActiveWorker = (graceful = true, isSessionEnd = false): void => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
- Monitor.childIPCManager.emit({ manualExit: { isSessionEnd } });
+ Monitor.IPCManager.emit("manualExit", { isSessionEnd });
} else {
this.activeWorker.process.kill();
}
@@ -319,40 +281,20 @@ export class Monitor extends EventEmitter {
session_key: this.key,
ipc_suffix: suffix
});
- Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker);
+ Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- this.addServerMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode));
- this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error));
- this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime));
-
- // an IPC message handler that executes actions on the master thread when prompted by the active worker
- Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => {
- if (action) {
- const { message, args } = action as Monitor.Action;
- console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`);
- const handlers = this.onMessage[message];
- if (handlers) {
- await Promise.all(handlers.map(handler => handler({ message, args })));
- }
- }
- if (lifecycle) {
- console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${lifecycle})`);
- }
- });
+ const { addMessageListener } = Monitor.IPCManager;
+ addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode));
+ addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error));
+ addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime));
+ addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
}
}
export namespace Monitor {
- export interface Action {
- message: string;
- args: any;
- }
-
- export type ServerMessageHandler = (action: Action) => any | Promise<any>;
-
export enum IntrinsicEvents {
KeyGenerated = "key_generated",
CrashDetected = "crash_detected",
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index b279a19d8..9e471366a 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -11,7 +11,7 @@ import { Monitor } from "./monitor";
* email if the server encounters an uncaught exception or if the server cannot be reached.
*/
export class ServerWorker {
- private static localIPCManager = new PromisifiedIPCManager(process);
+ private static IPCManager = new PromisifiedIPCManager(process);
private static count = 0;
private shouldServerBeResponsive = false;
private exitHandlers: ExitHandler[] = [];
@@ -27,14 +27,10 @@ export class ServerWorker {
console.error(red("cannot create a worker on the monitor process."));
process.exit(1);
} else if (++ServerWorker.count > 1) {
- ServerWorker.localIPCManager.emit({
- action: {
- message: "kill", args: {
- reason: "cannot create more than one worker on a given worker process.",
- graceful: false,
- errorCode: 1
- }
- }
+ ServerWorker.IPCManager.emit("kill", {
+ reason: "cannot create more than one worker on a given worker process.",
+ graceful: false,
+ errorCode: 1
});
process.exit(1);
} else {
@@ -53,13 +49,13 @@ export class ServerWorker {
* server worker (child process). This will also kill
* this process (child process).
*/
- public killSession = (reason: string, graceful = true, errorCode = 0) => this.sendMonitorAction("kill", { reason, graceful, errorCode });
+ public killSession = (reason: string, graceful = true, errorCode = 0) => this.emitToMonitor("kill", { reason, graceful, errorCode });
/**
* A convenience wrapper to tell the session monitor (parent process)
* to carry out the action with the specified message and arguments.
*/
- public sendMonitorAction = (message: string, args?: any, expectResponse = false) => ServerWorker.localIPCManager.emit({ action: { message, args } }, expectResponse);
+ public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse);
private constructor(work: Function) {
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
@@ -81,15 +77,11 @@ export class ServerWorker {
*/
private configureProcess = () => {
// updates the local values of variables to the those sent from master
- ServerWorker.localIPCManager.addMessagesHandler(async ({ newPollingIntervalSeconds, manualExit }) => {
- if (newPollingIntervalSeconds !== undefined) {
- this.pollingIntervalSeconds = newPollingIntervalSeconds;
- }
- if (manualExit !== undefined) {
- const { isSessionEnd } = manualExit;
- await this.executeExitHandlers(isSessionEnd);
- process.exit(0);
- }
+ const { addMessageListener } = ServerWorker.IPCManager;
+ addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds);
+ addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
+ await this.executeExitHandlers(isSessionEnd);
+ process.exit(0);
});
// one reason to exit, as the process might be in an inconsistent state after such an exception
@@ -109,7 +101,7 @@ export class ServerWorker {
/**
* Notify master thread (which will log update in the console) of initialization via IPC.
*/
- public lifecycleNotification = (event: string) => ServerWorker.localIPCManager.emit({ lifecycle: event });
+ public lifecycleNotification = (event: string) => ServerWorker.IPCManager.emit("lifecycle", { event });
/**
* Called whenever the process has a reason to terminate, either through an uncaught exception
@@ -118,7 +110,7 @@ export class ServerWorker {
private proactiveUnplannedExit = async (error: Error): Promise<void> => {
this.shouldServerBeResponsive = false;
// communicates via IPC to the master thread that it should dispatch a crash notification email
- this.sendMonitorAction(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error });
+ this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error });
await this.executeExitHandlers(error);
// notify master thread (which will log update in the console) of crash event via IPC
this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
@@ -138,7 +130,7 @@ export class ServerWorker {
if (!this.shouldServerBeResponsive) {
// notify monitor thread that the server is up and running
this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
- this.sendMonitorAction(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized });
+ this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized });
this.isInitialized = true;
}
this.shouldServerBeResponsive = true;
diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts
index 888377c93..2faf9f63e 100644
--- a/src/server/session/utilities/ipc.ts
+++ b/src/server/session/utilities/ipc.ts
@@ -2,11 +2,19 @@ import { isMaster } from "cluster";
import { Utils } from "../../../Utils";
export type IPCTarget = NodeJS.EventEmitter & { send?: Function };
-export type Listener = (message: any) => void | Promise<void>;
+export type Router = (message: Message) => void | Promise<void>;
export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
+export interface Message {
+ name: string;
+ args: any;
+}
+
+export type MessageHandler = (message: Message) => any | Promise<any>;
+
export class PromisifiedIPCManager {
+ private onMessage: { [message: string]: MessageHandler[] | undefined } = {};
private readonly target: IPCTarget;
private readonly ipc_id = `ipc_id_${suffix}`;
private readonly response_expected = `response_expected_${suffix}`;
@@ -14,17 +22,66 @@ export class PromisifiedIPCManager {
constructor(target: IPCTarget) {
this.target = target;
+
+ this.target.addListener("message", async ({ name, args }: Message) => {
+ let error: Error | undefined;
+ try {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ await Promise.all(handlers.map(handler => handler({ name, args })));
+ }
+ } catch (e) {
+ error = e;
+ }
+ if (args[this.response_expected] && this.target.send) {
+ const response: any = { error };
+ response[this.ipc_id] = args[this.ipc_id];
+ response[this.is_response] = true;
+ this.target.send(response);
+ }
+ });
+ }
+
+ /**
+ * Add a listener at this message. When the monitor process
+ * receives a message, it will invoke all registered functions.
+ */
+ public addMessageListener = (name: string, handler: MessageHandler) => {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ handlers.push(handler);
+ } else {
+ this.onMessage[name] = [handler];
+ }
}
- public emit = async (message: any, expectResponse = false): Promise<Error | undefined> => {
+ /**
+ * Unregister a given listener at this message.
+ */
+ public removeMessageListener = (name: string, handler: MessageHandler) => {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ const index = handlers.indexOf(handler);
+ if (index > -1) {
+ handlers.splice(index, 1);
+ }
+ }
+ }
+
+ /**
+ * Unregister all listeners at this message.
+ */
+ public clearMessageListeners = (message: string) => this.onMessage[message] = undefined;
+
+ public emit = async (name: string, args: any, expectResponse = false): Promise<Error | undefined> => {
if (!this.target.send) {
return new Error("Cannot dispatch when send is undefined.");
}
- message[this.response_expected] = expectResponse;
+ args[this.response_expected] = expectResponse;
if (expectResponse) {
return new Promise(resolve => {
const messageId = Utils.GenerateGuid();
- message[this.ipc_id] = messageId;
+ args[this.ipc_id] = messageId;
const responseHandler: (args: any) => void = response => {
const { error } = response;
if (response[this.is_response] && response[this.ipc_id] === messageId) {
@@ -33,28 +90,15 @@ export class PromisifiedIPCManager {
}
};
this.target.addListener("message", responseHandler);
- this.target.send!(message);
+ this.target.send!({ name, args });
});
} else {
- this.target.send(message);
+ this.target.send({ name, args });
}
}
- public addMessagesHandler = (handler: Listener): void => {
- this.target.addListener("message", async incoming => {
- let error: Error | undefined;
- try {
- await handler(incoming);
- } catch (e) {
- error = e;
- }
- if (incoming[this.response_expected] && this.target.send) {
- const response: any = { error };
- response[this.ipc_id] = incoming[this.ipc_id];
- response[this.is_response] = true;
- this.target.send(response);
- }
- });
- }
+}
+export function IPC(target: IPCTarget) {
+ return new PromisifiedIPCManager(target);
} \ No newline at end of file