aboutsummaryrefslogtreecommitdiff
path: root/src/server/session/agents
diff options
context:
space:
mode:
authorSam Wilkins <samwilkins333@gmail.com>2020-01-11 09:54:48 -0500
committerSam Wilkins <samwilkins333@gmail.com>2020-01-11 09:54:48 -0500
commit27c93abd49ca8a519d2aa3cf7938434fe25947d7 (patch)
tree14f137aa8332ef1ce0cf95ff5cde52246637d45f /src/server/session/agents
parent7a20f573f4f428bfc779797d437fa9525b6976f8 (diff)
extends message, removed duplicate handlers, IPC streamlined
Diffstat (limited to 'src/server/session/agents')
-rw-r--r--src/server/session/agents/applied_session_agent.ts8
-rw-r--r--src/server/session/agents/message_router.ts45
-rw-r--r--src/server/session/agents/monitor.ts30
-rw-r--r--src/server/session/agents/server_worker.ts17
4 files changed, 74 insertions, 26 deletions
diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts
index 53293d3bf..48226dab6 100644
--- a/src/server/session/agents/applied_session_agent.ts
+++ b/src/server/session/agents/applied_session_agent.ts
@@ -1,6 +1,7 @@
import { isMaster } from "cluster";
import { Monitor } from "./monitor";
import { ServerWorker } from "./server_worker";
+import { Utils } from "../../../Utils";
export type ExitHandler = (reason: Error | boolean) => void | Promise<void>;
@@ -8,7 +9,7 @@ export abstract class AppliedSessionAgent {
// the following two methods allow the developer to create a custom
// session and use the built in customization options for each thread
- protected abstract async initializeMonitor(monitor: Monitor): Promise<void>;
+ protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise<void>;
protected abstract async initializeServerWorker(): Promise<ServerWorker>;
private launched = false;
@@ -21,7 +22,7 @@ export abstract class AppliedSessionAgent {
private sessionMonitorRef: Monitor | undefined;
public get sessionMonitor(): Monitor {
if (!isMaster) {
- this.serverWorker.sendMonitorAction("kill", {
+ this.serverWorker.emitToMonitor("kill", {
graceful: false,
reason: "Cannot access the session monitor directly from the server worker thread.",
errorCode: 1
@@ -43,7 +44,8 @@ export abstract class AppliedSessionAgent {
if (!this.launched) {
this.launched = true;
if (isMaster) {
- await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create());
+ const sessionKey = Utils.GenerateGuid();
+ await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey);
this.sessionMonitorRef.finalize();
} else {
this.serverWorkerRef = await this.initializeServerWorker();
diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts
new file mode 100644
index 000000000..5848e27ab
--- /dev/null
+++ b/src/server/session/agents/message_router.ts
@@ -0,0 +1,45 @@
+import { MessageHandler, Message } from "../utilities/ipc";
+
+export default abstract class MessageRouter {
+
+ private onMessage: { [name: string]: MessageHandler[] | undefined } = {};
+
+ /**
+ * Add a listener at this message. When the monitor process
+ * receives a message, it will invoke all registered functions.
+ */
+ public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => {
+ const handlers = this.onMessage[name];
+ if (exclusive || !handlers) {
+ this.onMessage[name] = [handler];
+ } else {
+ handlers.push(handler);
+ }
+ }
+
+ /**
+ * Unregister a given listener at this message.
+ */
+ public removeMessageListener = (name: string, handler: MessageHandler) => {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ const index = handlers.indexOf(handler);
+ if (index > -1) {
+ handlers.splice(index, 1);
+ }
+ }
+ }
+
+ /**
+ * Unregister all listeners at this message.
+ */
+ public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined);
+
+ protected route: MessageHandler = async ({ name, args }) => {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ await Promise.all(handlers.map(handler => handler({ name, args })));
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index 18fa6df24..96f1f8130 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,20 +2,19 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc";
+import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
-import { Utils } from "../../../Utils";
import { validate, ValidationError } from "jsonschema";
import { Utilities } from "../utilities/utilities";
import { readFileSync } from "fs";
-import { EventEmitter } from "events";
+import MessageRouter from "./message_router";
/**
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
-export class Monitor extends EventEmitter {
+export class Monitor extends MessageRouter {
private static IPCManager: PromisifiedIPCManager;
private static count = 0;
private finalized = false;
@@ -25,7 +24,7 @@ export class Monitor extends EventEmitter {
private key: string | undefined;
private repl: Repl;
- public static Create() {
+ public static Create(sessionKey: string) {
if (isWorker) {
IPC(process).emit("kill", {
reason: "cannot create a monitor on the worker process.",
@@ -37,13 +36,12 @@ export class Monitor extends EventEmitter {
console.error(red("cannot create more than one monitor."));
process.exit(1);
} else {
- return new Monitor();
+ return new Monitor(sessionKey);
}
}
- public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener);
- public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener);
- public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener);
+ public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener);
+ public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener);
/**
* Kill this session and its active child
@@ -93,10 +91,10 @@ export class Monitor extends EventEmitter {
});
}
- private constructor() {
+ private constructor(sessionKey: string) {
super();
-
console.log(this.timestamp(), cyan("initializing session..."));
+ this.key = sessionKey;
this.config = this.loadAndValidateConfiguration();
// determines whether or not we see the compilation / initialization / runtime output of each child server process
@@ -131,7 +129,6 @@ export class Monitor extends EventEmitter {
throw new Error("Session monitor is already finalized");
}
this.finalized = true;
- this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid());
this.spawn();
}
@@ -284,11 +281,10 @@ export class Monitor extends EventEmitter {
Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- const { addMessageListener } = Monitor.IPCManager;
- addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode));
- addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error));
- addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime));
- addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
+ this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
+ this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+
+ Monitor.IPCManager.setRouter(this.route);
}
}
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index 9e471366a..01e1cf971 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,6 +1,7 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
import { PromisifiedIPCManager } from "../utilities/ipc";
+import MessageRouter from "./message_router";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
import { Monitor } from "./monitor";
@@ -10,7 +11,7 @@ import { Monitor } from "./monitor";
* if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification
* email if the server encounters an uncaught exception or if the server cannot be reached.
*/
-export class ServerWorker {
+export class ServerWorker extends MessageRouter {
private static IPCManager = new PromisifiedIPCManager(process);
private static count = 0;
private shouldServerBeResponsive = false;
@@ -58,6 +59,7 @@ export class ServerWorker {
public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse);
private constructor(work: Function) {
+ super();
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
@@ -76,10 +78,13 @@ export class ServerWorker {
* server process.
*/
private configureProcess = () => {
+ ServerWorker.IPCManager.setRouter(this.route);
// updates the local values of variables to the those sent from master
- const { addMessageListener } = ServerWorker.IPCManager;
- addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds);
- addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
+ this.addMessageListener("updatePollingInterval", ({ args }) => {
+ this.pollingIntervalSeconds = args.newPollingIntervalSeconds;
+ return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10));
+ });
+ this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
});
@@ -110,7 +115,7 @@ export class ServerWorker {
private proactiveUnplannedExit = async (error: Error): Promise<void> => {
this.shouldServerBeResponsive = false;
// communicates via IPC to the master thread that it should dispatch a crash notification email
- this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error });
+ this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error });
await this.executeExitHandlers(error);
// notify master thread (which will log update in the console) of crash event via IPC
this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
@@ -130,7 +135,7 @@ export class ServerWorker {
if (!this.shouldServerBeResponsive) {
// notify monitor thread that the server is up and running
this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
- this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized });
+ this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized });
this.isInitialized = true;
}
this.shouldServerBeResponsive = true;