aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/ApiManagers/SessionManager.ts9
-rw-r--r--src/server/DashSession/DashSessionAgent.ts4
-rw-r--r--src/server/session/agents/monitor.ts16
-rw-r--r--src/server/session/agents/process_message_router.ts2
-rw-r--r--src/server/session/agents/promisified_ipc_manager.ts97
-rw-r--r--src/server/session/agents/server_worker.ts10
-rw-r--r--src/server/session/utilities/ipc.ts66
7 files changed, 115 insertions, 89 deletions
diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts
index a40b86dc5..d989d8d1b 100644
--- a/src/server/ApiManagers/SessionManager.ts
+++ b/src/server/ApiManagers/SessionManager.ts
@@ -30,13 +30,14 @@ export default class SessionManager extends ApiManager {
method: Method.GET,
subscription: this.secureSubscriber("debug", "mode?", "recipient?"),
secureHandler: this.authorizedAction(async ({ req, res }) => {
- let { mode } = req.params;
+ const { mode, recipient } = req.params;
if (mode && !["passive", "active"].includes(mode)) {
res.send(`Your request failed. '${mode}' is not a valid mode: please choose either 'active' or 'passive'`);
} else {
- !mode && (mode = "active");
- const recipient = req.params.recipient || DashSessionAgent.notificationRecipient;
- const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient });
+ const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", {
+ mode: mode || "active",
+ recipient: recipient || DashSessionAgent.notificationRecipient
+ });
if (response instanceof Error) {
res.send(response);
} else {
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts
index 3c98c1e9d..fe7cdae88 100644
--- a/src/server/DashSession/DashSessionAgent.ts
+++ b/src/server/DashSession/DashSessionAgent.ts
@@ -11,7 +11,7 @@ import { resolve } from "path";
import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent";
import { Monitor } from "../session/agents/monitor";
import { ServerWorker } from "../session/agents/server_worker";
-import { MessageHandler } from "../session/utilities/ipc";
+import { MessageHandler } from "../session/agents/promisified_ipc_manager";
/**
* If we're the monitor (master) thread, we should launch the monitor logic for the session.
@@ -110,7 +110,7 @@ export class DashSessionAgent extends AppliedSessionAgent {
content: this.generateCrashInstructions(crashCause)
});
if (error) {
- this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`));
+ this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} ${yellow(`(${error.message})`)}`));
mainLog(red("distribution of crash notification experienced errors"));
} else {
mainLog(green("successfully distributed crash notification to recipients"));
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index d4abbb51e..5ea950b2b 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,13 +2,14 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix, IPC, MessageHandler } from "../utilities/ipc";
+import { IPC_Promisify, MessageHandler } from "./promisified_ipc_manager";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
import { validate, ValidationError } from "jsonschema";
import { Utilities } from "../utilities/utilities";
import { readFileSync } from "fs";
import ProcessMessageRouter from "./process_message_router";
+import { ServerWorker } from "./server_worker";
/**
* Validates and reads the configuration file, accordingly builds a child process factory
@@ -25,7 +26,7 @@ export class Monitor extends ProcessMessageRouter {
public static Create(sessionKey: string) {
if (isWorker) {
- IPC(process).emit("kill", {
+ ServerWorker.IPCManager.emit("kill", {
reason: "cannot create a monitor on the worker process.",
graceful: false,
errorCode: 1
@@ -210,7 +211,7 @@ export class Monitor extends ProcessMessageRouter {
repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0));
repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean"));
repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true"));
- repl.registerCommand("set", [/polling/, number, boolean], async args => {
+ repl.registerCommand("set", [/polling/, number, boolean], args => {
const newPollingIntervalSeconds = Math.floor(Number(args[1]));
if (newPollingIntervalSeconds < 0) {
this.mainLog(red("the polling interval must be a non-negative integer"));
@@ -218,7 +219,7 @@ export class Monitor extends ProcessMessageRouter {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
if (args[2] === "true") {
- return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds });
+ Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds });
}
}
}
@@ -279,16 +280,13 @@ export class Monitor extends ProcessMessageRouter {
serverPort: ports.server,
socketPort: ports.socket,
pollingIntervalSeconds: intervalSeconds,
- session_key: this.key,
- ipc_suffix: suffix
+ session_key: this.key
});
- Monitor.IPCManager = IPC(this.activeWorker);
+ Monitor.IPCManager = IPC_Promisify(this.activeWorker, this.route);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true);
this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
-
- Monitor.IPCManager.setRouter(this.route);
}
}
diff --git a/src/server/session/agents/process_message_router.ts b/src/server/session/agents/process_message_router.ts
index f60343514..d359e97c3 100644
--- a/src/server/session/agents/process_message_router.ts
+++ b/src/server/session/agents/process_message_router.ts
@@ -1,4 +1,4 @@
-import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc";
+import { MessageHandler, PromisifiedIPCManager } from "./promisified_ipc_manager";
export default abstract class ProcessMessageRouter {
diff --git a/src/server/session/agents/promisified_ipc_manager.ts b/src/server/session/agents/promisified_ipc_manager.ts
new file mode 100644
index 000000000..216e9be44
--- /dev/null
+++ b/src/server/session/agents/promisified_ipc_manager.ts
@@ -0,0 +1,97 @@
+import { Utils } from "../../../Utils";
+import { isMaster } from "cluster";
+
+/**
+ * Convenience constructor
+ * @param target the process / worker to which to attach the specialized listeners
+ */
+export function IPC_Promisify(target: IPCTarget, router: Router) {
+ return new PromisifiedIPCManager(target, router);
+}
+
+/**
+ * Essentially, a node process or node cluster worker
+ */
+export type IPCTarget = NodeJS.EventEmitter & { send?: Function };
+
+/**
+ * Some external code that maps the name of incoming messages to registered handlers, if any
+ * when this returns, the message is assumed to have been handled in its entirety by the process, so
+ * await any asynchronous code inside this router.
+ */
+export type Router = (message: Message) => void | Promise<void>;
+
+/**
+ * Specifies a general message format for this API
+ */
+export type Message<T = any> = { name: string; args: T; };
+export type MessageHandler<T = any> = (args: T) => any | Promise<any>;
+
+/**
+ * When a message is emitted, it
+ */
+type InternalMessage = Message & { metadata: any };
+type InternalMessageHandler = (message: InternalMessage) => any | Promise<any>;
+
+/**
+ * This is a wrapper utility class that allows the caller process
+ * to emit an event and return a promise that resolves when it and all
+ * other processes listening to its emission of this event have completed.
+ */
+export class PromisifiedIPCManager {
+ private readonly target: IPCTarget;
+
+ constructor(target: IPCTarget, router: Router) {
+ this.target = target;
+ this.target.addListener("message", this.internalHandler(router));
+ }
+
+ /**
+ * A convenience wrapper around the standard process emission.
+ * Does not wait for a response.
+ */
+ public emit = async (name: string, args?: any) => this.target.send?.({ name, args });
+
+ /**
+ * This routine uniquely identifies each message, then adds a general
+ * message listener that waits for a response with the same id before resolving
+ * the promise.
+ */
+ public emitPromise = async (name: string, args?: any) => {
+ return new Promise(resolve => {
+ const messageId = Utils.GenerateGuid();
+ const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args, name }) => {
+ if (isResponse && id === messageId) {
+ this.target.removeListener("message", responseHandler);
+ resolve(args?.error as Error | undefined);
+ }
+ };
+ this.target.addListener("message", responseHandler);
+ const message = { name, args, metadata: { id: messageId } };
+ this.target.send?.(message);
+ });
+ }
+
+ /**
+ * This routine receives a uniquely identified message. If the message is itself a response,
+ * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever
+ * router the caller has installed, and then sends a response containing the original message id,
+ * which will ultimately invoke the responseHandler of the original emission and resolve the
+ * sender's promise.
+ */
+ private internalHandler = (router: Router) => async ({ name, args, metadata }: InternalMessage) => {
+ if (name && (!metadata || !metadata.isResponse)) {
+ let error: Error | undefined;
+ try {
+ await router({ name, args });
+ } catch (e) {
+ error = e;
+ }
+ if (metadata && this.target.send) {
+ metadata.isResponse = true;
+ this.target.send({ name, args: { error }, metadata });
+ }
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index 23ffb2650..705307030 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,6 +1,6 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
-import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc";
+import { PromisifiedIPCManager } from "./promisified_ipc_manager";
import ProcessMessageRouter from "./process_message_router";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
@@ -61,7 +61,7 @@ export class ServerWorker extends ProcessMessageRouter {
private constructor(work: Function) {
super();
- ServerWorker.IPCManager = new PromisifiedIPCManager(process);
+ ServerWorker.IPCManager = new PromisifiedIPCManager(process, this.route);
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
@@ -80,12 +80,8 @@ export class ServerWorker extends ProcessMessageRouter {
* server process.
*/
private configureProcess = () => {
- ServerWorker.IPCManager.setRouter(this.route);
// updates the local values of variables to the those sent from master
- this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => {
- this.pollingIntervalSeconds = newPollingIntervalSeconds;
- return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10));
- });
+ this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds);
this.on("manualExit", async ({ isSessionEnd }) => {
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts
deleted file mode 100644
index c90b15907..000000000
--- a/src/server/session/utilities/ipc.ts
+++ /dev/null
@@ -1,66 +0,0 @@
-import { isMaster } from "cluster";
-import { Utils } from "../../../Utils";
-
-export function IPC(target: IPCTarget) {
- return new PromisifiedIPCManager(target);
-}
-
-export type IPCTarget = NodeJS.EventEmitter & { send?: Function };
-export type Router = (message: Message) => void | Promise<void>;
-
-export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
-
-type InternalMessage<T = any> = Message<T> & { metadata: any };
-
-export interface Message<T = any> {
- name: string;
- args: T;
-}
-
-export type MessageHandler<T = any> = (message: T) => any | Promise<any>;
-
-export class PromisifiedIPCManager {
- private readonly target: IPCTarget;
- private readonly ipc_id = `ipc_id_${suffix}`;
- private readonly is_response = `is_response_${suffix}`;
-
- constructor(target: IPCTarget) {
- this.target = target;
- }
-
- public emit = async (name: string, args?: any) => this.target.send?.({ name, args });
-
- public emitPromise = async (name: string, args?: any) => {
- return new Promise(resolve => {
- const messageId = Utils.GenerateGuid();
- const metadata: any = {};
- metadata[this.ipc_id] = messageId;
- const responseHandler: MessageHandler<any> = ({ metadata, args }) => {
- if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) {
- this.target.removeListener("message", responseHandler);
- resolve(args?.error as Error | undefined);
- }
- };
- this.target.addListener("message", responseHandler);
- this.target.send?.({ name, args, metadata });
- });
- }
-
- public setRouter = (router: Router) => {
- this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => {
- if (name && (!metadata || !metadata[this.is_response])) {
- let error: Error | undefined;
- try {
- await router({ name, args });
- } catch (e) {
- error = e;
- }
- if (metadata && this.target.send) {
- metadata[this.is_response] = true;
- this.target.send({ name, args: { error }, metadata });
- }
- }
- });
- }
-
-} \ No newline at end of file