aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSam Wilkins <samwilkins333@gmail.com>2020-01-05 14:50:54 -0800
committerSam Wilkins <samwilkins333@gmail.com>2020-01-05 14:50:54 -0800
commitd4e7e354ec9209f117054ead9970ea9f180dd17b (patch)
treef36a17978ef9595f5ee1f4255299b2a51fcda73a /src
parentba1a85c4833820bc228779aa9187315d8b711268 (diff)
port config, customizers
Diffstat (limited to 'src')
-rw-r--r--src/server/Session/session.ts131
-rw-r--r--src/server/Session/session_config_schema.ts13
-rw-r--r--src/server/index.ts42
-rw-r--r--src/server/repl.ts27
4 files changed, 141 insertions, 72 deletions
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts
index 789a40c42..61b8bcf16 100644
--- a/src/server/Session/session.ts
+++ b/src/server/Session/session.ts
@@ -1,12 +1,10 @@
import { red, cyan, green, yellow, magenta } from "colors";
-import { isMaster, on, fork, setupMaster, Worker } from "cluster";
+import { on, fork, setupMaster, Worker } from "cluster";
import { execSync } from "child_process";
import { get } from "request-promise";
-import { WebSocket } from "../Websocket/Websocket";
import { Utils } from "../../Utils";
-import { MessageStore } from "../Message";
import { Email } from "../ActionUtilities";
-import Repl from "../repl";
+import Repl, { ReplAction } from "../repl";
import { readFileSync } from "fs";
import { validate, ValidationError } from "jsonschema";
import { configurationSchema } from "./session_config_schema";
@@ -26,26 +24,35 @@ const onWindows = process.platform === "win32";
*/
export namespace Session {
+ interface MasterCustomizer {
+ addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void;
+ addChildMessageHandler: (message: string, handler: ActionHandler) => void;
+ }
+
+ export interface SessionAction {
+ message: string;
+ args: any;
+ }
+
+ export type ExitHandler = (error: Error) => void | Promise<void>;
+ export type ActionHandler = (action: SessionAction) => void | Promise<void>;
+ export interface EmailTemplate {
+ subject: string;
+ body: string;
+ }
+ export type CrashEmailGenerator = (error: Error) => EmailTemplate | Promise<EmailTemplate>;
+
/**
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
- export async function initializeMonitorThread(): Promise<Repl> {
+ export async function initializeMonitorThread(crashEmailGenerator?: CrashEmailGenerator): Promise<MasterCustomizer> {
let activeWorker: Worker;
+ const childMessageHandlers: { [message: string]: (action: SessionAction, args: any) => void } = {};
// read in configuration .json file only once, in the master thread
// pass down any variables the pertinent to the child processes as environment variables
- const {
- masterIdentifier,
- workerIdentifier,
- recipients,
- signature,
- heartbeatRoute,
- serverPort,
- socketPort,
- showServerOutput,
- pollingIntervalSeconds
- } = function loadConfiguration(): any {
+ const configuration = function loadConfiguration(): any {
try {
const configuration = JSON.parse(readFileSync('./session.config.json', 'utf8'));
const options = {
@@ -54,8 +61,8 @@ export namespace Session {
};
// ensure all necessary and no excess information is specified by the configuration file
validate(configuration, configurationSchema, options);
- configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`;
- configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`;
+ configuration.masterIdentifier = yellow(configuration.masterIdentifier + ":");
+ configuration.workerIdentifier = magenta(configuration.workerIdentifier + ":");
return configuration;
} catch (error) {
console.log(red("\nSession configuration failed."));
@@ -73,6 +80,17 @@ export namespace Session {
}
}();
+ const {
+ masterIdentifier,
+ workerIdentifier,
+ recipients,
+ ports,
+ signature,
+ heartbeatRoute,
+ showServerOutput,
+ pollingIntervalSeconds
+ } = configuration;
+
// this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone
// to kill the server via the /kill/:key route
const key = Utils.GenerateGuid();
@@ -92,7 +110,7 @@ export namespace Session {
if (message !== "Channel closed") {
console.log(masterIdentifier, red(message));
if (stack) {
- console.log(masterIdentifier, `\n${red(stack)}`);
+ console.log(masterIdentifier, `uncaught exception\n${red(stack)}`);
}
}
});
@@ -113,39 +131,56 @@ export namespace Session {
return false;
};
+ const restart = () => {
+ // indicate to the worker that we are 'expecting' this restart
+ activeWorker.send({ setListening: false });
+ tryKillActiveWorker();
+ };
+
+ const setPort = (port: string, value: number, immediateRestart: boolean) => {
+ ports[port] = value;
+ if (immediateRestart) {
+ restart();
+ }
+ };
+
// kills the current active worker and proceeds to spawn a new worker,
// feeding in configuration information as environment variables
const spawn = (): void => {
tryKillActiveWorker();
activeWorker = fork({
heartbeatRoute,
- serverPort,
- socketPort,
+ serverPort: ports.server,
+ socketPort: ports.socket,
pollingIntervalSeconds,
session_key: key
});
console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`);
// an IPC message handler that executes actions on the master thread when prompted by the active worker
- activeWorker.on("message", ({ lifecycle, action }) => {
+ activeWorker.on("message", async ({ lifecycle, action }) => {
if (action) {
- const { message, args } = action;
+ const { message, args } = action as SessionAction;
console.log(`${workerIdentifier} action requested (${cyan(message)})`);
switch (message) {
case "kill":
- console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route"));
+ console.log(masterIdentifier, red("An authorized user has manually ended the server session"));
tryKillActiveWorker(false);
process.exit(0);
case "notify_crash":
- const { error: { name, message, stack } } = args;
- const content = [
- "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
- `name:\n${name}`,
- `message:\n${message}`,
- `stack:\n${stack}`,
- "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
- signature
- ].join("\n\n");
- Email.dispatchAll(recipients, "Dash Web Server Crash", content);
+ if (crashEmailGenerator) {
+ const { error } = args;
+ const { subject, body } = await crashEmailGenerator(error);
+ const content = `${body}\n\n${signature}`;
+ Email.dispatchAll(recipients, subject, content);
+ }
+ case "set_port":
+ const { port, value, immediateRestart } = args;
+ setPort(port, value, immediateRestart);
+ default:
+ const handler = childMessageHandlers[message];
+ if (handler) {
+ handler(action, args);
+ }
}
} else if (lifecycle) {
console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`);
@@ -155,7 +190,7 @@ export namespace Session {
// a helpful cluster event called on the master thread each time a child process exits
on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
console.log(masterIdentifier, cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
spawn();
@@ -164,17 +199,18 @@ export namespace Session {
// builds the repl that allows the following commands to be typed into stdin of the master thread
const repl = new Repl({ identifier: masterIdentifier });
repl.registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node"));
- repl.registerCommand("restart", [], () => {
- // indicate to the worker that we are 'expecting' this restart
- activeWorker.send({ setListening: false });
- tryKillActiveWorker();
+ repl.registerCommand("restart", [], restart);
+ repl.registerCommand("set", [/[a-zA-Z]+/g, "port", /\d+/g, /true|false/g], args => {
+ setPort(args[0], Number(args[2]), args[3] === "true");
});
-
// finally, set things in motion by spawning off the first child (server) process
spawn();
// returned to allow the caller to add custom commands
- return repl;
+ return {
+ addReplCommand: repl.registerCommand,
+ addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; }
+ };
}
/**
@@ -183,8 +219,9 @@ export namespace Session {
* email if the server encounters an uncaught exception or if the server cannot be reached.
* @param work the function specifying the work to be done by each worker thread
*/
- export async function initializeWorkerThread(work: Function): Promise<void> {
+ export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> {
let listening = false;
+ const exitHandlers: ExitHandler[] = [];
// notify master thread (which will log update in the console) of initialization via IPC
process.send?.({ lifecycle: green("initializing...") });
@@ -194,7 +231,7 @@ export namespace Session {
// called whenever the process has a reason to terminate, either through an uncaught exception
// in the process (potentially inconsistent state) or the server cannot be reached
- const activeExit = (error: Error): void => {
+ const activeExit = async (error: Error): Promise<void> => {
if (!listening) {
return;
}
@@ -206,11 +243,7 @@ export namespace Session {
args: { error }
}
});
- const { _socket } = WebSocket;
- // notifies all client users of a crash event
- if (_socket) {
- Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual");
- }
+ await Promise.all(exitHandlers.map(handler => handler(error)));
// notify master thread (which will log update in the console) of crash event via IPC
process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) });
process.send?.({ lifecycle: red(error.message) });
@@ -253,6 +286,8 @@ export namespace Session {
work();
checkHeartbeat(); // begin polling
+
+ return (handler: ExitHandler) => exitHandlers.push(handler);
}
} \ No newline at end of file
diff --git a/src/server/Session/session_config_schema.ts b/src/server/Session/session_config_schema.ts
index 03009a351..34d1ad523 100644
--- a/src/server/Session/session_config_schema.ts
+++ b/src/server/Session/session_config_schema.ts
@@ -3,7 +3,7 @@ import { Schema } from "jsonschema";
const emailPattern = /^(([a-zA-Z0-9_.-])+@([a-zA-Z0-9_.-])+\.([a-zA-Z])+([a-zA-Z])+)?$/g;
const localPortPattern = /\/[a-zA-Z]+/g;
-const properties = {
+const properties: { [name: string]: Schema } = {
recipients: {
type: "array",
items: {
@@ -12,8 +12,15 @@ const properties = {
},
minLength: 1
},
- serverPort: { type: "number" },
- socketPort: { type: "number" },
+ ports: {
+ type: "object",
+ properties: {
+ server: { type: "number" },
+ socket: { type: "number" }
+ },
+ required: ["server"],
+ additionalProperties: true
+ },
heartbeatRoute: {
type: "string",
pattern: localPortPattern
diff --git a/src/server/index.ts b/src/server/index.ts
index 4400687d8..0fee41bd8 100644
--- a/src/server/index.ts
+++ b/src/server/index.ts
@@ -26,6 +26,8 @@ import { yellow } from "colors";
import { Session } from "./Session/session";
import { isMaster } from "cluster";
import { execSync } from "child_process";
+import { Utils } from "../Utils";
+import { MessageStore } from "./Message";
export const publicDirectory = path.resolve(__dirname, "public");
export const filesDirectory = path.resolve(publicDirectory, "files");
@@ -132,17 +134,31 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }:
/**
* Thread dependent session initialization
*/
-if (isMaster) {
- Session.initializeMonitorThread().then(({ registerCommand }) => {
- registerCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] }));
- });
-} else {
- Session.initializeWorkerThread(async () => {
- await log_execution({
- startMessage: "\nstarting execution of preliminary functions",
- endMessage: "completed preliminary functions\n",
- action: preliminaryFunctions
+(async function launch() {
+ if (isMaster) {
+ const emailGenerator = (error: Error) => {
+ const subject = "Dash Web Server Crash";
+ const { name, message, stack } = error;
+ const body = [
+ "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
+ `name:\n${name}`,
+ `message:\n${message}`,
+ `stack:\n${stack}`,
+ "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
+ ].join("\n\n");
+ return { subject, body };
+ };
+ const customizer = await Session.initializeMonitorThread(emailGenerator);
+ customizer.addReplCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] }));
+ } else {
+ const addExitHandler = await Session.initializeWorkerThread(async () => {
+ await log_execution({
+ startMessage: "\nstarting execution of preliminary functions",
+ endMessage: "completed preliminary functions\n",
+ action: preliminaryFunctions
+ });
+ await initializeServer(routeSetter);
});
- await initializeServer(routeSetter);
- });
-} \ No newline at end of file
+ addExitHandler(() => Utils.Emit(WebSocket._socket, MessageStore.ConnectionTerminated, "Manual"));
+ }
+})(); \ No newline at end of file
diff --git a/src/server/repl.ts b/src/server/repl.ts
index ec525582b..a47d4aad4 100644
--- a/src/server/repl.ts
+++ b/src/server/repl.ts
@@ -1,30 +1,33 @@
import { createInterface, Interface } from "readline";
-import { red } from "colors";
+import { red, green, white } from "colors";
export interface Configuration {
identifier: string;
onInvalid?: (culprit?: string) => string | string;
+ onValid?: (success?: string) => string | string;
isCaseSensitive?: boolean;
}
-type Action = (parsedArgs: IterableIterator<string>) => any | Promise<any>;
+export type ReplAction = (parsedArgs: Array<string>) => any | Promise<any>;
export interface Registration {
argPatterns: RegExp[];
- action: Action;
+ action: ReplAction;
}
export default class Repl {
private identifier: string;
- private onInvalid: ((culprit?: string) => string) | string;
+ private onInvalid: (culprit?: string) => string | string;
+ private onValid: (success: string) => string | string;
private isCaseSensitive: boolean;
private commandMap = new Map<string, Registration[]>();
public interface: Interface;
private busy = false;
private keys: string | undefined;
- constructor({ identifier: prompt, onInvalid, isCaseSensitive }: Configuration) {
+ constructor({ identifier: prompt, onInvalid, onValid, isCaseSensitive }: Configuration) {
this.identifier = prompt;
this.onInvalid = onInvalid || this.usage;
+ this.onValid = onValid || this.success;
this.isCaseSensitive = isCaseSensitive ?? true;
this.interface = createInterface(process.stdin, process.stdout).on('line', this.considerInput);
}
@@ -43,7 +46,9 @@ export default class Repl {
return `${this.identifier} commands: { ${members.sort().join(", ")} }`;
}
- public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: Action) => {
+ private success = (command: string) => `${this.identifier} completed execution of ${white(command)}`;
+
+ public registerCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => {
const existing = this.commandMap.get(basename);
const converted = argPatterns.map(input => input instanceof RegExp ? input : new RegExp(input));
const registration = { argPatterns: converted, action };
@@ -59,7 +64,13 @@ export default class Repl {
this.busy = false;
}
+ private valid = (command: string) => {
+ console.log(green(typeof this.onValid === "string" ? this.onValid : this.onValid(command)));
+ this.busy = false;
+ }
+
private considerInput = async (line: string) => {
+ console.log("raw", line);
if (this.busy) {
console.log(red("Busy"));
return;
@@ -91,8 +102,8 @@ export default class Repl {
matched = true;
}
if (!length || matched) {
- await action(parsed[Symbol.iterator]());
- this.busy = false;
+ await action(parsed);
+ this.valid(`${command} ${parsed.join(" ")}`);
return;
}
}