aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Wilkins <samwilkins333@gmail.com>2020-01-11 13:42:06 -0500
committerSam Wilkins <samwilkins333@gmail.com>2020-01-11 13:42:06 -0500
commit86f1e0f58940904b8c55284f6787e7422a6665ff (patch)
treefaeba5e08d5a6fc99aa4f26cd246aca3af3bca5d
parent120fa84b3e8c794dd882d3613067c5b18ee7ba04 (diff)
refactor
-rw-r--r--src/server/ApiManagers/SessionManager.ts8
-rw-r--r--src/server/DashSession/DashSessionAgent.ts8
-rw-r--r--src/server/session/agents/monitor.ts17
-rw-r--r--src/server/session/agents/process_message_router.ts (renamed from src/server/session/agents/message_router.ts)7
-rw-r--r--src/server/session/agents/server_worker.ts18
-rw-r--r--src/server/session/utilities/ipc.ts39
6 files changed, 47 insertions, 50 deletions
diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts
index 91ef7e298..4513752a6 100644
--- a/src/server/ApiManagers/SessionManager.ts
+++ b/src/server/ApiManagers/SessionManager.ts
@@ -8,16 +8,16 @@ const permissionError = "You are not authorized!";
export default class SessionManager extends ApiManager {
- private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("password", ...params);
+ private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("sessionKey", ...params);
private authorizedAction = (handler: SecureHandler) => {
return (core: AuthorizedCore) => {
const { req, res, isRelease } = core;
- const { password } = req.params;
+ const { sessionKey } = req.params;
if (!isRelease) {
return res.send("This can be run only on the release server.");
}
- if (password !== process.env.session_key) {
+ if (sessionKey !== process.env.session_key) {
return _permission_denied(res, permissionError);
}
return handler(core);
@@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager {
const { mode } = req.params;
if (["passive", "active"].includes(mode)) {
const recipient = req.params.recipient || DashSessionAgent.notificationRecipient;
- const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true);
+ const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient });
if (response instanceof Error) {
res.send(response);
} else {
diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts
index 23d421835..de8e7240f 100644
--- a/src/server/DashSession/DashSessionAgent.ts
+++ b/src/server/DashSession/DashSessionAgent.ts
@@ -11,7 +11,7 @@ import { resolve } from "path";
import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent";
import { Monitor } from "../session/agents/monitor";
import { ServerWorker } from "../session/agents/server_worker";
-import { Message } from "../session/utilities/ipc";
+import { MessageHandler } from "../session/utilities/ipc";
/**
* If we're the monitor (master) thread, we should launch the monitor logic for the session.
@@ -34,8 +34,8 @@ export class DashSessionAgent extends AppliedSessionAgent {
monitor.addReplCommand("backup", [], this.backup);
monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient));
monitor.on("backup", this.backup);
- monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient));
- monitor.hooks.crashDetected(this.dispatchCrashReport);
+ monitor.on("debug", ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient));
+ monitor.coreHooks.onCrashDetected(this.dispatchCrashReport);
}
/**
@@ -101,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent {
/**
* This sends an email with the generated crash report.
*/
- private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => {
+ private dispatchCrashReport: MessageHandler<{ error: Error }> = async ({ error: crashCause }) => {
const { mainLog } = this.sessionMonitor;
const { notificationRecipient } = DashSessionAgent;
const error = await Email.dispatch({
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index ccba8199e..d4abbb51e 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -8,14 +8,13 @@ import { exec, ExecOptions } from "child_process";
import { validate, ValidationError } from "jsonschema";
import { Utilities } from "../utilities/utilities";
import { readFileSync } from "fs";
-import MessageRouter from "./message_router";
+import ProcessMessageRouter from "./process_message_router";
/**
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
-export class Monitor extends MessageRouter {
- private static IPCManager: PromisifiedIPCManager;
+export class Monitor extends ProcessMessageRouter {
private static count = 0;
private finalized = false;
private exitHandlers: ExitHandler[] = [];
@@ -84,9 +83,9 @@ export class Monitor extends MessageRouter {
this.spawn();
}
- public readonly hooks = Object.freeze({
- crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
- serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener)
+ public readonly coreHooks = Object.freeze({
+ onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
+ onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener)
});
/**
@@ -219,7 +218,7 @@ export class Monitor extends MessageRouter {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
if (args[2] === "true") {
- return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true);
+ return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds });
}
}
}
@@ -286,8 +285,8 @@ export class Monitor extends MessageRouter {
Monitor.IPCManager = IPC(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
- this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true);
- this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
+ this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true);
+ this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true);
Monitor.IPCManager.setRouter(this.route);
}
diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/process_message_router.ts
index 707f771d9..f60343514 100644
--- a/src/server/session/agents/message_router.ts
+++ b/src/server/session/agents/process_message_router.ts
@@ -1,7 +1,8 @@
-import { MessageHandler, Message } from "../utilities/ipc";
+import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc";
-export default abstract class MessageRouter {
+export default abstract class ProcessMessageRouter {
+ protected static IPCManager: PromisifiedIPCManager;
private onMessage: { [name: string]: MessageHandler[] | undefined } = {};
/**
@@ -38,7 +39,7 @@ export default abstract class MessageRouter {
protected route: MessageHandler = async ({ name, args }) => {
const handlers = this.onMessage[name];
if (handlers) {
- await Promise.all(handlers.map(handler => handler({ name, args })));
+ await Promise.all(handlers.map(handler => handler(args)));
}
}
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index 50abe398d..23ffb2650 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,7 +1,7 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
-import { PromisifiedIPCManager, Message } from "../utilities/ipc";
-import MessageRouter from "./message_router";
+import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc";
+import ProcessMessageRouter from "./process_message_router";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
import { Monitor } from "./monitor";
@@ -11,8 +11,7 @@ import { Monitor } from "./monitor";
* if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification
* email if the server encounters an uncaught exception or if the server cannot be reached.
*/
-export class ServerWorker extends MessageRouter {
- private static IPCManager = new PromisifiedIPCManager(process);
+export class ServerWorker extends ProcessMessageRouter {
private static count = 0;
private shouldServerBeResponsive = false;
private exitHandlers: ExitHandler[] = [];
@@ -56,10 +55,13 @@ export class ServerWorker extends MessageRouter {
* A convenience wrapper to tell the session monitor (parent process)
* to carry out the action with the specified message and arguments.
*/
- public emitToMonitor = (name: string, args?: any, awaitResponse = false) => ServerWorker.IPCManager.emit(name, args, awaitResponse);
+ public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args);
+
+ public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args);
private constructor(work: Function) {
super();
+ ServerWorker.IPCManager = new PromisifiedIPCManager(process);
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
@@ -80,11 +82,11 @@ export class ServerWorker extends MessageRouter {
private configureProcess = () => {
ServerWorker.IPCManager.setRouter(this.route);
// updates the local values of variables to the those sent from master
- this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => {
- this.pollingIntervalSeconds = args.newPollingIntervalSeconds;
+ this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => {
+ this.pollingIntervalSeconds = newPollingIntervalSeconds;
return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10));
});
- this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => {
+ this.on("manualExit", async ({ isSessionEnd }) => {
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
});
diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts
index 7ad00596d..db4c23180 100644
--- a/src/server/session/utilities/ipc.ts
+++ b/src/server/session/utilities/ipc.ts
@@ -11,7 +11,7 @@ export interface Message<T = any> {
args: T;
}
type InternalMessage<T = any> = Message<T> & { metadata: any };
-export type MessageHandler<A = any, T extends Message<A> = Message<A>> = (message: T) => any | Promise<any>;
+export type MessageHandler<T = any> = (message: T) => any | Promise<any>;
export class PromisifiedIPCManager {
private readonly target: IPCTarget;
@@ -22,27 +22,22 @@ export class PromisifiedIPCManager {
this.target = target;
}
- public emit = async (name: string, args?: any, awaitResponse = false): Promise<Error | undefined> => {
- if (!this.target.send) {
- return new Error("Cannot dispatch when send is undefined.");
- }
- if (awaitResponse) {
- return new Promise(resolve => {
- const messageId = Utils.GenerateGuid();
- const metadata: any = {};
- metadata[this.ipc_id] = messageId;
- const responseHandler: MessageHandler<any, InternalMessage> = ({ metadata, args }) => {
- if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) {
- this.target.removeListener("message", responseHandler);
- resolve(args?.error as Error | undefined);
- }
- };
- this.target.addListener("message", responseHandler);
- this.target.send?.({ name, args, metadata });
- });
- } else {
- this.target.send?.({ name, args });
- }
+ public emit = async (name: string, args?: any) => this.target.send?.({ name, args });
+
+ public emitPromise = async (name: string, args?: any) => {
+ return new Promise(resolve => {
+ const messageId = Utils.GenerateGuid();
+ const metadata: any = {};
+ metadata[this.ipc_id] = messageId;
+ const responseHandler: MessageHandler<any> = ({ metadata, args }) => {
+ if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) {
+ this.target.removeListener("message", responseHandler);
+ resolve(args?.error as Error | undefined);
+ }
+ };
+ this.target.addListener("message", responseHandler);
+ this.target.send?.({ name, args, metadata });
+ });
}
public setRouter = (router: Router) => {