aboutsummaryrefslogtreecommitdiff
path: root/src/server/Session/session.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/Session/session.ts')
-rw-r--r--src/server/Session/session.ts79
1 files changed, 67 insertions, 12 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts
index d46e6b6e7..7b194598b 100644
--- a/src/server/Session/session.ts
+++ b/src/server/Session/session.ts
@@ -133,6 +133,56 @@ export namespace Session {
export type ExitHandler = (reason: Error | boolean) => void | Promise<void>;
+ namespace IPC {
+
+ export const suffix = isMaster ? Utils.GenerateGuid() : process.env.ipc_suffix;
+ const ipc_id = `ipc_id_${suffix}`;
+ const response_expected = `response_expected_${suffix}`;
+ const is_response = `is_response_${suffix}`;
+
+ export async function dispatchMessage(target: NodeJS.EventEmitter & { send?: Function }, message: any, expectResponse = false): Promise<Error | undefined> {
+ if (!target.send) {
+ return new Error("Cannot dispatch when send is undefined.");
+ }
+ message[response_expected] = expectResponse;
+ if (expectResponse) {
+ return new Promise(resolve => {
+ const messageId = Utils.GenerateGuid();
+ message[ipc_id] = messageId;
+ const responseHandler: (args: any) => void = response => {
+ const { error } = response;
+ if (response[is_response] && response[ipc_id] === messageId) {
+ target.removeListener("message", responseHandler);
+ resolve(error);
+ }
+ };
+ target.addListener("message", responseHandler);
+ target.send!(message);
+ });
+ } else {
+ target.send(message);
+ }
+ }
+
+ export function addMessagesHandler(target: NodeJS.EventEmitter & { send?: Function }, handler: (message: any) => void | Promise<void>): void {
+ target.addListener("message", async incoming => {
+ let error: Error | undefined;
+ try {
+ await handler(incoming);
+ } catch (e) {
+ error = e;
+ }
+ if (incoming[response_expected] && target.send) {
+ const response: any = { error };
+ response[ipc_id] = incoming[ipc_id];
+ response[is_response] = true;
+ target.send(response);
+ }
+ });
+ }
+
+ }
+
export namespace Monitor {
export interface NotifierHooks {
@@ -166,7 +216,7 @@ export namespace Session {
public static Create(notifiers?: Monitor.NotifierHooks) {
if (isWorker) {
- process.send?.({
+ IPC.dispatchMessage(process, {
action: {
message: "kill",
args: {
@@ -418,15 +468,15 @@ export namespace Session {
repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0));
repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean"));
repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true"));
- repl.registerCommand("set", [/polling/, number, boolean], args => {
- const newPollingIntervalSeconds = Math.floor(Number(args[2]));
+ repl.registerCommand("set", [/polling/, number, boolean], async args => {
+ const newPollingIntervalSeconds = Math.floor(Number(args[1]));
if (newPollingIntervalSeconds < 0) {
this.mainLog(red("the polling interval must be a non-negative integer"));
} else {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
- if (args[3] === "true") {
- this.activeWorker?.send({ newPollingIntervalSeconds });
+ if (args[2] === "true") {
+ return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true);
}
}
}
@@ -442,7 +492,7 @@ export namespace Session {
private killActiveWorker = (graceful = true, isSessionEnd = false): void => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
- this.activeWorker.send({ manualExit: { isSessionEnd } });
+ IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } });
} else {
this.activeWorker.process.kill();
}
@@ -487,11 +537,12 @@ export namespace Session {
serverPort: ports.server,
socketPort: ports.socket,
pollingIntervalSeconds: intervalSeconds,
- session_key: this.key
+ session_key: this.key,
+ ipc_suffix: IPC.suffix
});
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker.process.pid}`));
// an IPC message handler that executes actions on the master thread when prompted by the active worker
- this.activeWorker.on("message", async ({ lifecycle, action }) => {
+ IPC.addMessagesHandler(this.activeWorker, async ({ lifecycle, action }) => {
if (action) {
const { message, args } = action as Monitor.Action;
console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`);
@@ -547,7 +598,7 @@ export namespace Session {
console.error(red("cannot create a worker on the monitor process."));
process.exit(1);
} else if (++ServerWorker.count > 1) {
- process.send?.({
+ IPC.dispatchMessage(process, {
action: {
message: "kill", args: {
reason: "cannot create more than one worker on a given worker process.",
@@ -579,7 +630,7 @@ export namespace Session {
* A convenience wrapper to tell the session monitor (parent process)
* to carry out the action with the specified message and arguments.
*/
- public sendMonitorAction = (message: string, args?: any) => process.send!({ action: { message, args } });
+ public sendMonitorAction = (message: string, args?: any, expectResponse = false) => IPC.dispatchMessage(process, { action: { message, args } }, expectResponse);
private constructor(work: Function) {
this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
@@ -601,8 +652,11 @@ export namespace Session {
*/
private configureProcess = () => {
// updates the local values of variables to the those sent from master
- process.on("message", async ({ newPollingIntervalSeconds, manualExit }) => {
+ IPC.addMessagesHandler(process, async ({ newPollingIntervalSeconds, manualExit }) => {
if (newPollingIntervalSeconds !== undefined) {
+ await new Promise<void>(resolve => {
+ setTimeout(resolve, 1000 * 10);
+ });
this.pollingIntervalSeconds = newPollingIntervalSeconds;
}
if (manualExit !== undefined) {
@@ -629,7 +683,7 @@ export namespace Session {
/**
* Notify master thread (which will log update in the console) of initialization via IPC.
*/
- public lifecycleNotification = (event: string) => process.send?.({ lifecycle: event });
+ public lifecycleNotification = (event: string) => IPC.dispatchMessage(process, { lifecycle: event });
/**
* Called whenever the process has a reason to terminate, either through an uncaught exception
@@ -643,6 +697,7 @@ export namespace Session {
// notify master thread (which will log update in the console) of crash event via IPC
this.lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
this.lifecycleNotification(red(error.message));
+ console.log("GAH!", error);
process.exit(1);
}