aboutsummaryrefslogtreecommitdiff
path: root/src/server/DashSession/Session/agents
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/DashSession/Session/agents')
-rw-r--r--src/server/DashSession/Session/agents/applied_session_agent.ts23
-rw-r--r--src/server/DashSession/Session/agents/monitor.ts48
-rw-r--r--src/server/DashSession/Session/agents/process_message_router.ts12
-rw-r--r--src/server/DashSession/Session/agents/promisified_ipc_manager.ts56
-rw-r--r--src/server/DashSession/Session/agents/server_worker.ts46
5 files changed, 89 insertions, 96 deletions
diff --git a/src/server/DashSession/Session/agents/applied_session_agent.ts b/src/server/DashSession/Session/agents/applied_session_agent.ts
index 2037e93e5..c42ba95cc 100644
--- a/src/server/DashSession/Session/agents/applied_session_agent.ts
+++ b/src/server/DashSession/Session/agents/applied_session_agent.ts
@@ -1,13 +1,13 @@
-import * as _cluster from "cluster";
-import { Monitor } from "./monitor";
-import { ServerWorker } from "./server_worker";
+import * as _cluster from 'cluster';
+import { Monitor } from './monitor';
+import { ServerWorker } from './server_worker';
+
const cluster = _cluster as any;
const isMaster = cluster.isPrimary;
export type ExitHandler = (reason: Error | boolean) => void | Promise<void>;
export abstract class AppliedSessionAgent {
-
// the following two methods allow the developer to create a custom
// session and use the built in customization options for each thread
protected abstract initializeMonitor(monitor: Monitor): Promise<string>;
@@ -18,15 +18,15 @@ export abstract class AppliedSessionAgent {
public killSession = (reason: string, graceful = true, errorCode = 0) => {
const target = cluster.default.isPrimary ? this.sessionMonitor : this.serverWorker;
target.killSession(reason, graceful, errorCode);
- }
+ };
private sessionMonitorRef: Monitor | undefined;
public get sessionMonitor(): Monitor {
if (!cluster.default.isPrimary) {
- this.serverWorker.emit("kill", {
+ this.serverWorker.emit('kill', {
graceful: false,
- reason: "Cannot access the session monitor directly from the server worker thread.",
- errorCode: 1
+ reason: 'Cannot access the session monitor directly from the server worker thread.',
+ errorCode: 1,
});
throw new Error();
}
@@ -36,7 +36,7 @@ export abstract class AppliedSessionAgent {
private serverWorkerRef: ServerWorker | undefined;
public get serverWorker(): ServerWorker {
if (isMaster) {
- throw new Error("Cannot access the server worker directly from the session monitor thread");
+ throw new Error('Cannot access the server worker directly from the session monitor thread');
}
return this.serverWorkerRef!;
}
@@ -52,8 +52,7 @@ export abstract class AppliedSessionAgent {
this.serverWorkerRef = await this.initializeServerWorker();
}
} else {
- throw new Error("Cannot launch a session thread more than once per process.");
+ throw new Error('Cannot launch a session thread more than once per process.');
}
}
-
-} \ No newline at end of file
+}
diff --git a/src/server/DashSession/Session/agents/monitor.ts b/src/server/DashSession/Session/agents/monitor.ts
index a6fde4356..6cdad46c2 100644
--- a/src/server/DashSession/Session/agents/monitor.ts
+++ b/src/server/DashSession/Session/agents/monitor.ts
@@ -1,21 +1,19 @@
-import { ExitHandler } from './applied_session_agent';
-import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from '../utilities/session_config';
-import Repl, { ReplAction } from '../utilities/repl';
+import { ExecOptions, exec } from 'child_process';
import * as _cluster from 'cluster';
import { Worker } from 'cluster';
-import { manage, MessageHandler, ErrorLike } from './promisified_ipc_manager';
-import { red, cyan, white, yellow, blue } from 'colors';
-import { exec, ExecOptions } from 'child_process';
-import { validate, ValidationError } from 'jsonschema';
-import { Utilities } from '../utilities/utilities';
+import { blue, cyan, red, white, yellow } from 'colors';
import { readFileSync } from 'fs';
+import { ValidationError, validate } from 'jsonschema';
+import Repl, { ReplAction } from '../utilities/repl';
+import { Configuration, Identifiers, colorMapping, configurationSchema, defaultConfig } from '../utilities/session_config';
+import { Utilities } from '../utilities/utilities';
+import { ExitHandler } from './applied_session_agent';
import IPCMessageReceiver from './process_message_router';
+import { ErrorLike, MessageHandler, manage } from './promisified_ipc_manager';
import { ServerWorker } from './server_worker';
+
const cluster = _cluster as any;
-const isWorker = cluster.isWorker;
-const setupMaster = cluster.setupPrimary;
-const on = cluster.on;
-const fork = cluster.fork;
+const { isWorker, setupMaster, on, fork } = cluster;
/**
* Validates and reads the configuration file, accordingly builds a child process factory
@@ -41,9 +39,8 @@ export class Monitor extends IPCMessageReceiver {
} else if (++Monitor.count > 1) {
console.error(red('cannot create more than one monitor.'));
process.exit(1);
- } else {
- return new Monitor();
}
+ return new Monitor();
}
private constructor() {
@@ -128,25 +125,25 @@ export class Monitor extends IPCMessageReceiver {
this.repl.registerCommand(basename, argPatterns, action);
};
- public exec = (command: string, options?: ExecOptions) => {
- return new Promise<void>(resolve => {
+ public exec = (command: string, options?: ExecOptions) =>
+ new Promise<void>(resolve => {
exec(command, { ...options, encoding: 'utf8' }, (error, stdout, stderr) => {
if (error) {
this.execLog(red(`unable to execute ${white(command)}`));
error.message.split('\n').forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
} else {
- let outLines: string[], errorLines: string[];
- if ((outLines = stdout.split('\n').filter(line => line.length)).length) {
+ const outLines = stdout.split('\n').filter(line => line.length);
+ if (outLines.length) {
outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`)));
}
- if ((errorLines = stderr.split('\n').filter(line => line.length)).length) {
+ const errorLines = stderr.split('\n').filter(line => line.length);
+ if (errorLines.length) {
errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`)));
}
}
resolve();
});
});
- };
/**
* Generates a blue UTC string associated with the time
@@ -226,12 +223,10 @@ export class Monitor extends IPCMessageReceiver {
const newPollingIntervalSeconds = Math.floor(Number(args[1]));
if (newPollingIntervalSeconds < 0) {
this.mainLog(red('the polling interval must be a non-negative integer'));
- } else {
- if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
- this.config.polling.intervalSeconds = newPollingIntervalSeconds;
- if (args[2] === 'true') {
- Monitor.IPCManager.emit('updatePollingInterval', { newPollingIntervalSeconds });
- }
+ } else if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
+ this.config.polling.intervalSeconds = newPollingIntervalSeconds;
+ if (args[2] === 'true') {
+ Monitor.IPCManager.emit('updatePollingInterval', { newPollingIntervalSeconds });
}
}
});
@@ -297,6 +292,7 @@ export class Monitor extends IPCMessageReceiver {
};
}
+// eslint-disable-next-line no-redeclare
export namespace Monitor {
export enum IntrinsicEvents {
KeyGenerated = 'key_generated',
diff --git a/src/server/DashSession/Session/agents/process_message_router.ts b/src/server/DashSession/Session/agents/process_message_router.ts
index 0745ea455..3e2b7d8d0 100644
--- a/src/server/DashSession/Session/agents/process_message_router.ts
+++ b/src/server/DashSession/Session/agents/process_message_router.ts
@@ -1,7 +1,6 @@
-import { MessageHandler, PromisifiedIPCManager, HandlerMap } from "./promisified_ipc_manager";
+import { MessageHandler, PromisifiedIPCManager, HandlerMap } from './promisified_ipc_manager';
export default abstract class IPCMessageReceiver {
-
protected static IPCManager: PromisifiedIPCManager;
protected handlers: HandlerMap = {};
@@ -18,7 +17,7 @@ export default abstract class IPCMessageReceiver {
} else {
handlers.push(handler);
}
- }
+ };
/**
* Unregister a given listener at this message.
@@ -31,11 +30,10 @@ export default abstract class IPCMessageReceiver {
handlers.splice(index, 1);
}
}
- }
+ };
- /**
+ /**
* Unregister all listeners at this message.
*/
public clearMessageListeners = (...names: string[]) => names.map(name => delete this.handlers[name]);
-
-} \ No newline at end of file
+}
diff --git a/src/server/DashSession/Session/agents/promisified_ipc_manager.ts b/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
index 76e218977..fc870d003 100644
--- a/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
+++ b/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
@@ -1,13 +1,14 @@
-import { Utilities } from '../utilities/utilities';
import { ChildProcess } from 'child_process';
+import { Utilities } from '../utilities/utilities';
/**
- * Convenience constructor
- * @param target the process / worker to which to attach the specialized listeners
+ * Specifies a general message format for this API
*/
-export function manage(target: IPCTarget, handlers?: HandlerMap) {
- return new PromisifiedIPCManager(target, handlers);
-}
+export type Message<T = any> = {
+ name: string;
+ args?: T;
+};
+export type MessageHandler<T = any> = (args: T) => any | Promise<any>;
/**
* Captures the logic to execute upon receiving a message
@@ -22,15 +23,10 @@ export type HandlerMap = { [name: string]: MessageHandler[] };
*/
export type IPCTarget = NodeJS.Process | ChildProcess;
-/**
- * Specifies a general message format for this API
- */
-export type Message<T = any> = {
- name: string;
- args?: T;
-};
-export type MessageHandler<T = any> = (args: T) => any | Promise<any>;
-
+interface Metadata {
+ isResponse: boolean;
+ id: string;
+}
/**
* When a message is emitted, it is embedded with private metadata
* to facilitate the resolution of promises, etc.
@@ -38,10 +34,6 @@ export type MessageHandler<T = any> = (args: T) => any | Promise<any>;
interface InternalMessage extends Message {
metadata: Metadata;
}
-interface Metadata {
- isResponse: boolean;
- id: string;
-}
/**
* Allows for the transmission of the error's key features over IPC.
@@ -95,11 +87,11 @@ export class PromisifiedIPCManager {
}
return new Promise<Response<T>>(resolve => {
const messageId = Utilities.guid();
- type InternalMessageHandler = (message: any /* MessageListener*/) => any | Promise<any>;
- const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args }) => {
+ type InternalMessageHandler = (message: any /* MessageListener */) => any | Promise<any>;
+ const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args: hargs }) => {
if (isResponse && id === messageId) {
this.target.removeListener('message', responseHandler);
- resolve(args);
+ resolve(hargs);
}
};
this.target.addListener('message', responseHandler);
@@ -118,8 +110,9 @@ export class PromisifiedIPCManager {
* completion response for each of the pending messages, allowing their
* promises in the caller to resolve.
*/
- public destroy = () => {
- return new Promise<void>(async resolve => {
+ public destroy = () =>
+ // eslint-disable-next-line no-async-promise-executor
+ new Promise<void>(async resolve => {
if (this.callerIsTarget) {
this.destroyHelper();
} else {
@@ -127,7 +120,6 @@ export class PromisifiedIPCManager {
}
resolve();
});
- };
/**
* Dispatches the dummy responses and sets the isDestroyed flag to true.
@@ -168,12 +160,20 @@ export class PromisifiedIPCManager {
error = e;
}
if (!this.isDestroyed && this.target.send) {
- const metadata = { id, isResponse: true };
+ const metadataRes = { id, isResponse: true };
const response: Response = { results, error };
- const message = { name, args: response, metadata };
+ const messageRes = { name, args: response, metadata: metadataRes };
delete this.pendingMessages[id];
- this.target.send(message);
+ this.target.send(messageRes);
}
}
};
}
+
+/**
+ * Convenience constructor
+ * @param target the process / worker to which to attach the specialized listeners
+ */
+export function manage(target: IPCTarget, handlers?: HandlerMap) {
+ return new PromisifiedIPCManager(target, handlers);
+}
diff --git a/src/server/DashSession/Session/agents/server_worker.ts b/src/server/DashSession/Session/agents/server_worker.ts
index d8b3ee80b..85e1b31d6 100644
--- a/src/server/DashSession/Session/agents/server_worker.ts
+++ b/src/server/DashSession/Session/agents/server_worker.ts
@@ -1,10 +1,10 @@
-import cluster from "cluster";
-import { green, red, white, yellow } from "colors";
-import { get } from "request-promise";
-import { ExitHandler } from "./applied_session_agent";
-import { Monitor } from "./monitor";
-import IPCMessageReceiver from "./process_message_router";
-import { ErrorLike, manage } from "./promisified_ipc_manager";
+import cluster from 'cluster';
+import { green, red, white, yellow } from 'colors';
+import { get } from 'request-promise';
+import { ExitHandler } from './applied_session_agent';
+import { Monitor } from './monitor';
+import IPCMessageReceiver from './process_message_router';
+import { ErrorLike, manage } from './promisified_ipc_manager';
/**
* Effectively, each worker repairs the connection to the server by reintroducing a consistent state
@@ -23,18 +23,17 @@ export class ServerWorker extends IPCMessageReceiver {
private isInitialized = false;
public static Create(work: Function) {
if (cluster.isPrimary) {
- console.error(red("cannot create a worker on the monitor process."));
+ console.error(red('cannot create a worker on the monitor process.'));
process.exit(1);
} else if (++ServerWorker.count > 1) {
- ServerWorker.IPCManager.emit("kill", {
- reason: "cannot create more than one worker on a given worker process.",
+ ServerWorker.IPCManager.emit('kill', {
+ reason: 'cannot create more than one worker on a given worker process.',
graceful: false,
- errorCode: 1
+ errorCode: 1,
});
process.exit(1);
- } else {
- return new ServerWorker(work);
}
+ return new ServerWorker(work);
}
/**
@@ -48,7 +47,7 @@ export class ServerWorker extends IPCMessageReceiver {
* server worker (child process). This will also kill
* this process (child process).
*/
- public killSession = (reason: string, graceful = true, errorCode = 0) => this.emit<never>("kill", { reason, graceful, errorCode });
+ public killSession = (reason: string, graceful = true, errorCode = 0) => this.emit<never>('kill', { reason, graceful, errorCode });
/**
* A convenience wrapper to tell the session monitor (parent process)
@@ -60,7 +59,7 @@ export class ServerWorker extends IPCMessageReceiver {
super();
this.configureInternalHandlers();
ServerWorker.IPCManager = manage(process, this.handlers);
- this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(" ")}]`)}`));
+ this.lifecycleNotification(green(`initializing process... ${white(`[${process.execPath} ${process.execArgv.join(' ')}]`)}`));
const { pollingRoute, serverPort, pollingIntervalSeconds, pollingFailureTolerance } = process.env;
this.serverPort = Number(serverPort);
@@ -78,8 +77,10 @@ export class ServerWorker extends IPCMessageReceiver {
*/
protected configureInternalHandlers = () => {
// updates the local values of variables to the those sent from master
- this.on("updatePollingInterval", ({ newPollingIntervalSeconds }) => this.pollingIntervalSeconds = newPollingIntervalSeconds);
- this.on("manualExit", async ({ isSessionEnd }) => {
+ this.on('updatePollingInterval', ({ newPollingIntervalSeconds }) => {
+ this.pollingIntervalSeconds = newPollingIntervalSeconds;
+ });
+ this.on('manualExit', async ({ isSessionEnd }) => {
await ServerWorker.IPCManager.destroy();
await this.executeExitHandlers(isSessionEnd);
process.exit(0);
@@ -91,7 +92,7 @@ export class ServerWorker extends IPCMessageReceiver {
const appropriateError = reason instanceof Error ? reason : new Error(`unhandled rejection: ${reason}`);
this.proactiveUnplannedExit(appropriateError);
});
- }
+ };
/**
* Execute the list of functions registered to be called
@@ -102,7 +103,7 @@ export class ServerWorker extends IPCMessageReceiver {
/**
* Notify master thread (which will log update in the console) of initialization via IPC.
*/
- public lifecycleNotification = (event: string) => this.emit("lifecycle", { event });
+ public lifecycleNotification = (event: string) => this.emit('lifecycle', { event });
/**
* Called whenever the process has a reason to terminate, either through an uncaught exception
@@ -120,11 +121,11 @@ export class ServerWorker extends IPCMessageReceiver {
this.lifecycleNotification(red(error.message));
await ServerWorker.IPCManager.destroy();
process.exit(1);
- }
+ };
/**
* This monitors the health of the server by submitting a get request to whatever port / route specified
- * by the configuration every n seconds, where n is also given by the configuration.
+ * by the configuration every n seconds, where n is also given by the configuration.
*/
private pollServer = async (): Promise<void> => {
await new Promise<void>(resolve => {
@@ -156,6 +157,5 @@ export class ServerWorker extends IPCMessageReceiver {
});
// controlled, asynchronous infinite recursion achieves a persistent poll that does not submit a new request until the previous has completed
this.pollServer();
- }
-
+ };
}