aboutsummaryrefslogtreecommitdiff
path: root/src/server/session/agents/monitor.ts
diff options
context:
space:
mode:
authorSam Wilkins <samwilkins333@gmail.com>2020-01-11 02:51:33 -0500
committerSam Wilkins <samwilkins333@gmail.com>2020-01-11 02:51:33 -0500
commit7a20f573f4f428bfc779797d437fa9525b6976f8 (patch)
tree5d8d1028f3ee21d279066e3a410659dbd79ad791 /src/server/session/agents/monitor.ts
parent2c83f136771794565350d229a238b3f01cc60aca (diff)
standardized ipc message
Diffstat (limited to 'src/server/session/agents/monitor.ts')
-rw-r--r--src/server/session/agents/monitor.ts86
1 files changed, 14 insertions, 72 deletions
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index cd09c9e41..18fa6df24 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,7 +2,7 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix } from "../utilities/ipc";
+import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
import { Utils } from "../../../Utils";
@@ -16,28 +16,21 @@ import { EventEmitter } from "events";
* and spawns off an initial process that will respawn as predecessors die.
*/
export class Monitor extends EventEmitter {
- private static readonly localIPCManager = new PromisifiedIPCManager(process);
- private static childIPCManager: PromisifiedIPCManager;
+ private static IPCManager: PromisifiedIPCManager;
private static count = 0;
private finalized = false;
private exitHandlers: ExitHandler[] = [];
private readonly config: Configuration;
- private onMessage: { [message: string]: Monitor.ServerMessageHandler[] | undefined } = {};
private activeWorker: Worker | undefined;
private key: string | undefined;
private repl: Repl;
public static Create() {
if (isWorker) {
- this.localIPCManager.emit({
- action: {
- message: "kill",
- args: {
- reason: "cannot create a monitor on the worker process.",
- graceful: false,
- errorCode: 1
- }
- }
+ IPC(process).emit("kill", {
+ reason: "cannot create a monitor on the worker process.",
+ graceful: false,
+ errorCode: 1
});
process.exit(1);
} else if (++Monitor.count > 1) {
@@ -100,37 +93,6 @@ export class Monitor extends EventEmitter {
});
}
- /**
- * Add a listener at this message. When the monitor process
- * receives a message, it will invoke all registered functions.
- */
- public addServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
- const handlers = this.onMessage[message];
- if (handlers) {
- handlers.push(handler);
- } else {
- this.onMessage[message] = [handler];
- }
- }
-
- /**
- * Unregister a given listener at this message.
- */
- public removeServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
- const handlers = this.onMessage[message];
- if (handlers) {
- const index = handlers.indexOf(handler);
- if (index > -1) {
- handlers.splice(index, 1);
- }
- }
- }
-
- /**
- * Unregister all listeners at this message.
- */
- public clearServerMessageListeners = (message: string) => this.onMessage[message] = undefined;
-
private constructor() {
super();
@@ -255,7 +217,7 @@ export class Monitor extends EventEmitter {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
if (args[2] === "true") {
- return Monitor.localIPCManager.emit({ newPollingIntervalSeconds }, true);
+ return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true);
}
}
}
@@ -271,7 +233,7 @@ export class Monitor extends EventEmitter {
private killActiveWorker = (graceful = true, isSessionEnd = false): void => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
- Monitor.childIPCManager.emit({ manualExit: { isSessionEnd } });
+ Monitor.IPCManager.emit("manualExit", { isSessionEnd });
} else {
this.activeWorker.process.kill();
}
@@ -319,40 +281,20 @@ export class Monitor extends EventEmitter {
session_key: this.key,
ipc_suffix: suffix
});
- Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker);
+ Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- this.addServerMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode));
- this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error));
- this.addServerMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime));
-
- // an IPC message handler that executes actions on the master thread when prompted by the active worker
- Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => {
- if (action) {
- const { message, args } = action as Monitor.Action;
- console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`);
- const handlers = this.onMessage[message];
- if (handlers) {
- await Promise.all(handlers.map(handler => handler({ message, args })));
- }
- }
- if (lifecycle) {
- console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${lifecycle})`);
- }
- });
+ const { addMessageListener } = Monitor.IPCManager;
+ addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode));
+ addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error));
+ addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime));
+ addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
}
}
export namespace Monitor {
- export interface Action {
- message: string;
- args: any;
- }
-
- export type ServerMessageHandler = (action: Action) => any | Promise<any>;
-
export enum IntrinsicEvents {
KeyGenerated = "key_generated",
CrashDetected = "crash_detected",