aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/server/RouteManager.ts4
-rw-r--r--src/server/session/agents/monitor.ts18
-rw-r--r--src/server/session/agents/server_worker.ts12
-rw-r--r--src/server/session/utilities/ipc.ts48
4 files changed, 46 insertions, 36 deletions
diff --git a/src/server/RouteManager.ts b/src/server/RouteManager.ts
index a8ad81bf7..b146d7b72 100644
--- a/src/server/RouteManager.ts
+++ b/src/server/RouteManager.ts
@@ -68,7 +68,7 @@ export default class RouteManager {
console.log('please remove all duplicate routes before continuing');
}
if (malformedCount) {
- console.log(`please ensure all routes adhere to ^\/$|^\/[A-Za-z]+(\/\:[A-Za-z]+)*$`);
+ console.log(`please ensure all routes adhere to ^\/$|^\/[A-Za-z]+(\/\:[A-Za-z?]+)*$`);
}
process.exit(1);
} else {
@@ -128,7 +128,7 @@ export default class RouteManager {
} else {
route = subscriber.build;
}
- if (!/^\/$|^\/[A-Za-z]+(\/\:[A-Za-z]+)*$/g.test(route)) {
+ if (!/^\/$|^\/[A-Za-z]+(\/\:[A-Za-z?]+)*$/g.test(route)) {
this.failedRegistrations.push({
reason: RegistrationError.Malformed,
route
diff --git a/src/server/session/agents/monitor.ts b/src/server/session/agents/monitor.ts
index e1709f5e6..f6738a23f 100644
--- a/src/server/session/agents/monitor.ts
+++ b/src/server/session/agents/monitor.ts
@@ -2,8 +2,8 @@ import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
-import { IPC } from "../utilities/ipc";
-import { red, cyan, white, yellow, blue, green } from "colors";
+import { PromisifiedIPCManager, suffix } from "../utilities/ipc";
+import { red, cyan, white, yellow, blue } from "colors";
import { exec, ExecOptions } from "child_process";
import { Utils } from "../../../Utils";
import { validate, ValidationError } from "jsonschema";
@@ -16,7 +16,8 @@ import { EventEmitter } from "events";
* and spawns off an initial process that will respawn as predecessors die.
*/
export class Monitor extends EventEmitter {
-
+ private static readonly localIPCManager = new PromisifiedIPCManager(process);
+ private static childIPCManager: PromisifiedIPCManager;
private static count = 0;
private finalized = false;
private exitHandlers: ExitHandler[] = [];
@@ -28,7 +29,7 @@ export class Monitor extends EventEmitter {
public static Create() {
if (isWorker) {
- IPC.dispatchMessage(process, {
+ this.localIPCManager.emit({
action: {
message: "kill",
args: {
@@ -250,7 +251,7 @@ export class Monitor extends EventEmitter {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
if (args[2] === "true") {
- return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true);
+ return Monitor.localIPCManager.emit({ newPollingIntervalSeconds }, true);
}
}
}
@@ -266,7 +267,7 @@ export class Monitor extends EventEmitter {
private killActiveWorker = (graceful = true, isSessionEnd = false): void => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
- IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } });
+ Monitor.childIPCManager.emit({ manualExit: { isSessionEnd } });
} else {
this.activeWorker.process.kill();
}
@@ -312,11 +313,12 @@ export class Monitor extends EventEmitter {
socketPort: ports.socket,
pollingIntervalSeconds: intervalSeconds,
session_key: this.key,
- ipc_suffix: IPC.suffix
+ ipc_suffix: suffix
});
+ Monitor.childIPCManager = new PromisifiedIPCManager(this.activeWorker);
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
// an IPC message handler that executes actions on the master thread when prompted by the active worker
- IPC.addMessagesHandler(this.activeWorker!, async ({ lifecycle, action }) => {
+ Monitor.childIPCManager.addMessagesHandler(async ({ lifecycle, action }) => {
if (action) {
const { message, args } = action as Monitor.Action;
console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`);
diff --git a/src/server/session/agents/server_worker.ts b/src/server/session/agents/server_worker.ts
index e9fdaf923..278cbb42f 100644
--- a/src/server/session/agents/server_worker.ts
+++ b/src/server/session/agents/server_worker.ts
@@ -1,6 +1,6 @@
import { ExitHandler } from "./applied_session_agent";
import { isMaster } from "cluster";
-import { IPC } from "../utilities/ipc";
+import { PromisifiedIPCManager } from "../utilities/ipc";
import { red, green, white, yellow } from "colors";
import { get } from "request-promise";
import { Monitor } from "./monitor";
@@ -11,7 +11,7 @@ import { Monitor } from "./monitor";
* email if the server encounters an uncaught exception or if the server cannot be reached.
*/
export class ServerWorker {
-
+ private static localIPCManager = new PromisifiedIPCManager(process);
private static count = 0;
private shouldServerBeResponsive = false;
private exitHandlers: ExitHandler[] = [];
@@ -27,7 +27,7 @@ export class ServerWorker {
console.error(red("cannot create a worker on the monitor process."));
process.exit(1);
} else if (++ServerWorker.count > 1) {
- IPC.dispatchMessage(process, {
+ ServerWorker.localIPCManager.emit({
action: {
message: "kill", args: {
reason: "cannot create more than one worker on a given worker process.",
@@ -59,7 +59,7 @@ export class ServerWorker {
* A convenience wrapper to tell the session monitor (parent process)
* to carry out the action with the specified message and arguments.
*/
- public sendMonitorAction = (message: string, args?: any, expectResponse = false) => IPC.dispatchMessage(process, { action: { message, args } }, expectResponse);
+ public sendMonitorAction = (message: string, args?: any, expectResponse = false) => ServerWorker.localIPCManager.emit({ action: { message, args } }, expectResponse);
private constructor(work: Function) {
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
@@ -81,7 +81,7 @@ export class ServerWorker {
*/
private configureProcess = () => {
// updates the local values of variables to the those sent from master
- IPC.addMessagesHandler(process, async ({ newPollingIntervalSeconds, manualExit }) => {
+ ServerWorker.localIPCManager.addMessagesHandler(async ({ newPollingIntervalSeconds, manualExit }) => {
if (newPollingIntervalSeconds !== undefined) {
this.pollingIntervalSeconds = newPollingIntervalSeconds;
}
@@ -109,7 +109,7 @@ export class ServerWorker {
/**
* Notify master thread (which will log update in the console) of initialization via IPC.
*/
- public lifecycleNotification = (event: string) => IPC.dispatchMessage(process, { lifecycle: event });
+ public lifecycleNotification = (event: string) => ServerWorker.localIPCManager.emit({ lifecycle: event });
/**
* Called whenever the process has a reason to terminate, either through an uncaught exception
diff --git a/src/server/session/utilities/ipc.ts b/src/server/session/utilities/ipc.ts
index b20f3d337..888377c93 100644
--- a/src/server/session/utilities/ipc.ts
+++ b/src/server/session/utilities/ipc.ts
@@ -1,50 +1,58 @@
import { isMaster } from "cluster";
import { Utils } from "../../../Utils";
-export namespace IPC {
+export type IPCTarget = NodeJS.EventEmitter & { send?: Function };
+export type Listener = (message: any) => void | Promise<void>;
- export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
- const ipc_id = `ipc_id_${suffix}`;
- const response_expected = `response_expected_${suffix}`;
- const is_response = `is_response_${suffix}`;
+export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
- export async function dispatchMessage(target: NodeJS.EventEmitter & { send?: Function }, message: any, expectResponse = false): Promise<Error | undefined> {
- if (!target.send) {
+export class PromisifiedIPCManager {
+ private readonly target: IPCTarget;
+ private readonly ipc_id = `ipc_id_${suffix}`;
+ private readonly response_expected = `response_expected_${suffix}`;
+ private readonly is_response = `is_response_${suffix}`;
+
+ constructor(target: IPCTarget) {
+ this.target = target;
+ }
+
+ public emit = async (message: any, expectResponse = false): Promise<Error | undefined> => {
+ if (!this.target.send) {
return new Error("Cannot dispatch when send is undefined.");
}
- message[response_expected] = expectResponse;
+ message[this.response_expected] = expectResponse;
if (expectResponse) {
return new Promise(resolve => {
const messageId = Utils.GenerateGuid();
- message[ipc_id] = messageId;
+ message[this.ipc_id] = messageId;
const responseHandler: (args: any) => void = response => {
const { error } = response;
- if (response[is_response] && response[ipc_id] === messageId) {
- target.removeListener("message", responseHandler);
+ if (response[this.is_response] && response[this.ipc_id] === messageId) {
+ this.target.removeListener("message", responseHandler);
resolve(error);
}
};
- target.addListener("message", responseHandler);
- target.send!(message);
+ this.target.addListener("message", responseHandler);
+ this.target.send!(message);
});
} else {
- target.send(message);
+ this.target.send(message);
}
}
- export function addMessagesHandler(target: NodeJS.EventEmitter & { send?: Function }, handler: (message: any) => void | Promise<void>): void {
- target.addListener("message", async incoming => {
+ public addMessagesHandler = (handler: Listener): void => {
+ this.target.addListener("message", async incoming => {
let error: Error | undefined;
try {
await handler(incoming);
} catch (e) {
error = e;
}
- if (incoming[response_expected] && target.send) {
+ if (incoming[this.response_expected] && this.target.send) {
const response: any = { error };
- response[ipc_id] = incoming[ipc_id];
- response[is_response] = true;
- target.send(response);
+ response[this.ipc_id] = incoming[this.ipc_id];
+ response[this.is_response] = true;
+ this.target.send(response);
}
});
}