diff options
author | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 13:42:06 -0500 |
---|---|---|
committer | Sam Wilkins <samwilkins333@gmail.com> | 2020-01-11 13:42:06 -0500 |
commit | 86f1e0f58940904b8c55284f6787e7422a6665ff (patch) | |
tree | faeba5e08d5a6fc99aa4f26cd246aca3af3bca5d | |
parent | 120fa84b3e8c794dd882d3613067c5b18ee7ba04 (diff) |
refactor
-rw-r--r-- | src/server/ApiManagers/SessionManager.ts | 8 | ||||
-rw-r--r-- | src/server/DashSession/DashSessionAgent.ts | 8 | ||||
-rw-r--r-- | src/server/session/agents/monitor.ts | 17 | ||||
-rw-r--r-- | src/server/session/agents/process_message_router.ts (renamed from src/server/session/agents/message_router.ts) | 7 | ||||
-rw-r--r-- | src/server/session/agents/server_worker.ts | 18 | ||||
-rw-r--r-- | src/server/session/utilities/ipc.ts | 39 |
6 files changed, 47 insertions, 50 deletions
diff --git a/src/server/ApiManagers/SessionManager.ts b/src/server/ApiManagers/SessionManager.ts index 91ef7e298..4513752a6 100644 --- a/src/server/ApiManagers/SessionManager.ts +++ b/src/server/ApiManagers/SessionManager.ts @@ -8,16 +8,16 @@ const permissionError = "You are not authorized!"; export default class SessionManager extends ApiManager { - private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("password", ...params); + private secureSubscriber = (root: string, ...params: string[]) => new RouteSubscriber(root).add("sessionKey", ...params); private authorizedAction = (handler: SecureHandler) => { return (core: AuthorizedCore) => { const { req, res, isRelease } = core; - const { password } = req.params; + const { sessionKey } = req.params; if (!isRelease) { return res.send("This can be run only on the release server."); } - if (password !== process.env.session_key) { + if (sessionKey !== process.env.session_key) { return _permission_denied(res, permissionError); } return handler(core); @@ -33,7 +33,7 @@ export default class SessionManager extends ApiManager { const { mode } = req.params; if (["passive", "active"].includes(mode)) { const recipient = req.params.recipient || DashSessionAgent.notificationRecipient; - const response = await sessionAgent.serverWorker.emitToMonitor("debug", { mode, recipient }, true); + const response = await sessionAgent.serverWorker.emitToMonitorPromise("debug", { mode, recipient }); if (response instanceof Error) { res.send(response); } else { diff --git a/src/server/DashSession/DashSessionAgent.ts b/src/server/DashSession/DashSessionAgent.ts index 23d421835..de8e7240f 100644 --- a/src/server/DashSession/DashSessionAgent.ts +++ b/src/server/DashSession/DashSessionAgent.ts @@ -11,7 +11,7 @@ import { resolve } from "path"; import { AppliedSessionAgent, ExitHandler } from "../session/agents/applied_session_agent"; import { Monitor } from "../session/agents/monitor"; import { ServerWorker } from "../session/agents/server_worker"; -import { Message } from "../session/utilities/ipc"; +import { MessageHandler } from "../session/utilities/ipc"; /** * If we're the monitor (master) thread, we should launch the monitor logic for the session. @@ -34,8 +34,8 @@ export class DashSessionAgent extends AppliedSessionAgent { monitor.addReplCommand("backup", [], this.backup); monitor.addReplCommand("debug", [/active|passive/, /\S+\@\S+/], async ([mode, recipient]) => this.dispatchZippedDebugBackup(mode, recipient)); monitor.on("backup", this.backup); - monitor.on("debug", ({ args: { mode, recipient } }) => this.dispatchZippedDebugBackup(mode, recipient)); - monitor.hooks.crashDetected(this.dispatchCrashReport); + monitor.on("debug", ({ mode, recipient }) => this.dispatchZippedDebugBackup(mode, recipient)); + monitor.coreHooks.onCrashDetected(this.dispatchCrashReport); } /** @@ -101,7 +101,7 @@ export class DashSessionAgent extends AppliedSessionAgent { /** * This sends an email with the generated crash report. */ - private dispatchCrashReport = async ({ args: { error: crashCause } }: Message) => { + private dispatchCrashReport: MessageHandler<{ error: Error }> = async ({ error: crashCause }) => { const { mainLog } = this.sessionMonitor; const { notificationRecipient } = DashSessionAgent; const error = await Email.dispatch({ diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts index ccba8199e..d4abbb51e 100644 --- a/src/server/session/agents/monitor.ts +++ b/src/server/session/agents/monitor.ts @@ -8,14 +8,13 @@ import { exec, ExecOptions } from "child_process"; import { validate, ValidationError } from "jsonschema"; import { Utilities } from "../utilities/utilities"; import { readFileSync } from "fs"; -import MessageRouter from "./message_router"; +import ProcessMessageRouter from "./process_message_router"; /** * Validates and reads the configuration file, accordingly builds a child process factory * and spawns off an initial process that will respawn as predecessors die. */ -export class Monitor extends MessageRouter { - private static IPCManager: PromisifiedIPCManager; +export class Monitor extends ProcessMessageRouter { private static count = 0; private finalized = false; private exitHandlers: ExitHandler[] = []; @@ -84,9 +83,9 @@ export class Monitor extends MessageRouter { this.spawn(); } - public readonly hooks = Object.freeze({ - crashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), - serverRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) + public readonly coreHooks = Object.freeze({ + onCrashDetected: (listener: MessageHandler<{ error: Error }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener), + onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener) }); /** @@ -219,7 +218,7 @@ export class Monitor extends MessageRouter { if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) { this.config.polling.intervalSeconds = newPollingIntervalSeconds; if (args[2] === "true") { - return Monitor.IPCManager.emit("updatePollingInterval", { newPollingIntervalSeconds }, true); + return Monitor.IPCManager.emitPromise("updatePollingInterval", { newPollingIntervalSeconds }); } } } @@ -286,8 +285,8 @@ export class Monitor extends MessageRouter { Monitor.IPCManager = IPC(this.activeWorker); this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`)); - this.on("kill", ({ args: { reason, graceful, errorCode } }) => this.killSession(reason, graceful, errorCode), true); - this.on("lifecycle", ({ args: { event } }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); + this.on("kill", ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode), true); + this.on("lifecycle", ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`), true); Monitor.IPCManager.setRouter(this.route); } diff --git a/src/server/session/agents/message_router.ts b/src/server/session/agents/process_message_router.ts index 707f771d9..f60343514 100644 --- a/src/server/session/agents/message_router.ts +++ b/src/server/session/agents/process_message_router.ts @@ -1,7 +1,8 @@ -import { MessageHandler, Message } from "../utilities/ipc"; +import { MessageHandler, PromisifiedIPCManager } from "../utilities/ipc"; -export default abstract class MessageRouter { +export default abstract class ProcessMessageRouter { + protected static IPCManager: PromisifiedIPCManager; private onMessage: { [name: string]: MessageHandler[] | undefined } = {}; /** @@ -38,7 +39,7 @@ export default abstract class MessageRouter { protected route: MessageHandler = async ({ name, args }) => { const handlers = this.onMessage[name]; if (handlers) { - await Promise.all(handlers.map(handler => handler({ name, args }))); + await Promise.all(handlers.map(handler => handler(args))); } } diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts index 50abe398d..23ffb2650 100644 --- a/src/server/session/agents/server_worker.ts +++ b/src/server/session/agents/server_worker.ts @@ -1,7 +1,7 @@ import { ExitHandler } from "./applied_session_agent"; import { isMaster } from "cluster"; -import { PromisifiedIPCManager, Message } from "../utilities/ipc"; -import MessageRouter from "./message_router"; +import { PromisifiedIPCManager, Message, MessageHandler } from "../utilities/ipc"; +import ProcessMessageRouter from "./process_message_router"; import { red, green, white, yellow } from "colors"; import { get } from "request-promise"; import { Monitor } from "./monitor"; @@ -11,8 +11,7 @@ import { Monitor } from "./monitor"; * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification * email if the server encounters an uncaught exception or if the server cannot be reached. */ -export class ServerWorker extends MessageRouter { - private static IPCManager = new PromisifiedIPCManager(process); +export class ServerWorker extends ProcessMessageRouter { private static count = 0; private shouldServerBeResponsive = false; private exitHandlers: ExitHandler[] = []; @@ -56,10 +55,13 @@ export class ServerWorker extends MessageRouter { * A convenience wrapper to tell the session monitor (parent process) * to carry out the action with the specified message and arguments. */ - public emitToMonitor = (name: string, args?: any, awaitResponse = false) => ServerWorker.IPCManager.emit(name, args, awaitResponse); + public emitToMonitor = (name: string, args?: any) => ServerWorker.IPCManager.emit(name, args); + + public emitToMonitorPromise = (name: string, args?: any) => ServerWorker.IPCManager.emitPromise(name, args); private constructor(work: Function) { super(); + ServerWorker.IPCManager = new PromisifiedIPCManager(process); this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`)); const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env; @@ -80,11 +82,11 @@ export class ServerWorker extends MessageRouter { private configureProcess = () => { ServerWorker.IPCManager.setRouter(this.route); // updates the local values of variables to the those sent from master - this.on("updatePollingInterval", ({ args }: Message<{ newPollingIntervalSeconds: number }>) => { - this.pollingIntervalSeconds = args.newPollingIntervalSeconds; + this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => { + this.pollingIntervalSeconds = newPollingIntervalSeconds; return new Promise<void>(resolve => setTimeout(resolve, 1000 * 10)); }); - this.on("manualExit", async ({ args: { isSessionEnd } }: Message<{ isSessionEnd: boolean }>) => { + this.on("manualExit", async ({ isSessionEnd }) => { await this.executeExitHandlers(isSessionEnd); process.exit(0); }); diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts index 7ad00596d..db4c23180 100644 --- a/src/server/session/utilities/ipc.ts +++ b/src/server/session/utilities/ipc.ts @@ -11,7 +11,7 @@ export interface Message<T = any> { args: T; } type InternalMessage<T = any> = Message<T> & { metadata: any }; -export type MessageHandler<A = any, T extends Message<A> = Message<A>> = (message: T) => any | Promise<any>; +export type MessageHandler<T = any> = (message: T) => any | Promise<any>; export class PromisifiedIPCManager { private readonly target: IPCTarget; @@ -22,27 +22,22 @@ export class PromisifiedIPCManager { this.target = target; } - public emit = async (name: string, args?: any, awaitResponse = false): Promise<Error | undefined> => { - if (!this.target.send) { - return new Error("Cannot dispatch when send is undefined."); - } - if (awaitResponse) { - return new Promise(resolve => { - const messageId = Utils.GenerateGuid(); - const metadata: any = {}; - metadata[this.ipc_id] = messageId; - const responseHandler: MessageHandler<any, InternalMessage> = ({ metadata, args }) => { - if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { - this.target.removeListener("message", responseHandler); - resolve(args?.error as Error | undefined); - } - }; - this.target.addListener("message", responseHandler); - this.target.send?.({ name, args, metadata }); - }); - } else { - this.target.send?.({ name, args }); - } + public emit = async (name: string, args?: any) => this.target.send?.({ name, args }); + + public emitPromise = async (name: string, args?: any) => { + return new Promise(resolve => { + const messageId = Utils.GenerateGuid(); + const metadata: any = {}; + metadata[this.ipc_id] = messageId; + const responseHandler: MessageHandler<any> = ({ metadata, args }) => { + if (metadata[this.is_response] && metadata[this.ipc_id] === messageId) { + this.target.removeListener("message", responseHandler); + resolve(args?.error as Error | undefined); + } + }; + this.target.addListener("message", responseHandler); + this.target.send?.({ name, args, metadata }); + }); } public setRouter = (router: Router) => { |