aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSam Wilkins <samwilkins333@gmail.com>2020-01-11 09:54:48 -0500
committerSam Wilkins <samwilkins333@gmail.com>2020-01-11 09:54:48 -0500
commit27c93abd49ca8a519d2aa3cf7938434fe25947d7 (patch)
tree14f137aa8332ef1ce0cf95ff5cde52246637d45f /src
parent7a20f573f4f428bfc779797d437fa9525b6976f8 (diff)
extends message, removed duplicate handlers, IPC streamlined
Diffstat (limited to 'src')
-rw-r--r--src/server/ApiManagers/SessionManager.ts4
-rw-r--r--src/server/DashSession/DashSessionAgent.ts15
-rw-r--r--src/server/session/agents/applied_session_agent.ts8
-rw-r--r--src/server/session/agents/message_router.ts45
-rw-r--r--src/server/session/agents/monitor.ts30
-rw-r--r--src/server/session/agents/server_worker.ts17
-rw-r--r--src/server/session/utilities/ipc.ts93
7 files changed, 116 insertions, 96 deletions
diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts
index 21103fdd5..91ef7e298 100644
--- a/src/server/ApiManagers/SessionManager.ts
+++ b/src/server/ApiManagers/SessionManager.ts
@@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager {
const { mode } = req.params;
if (["passive", "active"].includes(mode)) {
const recipient = req.params.recipient || DashSessionAgent.notificationRecipient;
- const response = await sessionAgent.serverWorker.sendMonitorAction("debug", { mode, recipient }, true);
+ const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true);
if (response instanceof Error) {
res.send(response);
} else {
@@ -49,7 +49,7 @@ export default class SessionManager extends ApiManager {
method: Method.GET,
subscription: this.secureSubscriber("backup"),
secureHandler: this.authorizedAction(async ({ res }) => {
- const response = await sessionAgent.serverWorker.sendMonitorAction("backup");
+ const response = await sessionAgent.serverWorker.emitToMonitor("backup");
if (response instanceof Error) {
res.send(response);
} else {
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts
index 0d9486757..b7e741525 100644
--- a/src/server/DashSession/DashSessionAgent.ts
+++ b/src/server/DashSession/DashSessionAgent.ts
@@ -11,6 +11,7 @@ import { resolve } from "path";
import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent";
import { Monitor } from "../session/agents/monitor";
import { ServerWorker } from "../session/agents/server_worker";
+import { Message } from "../session/utilities/ipc";
/**
* If we're the monitor (master) thread, we should launch the monitor logic for the session.
@@ -26,14 +27,14 @@ export class DashSessionAgent extends AppliedSessionAgent {
* The core method invoked when the single master thread is initialized.
* Installs event hooks, repl commands and additional IPC listeners.
*/
- protected async initializeMonitor(monitor: Monitor) {
+ protected async initializeMonitor(monitor: Monitor, sessionKey: string) {
+ await this.dispatchSessionPassword(sessionKey);
monitor.addReplCommand("pull", [], () => monitor.exec("git pull"));
monitor.addReplCommand("solr", [/start|stop|index/], this.executeSolrCommand);
monitor.addReplCommand("backup", [], this.backup);
monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient));
- monitor.addServerMessageListener("backup", this.backup);
- monitor.addServerMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient));
- monitor.onKeyGenerated(this.dispatchSessionPassword);
+ monitor.addMessageListener("backup", this.backup);
+ monitor.addMessageListener("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient));
monitor.onCrashDetected(this.dispatchCrashReport);
}
@@ -80,14 +81,14 @@ export class DashSessionAgent extends AppliedSessionAgent {
* This sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone
* to kill the server via the /kill/:key route.
*/
- private dispatchSessionPassword = async (key: string) => {
+ private dispatchSessionPassword = async (sessionKey: string) => {
const { mainLog } = this.sessionMonitor;
const { notificationRecipient } = DashSessionAgent;
mainLog(green("dispatching session key..."));
const error = await Email.dispatch({
to: notificationRecipient,
subject: "Dash Release Session Admin Authentication Key",
- content: `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${this.signature}`
+ content: `The key for this session (started @ ${new Date().toUTCString()}) is ${sessionKey}.\n\n${this.signature}`
});
if (error) {
this.sessionMonitor.mainLog(red(`dispatch failure @ ${notificationRecipient} (${yellow(error.message)})`));
@@ -100,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent {
/**
* This sends an email with the generated crash report.
*/
- private dispatchCrashReport = async (crashCause: Error) => {
+ private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => {
const { mainLog } = this.sessionMonitor;
const { notificationRecipient } = DashSessionAgent;
const error = await Email.dispatch({
diff --git a/src/server/session/agents/applied_session_agent.ts b/src/server/session/agents/applied_session_agent.ts
index 53293d3bf..48226dab6 100644
--- a/src/server/session/agents/applied_session_agent.ts
+++ b/src/server/session/agents/applied_session_agent.ts
@@ -1,6 +1,7 @@
import { isMaster } from "cluster";
import { Monitor } from "./monitor";
import { ServerWorker } from "./server_worker";
+import { Utils } from "../../../Utils";
export type ExitHandler = (reason: Error | boolean) => void | Promise<void>;
@@ -8,7 +9,7 @@ export abstract class AppliedSessionAgent {
// the following two methods allow the developer to create a custom
// session and use the built in customization options for each thread
- protected abstract async initializeMonitor(monitor: Monitor): Promise<void>;
+ protected abstract async initializeMonitor(monitor: Monitor, key: string): Promise<void>;
protected abstract async initializeServerWorker(): Promise<ServerWorker>;
private launched = false;
@@ -21,7 +22,7 @@ export abstract class AppliedSessionAgent {
private sessionMonitorRef: Monitor | undefined;
public get sessionMonitor(): Monitor {
if (!isMaster) {
- this.serverWorker.sendMonitorAction("kill", {
+ this.serverWorker.emitToMonitor("kill", {
graceful: false,
reason: "Cannot access the session monitor directly from the server worker thread.",
errorCode: 1
@@ -43,7 +44,8 @@ export abstract class AppliedSessionAgent {
if (!this.launched) {
this.launched = true;
if (isMaster) {
- await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create());
+ const sessionKey = Utils.GenerateGuid();
+ await this.initializeMonitor(this.sessionMonitorRef = Monitor.Create(sessionKey), sessionKey);
this.sessionMonitorRef.finalize();
} else {
this.serverWorkerRef = await this.initializeServerWorker();
diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/message_router.ts
new file mode 100644
index 000000000..5848e27ab
--- /dev/null
+++ b/src/server/session/agents/message_router.ts
@@ -0,0 +1,45 @@
+import { MessageHandler, Message } from "../utilities/ipc";
+
+export default abstract class MessageRouter {
+
+ private onMessage: { [name: string]: MessageHandler[] | undefined } = {};
+
+ /**
+ * Add a listener at this message. When the monitor process
+ * receives a message, it will invoke all registered functions.
+ */
+ public addMessageListener = (name: string, handler: MessageHandler, exclusive = false) => {
+ const handlers = this.onMessage[name];
+ if (exclusive || !handlers) {
+ this.onMessage[name] = [handler];
+ } else {
+ handlers.push(handler);
+ }
+ }
+
+ /**
+ * Unregister a given listener at this message.
+ */
+ public removeMessageListener = (name: string, handler: MessageHandler) => {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ const index = handlers.indexOf(handler);
+ if (index > -1) {
+ handlers.splice(index, 1);
+ }
+ }
+ }
+
+ /**
+ * Unregister all listeners at this message.
+ */
+ public clearMessageListeners = (...names: string[]) => names.map(name => this.onMessage[name] = undefined);
+
+ protected route: MessageHandler = async ({ name, args }) => {
+ const handlers = this.onMessage[name];
+ if (handlers) {
+ await Promise.all(handlers.map(handler => handler({ name, args })));
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index 18fa6df24..96f1f8130 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,20 +2,19 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { PromisifiedIPCManager, suffix, IPC, Message } from "../utilities/ipc";
+import { PromisifiedIPCManager, suffix, IPC, MessageHandler, Message } from "../utilities/ipc";
import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
-import { Utils } from "../../../Utils";
import { validate, ValidationError } from "jsonschema";
import { Utilities } from "../utilities/utilities";
import { readFileSync } from "fs";
-import { EventEmitter } from "events";
+import MessageRouter from "./message_router";
/**
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
-export class Monitor extends EventEmitter {
+export class Monitor extends MessageRouter {
private static IPCManager: PromisifiedIPCManager;
private static count = 0;
private finalized = false;
@@ -25,7 +24,7 @@ export class Monitor extends EventEmitter {
private key: string | undefined;
private repl: Repl;
- public static Create() {
+ public static Create(sessionKey: string) {
if (isWorker) {
IPC(process).emit("kill", {
reason: "cannot create a monitor on the worker process.",
@@ -37,13 +36,12 @@ export class Monitor extends EventEmitter {
console.error(red("cannot create more than one monitor."));
process.exit(1);
} else {
- return new Monitor();
+ return new Monitor(sessionKey);
}
}
- public onCrashDetected = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener);
- public onKeyGenerated = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.KeyGenerated, listener);
- public onServerRunning = (listener: (...args: any[]) => void) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener);
+ public onCrashDetected = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.CrashDetected, listener);
+ public onServerRunning = (listener: MessageHandler) => this.addMessageListener(Monitor.IntrinsicEvents.ServerRunning, listener);
/**
* Kill this session and its active child
@@ -93,10 +91,10 @@ export class Monitor extends EventEmitter {
});
}
- private constructor() {
+ private constructor(sessionKey: string) {
super();
-
console.log(this.timestamp(), cyan("initializing session..."));
+ this.key = sessionKey;
this.config = this.loadAndValidateConfiguration();
// determines whether or not we see the compilation / initialization / runtime output of each child server process
@@ -131,7 +129,6 @@ export class Monitor extends EventEmitter {
throw new Error("Session monitor is already finalized");
}
this.finalized = true;
- this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid());
this.spawn();
}
@@ -284,11 +281,10 @@ export class Monitor extends EventEmitter {
Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- const { addMessageListener } = Monitor.IPCManager;
- addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode));
- addMessageListener(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, ({ args: { error } }) => this.emit(Monitor.IntrinsicEvents.CrashDetected, error));
- addMessageListener(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, ({ args: { firstTime } }) => this.emit(Monitor.IntrinsicEvents.ServerRunning, firstTime));
- addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
+ this.addMessageListener("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
+ this.addMessageListener("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+
+ Monitor.IPCManager.setRouter(this.route);
}
}
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index 9e471366a..01e1cf971 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,6 +1,7 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
import { PromisifiedIPCManager } from "../utilities/ipc";
+import MessageRouter from "./message_router";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
import { Monitor } from "./monitor";
@@ -10,7 +11,7 @@ import { Monitor } from "./monitor";
* if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification
* email if the server encounters an uncaught exception or if the server cannot be reached.
*/
-export class ServerWorker {
+export class ServerWorker extends MessageRouter {
private static IPCManager = new PromisifiedIPCManager(process);
private static count = 0;
private shouldServerBeResponsive = false;
@@ -58,6 +59,7 @@ export class ServerWorker {
public emitToMonitor = (name: string, args?: any, expectResponse = false) => ServerWorker.IPCManager.emit(name, args, expectResponse);
private constructor(work: Function) {
+ super();
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
@@ -76,10 +78,13 @@ export class ServerWorker {
* server process.
*/
private configureProcess = () => {
+ ServerWorker.IPCManager.setRouter(this.route);
// updates the local values of variables to the those sent from master
- const { addMessageListener } = ServerWorker.IPCManager;
- addMessageListener("updatePollingInterval", ({ args }) => this.pollingIntervalSeconds = args.newPollingIntervalSeconds);
- addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
+ this.addMessageListener("updatePollingInterval", ({ args }) => {
+ this.pollingIntervalSeconds = args.newPollingIntervalSeconds;
+ return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10));
+ });
+ this.addMessageListener("manualExit", async ({ args: { isSessionEnd } }) => {
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
});
@@ -110,7 +115,7 @@ export class ServerWorker {
private proactiveUnplannedExit = async (error: Error): Promise<void> => {
this.shouldServerBeResponsive = false;
// communicates via IPC to the master thread that it should dispatch a crash notification email
- this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.CrashDetected}`, { error });
+ this.emitToMonitor(Monitor.IntrinsicEvents.CrashDetected, { error });
await this.executeExitHandlers(error);
// notify master thread (which will log update in the console) of crash event via IPC
this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
@@ -130,7 +135,7 @@ export class ServerWorker {
if (!this.shouldServerBeResponsive) {
// notify monitor thread that the server is up and running
this.lifecycleNotification(green(`listening on ${this.serverPort}...`));
- this.emitToMonitor(`notify_${Monitor.IntrinsicEvents.ServerRunning}`, { firstTime: !this.isInitialized });
+ this.emitToMonitor(Monitor.IntrinsicEvents.ServerRunning, { firstTime: !this.isInitialized });
this.isInitialized = true;
}
this.shouldServerBeResponsive = true;
diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts
index 2faf9f63e..37aaa6757 100644
--- a/src/server/session/utilities/ipc.ts
+++ b/src/server/session/utilities/ipc.ts
@@ -8,95 +8,66 @@ export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
export interface Message {
name: string;
- args: any;
+ args?: any;
}
+type InternalMessage = Message & { metadata: any };
-export type MessageHandler = (message: Message) => any | Promise<any>;
+export type MessageHandler<T extends Message = Message> = (message: T) => any | Promise<any>;
export class PromisifiedIPCManager {
- private onMessage: { [message: string]: MessageHandler[] | undefined } = {};
private readonly target: IPCTarget;
private readonly ipc_id = `ipc_id_${suffix}`;
- private readonly response_expected = `response_expected_${suffix}`;
private readonly is_response = `is_response_${suffix}`;
constructor(target: IPCTarget) {
this.target = target;
-
- this.target.addListener("message", async ({ name, args }: Message) => {
- let error: Error | undefined;
- try {
- const handlers = this.onMessage[name];
- if (handlers) {
- await Promise.all(handlers.map(handler => handler({ name, args })));
- }
- } catch (e) {
- error = e;
- }
- if (args[this.response_expected] && this.target.send) {
- const response: any = { error };
- response[this.ipc_id] = args[this.ipc_id];
- response[this.is_response] = true;
- this.target.send(response);
- }
- });
- }
-
- /**
- * Add a listener at this message. When the monitor process
- * receives a message, it will invoke all registered functions.
- */
- public addMessageListener = (name: string, handler: MessageHandler) => {
- const handlers = this.onMessage[name];
- if (handlers) {
- handlers.push(handler);
- } else {
- this.onMessage[name] = [handler];
- }
- }
-
- /**
- * Unregister a given listener at this message.
- */
- public removeMessageListener = (name: string, handler: MessageHandler) => {
- const handlers = this.onMessage[name];
- if (handlers) {
- const index = handlers.indexOf(handler);
- if (index > -1) {
- handlers.splice(index, 1);
- }
- }
}
- /**
- * Unregister all listeners at this message.
- */
- public clearMessageListeners = (message: string) => this.onMessage[message] = undefined;
-
- public emit = async (name: string, args: any, expectResponse = false): Promise<Error | undefined> => {
+ public emit = async (name: string, args?: any, expectResponse = false): Promise<Error | undefined> => {
if (!this.target.send) {
return new Error("Cannot dispatch when send is undefined.");
}
- args[this.response_expected] = expectResponse;
if (expectResponse) {
return new Promise(resolve => {
const messageId = Utils.GenerateGuid();
- args[this.ipc_id] = messageId;
- const responseHandler: (args: any) => void = response => {
- const { error } = response;
- if (response[this.is_response] && response[this.ipc_id] === messageId) {
+ const metadata: any = {};
+ metadata[this.ipc_id] = messageId;
+ const responseHandler: MessageHandler<InternalMessage> = ({ args, metadata }) => {
+ if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) {
+ const { error } = args;
this.target.removeListener("message", responseHandler);
resolve(error);
}
};
this.target.addListener("message", responseHandler);
- this.target.send!({ name, args });
+ this.target.send?.({ name, args, metadata });
});
} else {
- this.target.send({ name, args });
+ this.target.send?.({ name, args });
}
}
+ public setRouter = (router: Router) => {
+ this.target.addListener("message", async ({ name, args, metadata }: InternalMessage) => {
+ if (name && (!metadata || !metadata[this.is_response])) {
+ let error: Error | undefined;
+ try {
+ await router({ name, args });
+ } catch (e) {
+ error = e;
+ }
+ if (metadata && this.target.send) {
+ metadata[this.is_response] = true;
+ this.target.send({
+ name,
+ args: { error },
+ metadata
+ });
+ }
+ }
+ });
+ }
+
}
export function IPC(target: IPCTarget) {