aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/ActionUtilities.ts4
-rw-r--r--src/server/Session/session.ts348
-rw-r--r--src/server/Session/session_config_schema.ts44
-rw-r--r--src/server/index.ts11
4 files changed, 246 insertions, 161 deletions
diff --git a/src/server/ActionUtilities.ts b/src/server/ActionUtilities.ts
index 950fba093..3125f8683 100644
--- a/src/server/ActionUtilities.ts
+++ b/src/server/ActionUtilities.ts
@@ -118,6 +118,10 @@ export namespace Email {
}
});
+ export async function dispatchAll(recipients: string[], subject: string, content: string) {
+ return Promise.all(recipients.map((recipient: string) => Email.dispatch(recipient, subject, content)));
+ }
+
export async function dispatch(recipient: string, subject: string, content: string): Promise<boolean> {
const mailOptions = {
to: recipient,
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts
index 1ff4ce4de..2ff4ef0d2 100644
--- a/src/server/Session/session.ts
+++ b/src/server/Session/session.ts
@@ -13,162 +13,232 @@ import { configurationSchema } from "./session_config_schema";
const onWindows = process.platform === "win32";
+/**
+ * This namespace relies on NodeJS's cluster module, which allows a parent (master) process to share
+ * code with its children (workers). A simple `isMaster` flag indicates who is trying to access
+ * the code, and thus determines the functionality that actually gets invoked (checked by the caller, not internally).
+ *
+ * Think of the master thread as a factory, and the workers as the helpers that actually run the server.
+ *
+ * So, when we run `npm start`, given the appropriate check, initializeMaster() is called in the parent process
+ * This will spawn off its own child process (by default, mirrors the execution path of its parent),
+ * in which initializeWorker() is invoked.
+ */
export namespace Session {
- export let key: string;
- let activeWorker: Worker;
- let listening = false;
-
- function loadConfiguration() {
- try {
- const raw = readFileSync('./session.config.json', 'utf8');
- const configuration = JSON.parse(raw);
- const options = {
- throwError: true,
- allowUnknownAttributes: false
- };
- validate(configuration, configurationSchema, options);
- configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`;
- configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`;
- return configuration;
- } catch (error) {
- console.log(red("\nSession configuration failed."));
- if (error instanceof ValidationError) {
- console.log("The given session.config.json configuration file is invalid.");
- console.log(`${error.instance}: ${error.stack}`);
- } else if (error.code === "ENOENT" && error.path === "./session.config.json") {
- console.log("Please include a session.config.json configuration file in your project root.");
- } else {
- console.log(error.stack);
+ /**
+ * Validates and reads the configuration file, accordingly builds a child process factory
+ * and spawns off an initial process that will respawn as predecessors die.
+ */
+ export async function initializeMaster() {
+ let activeWorker: Worker;
+
+ // read in configuration .json file only once, in the master thread
+ // pass down any variables the pertinent to the child processes as environment variables
+ const {
+ masterIdentifier,
+ workerIdentifier,
+ recipients,
+ signature,
+ heartbeat,
+ showServerOutput,
+ pollingIntervalSeconds
+ } = function loadConfiguration() {
+ try {
+ const raw = readFileSync('./session.config.json', 'utf8');
+ const configuration = JSON.parse(raw);
+ const options = {
+ throwError: true,
+ allowUnknownAttributes: false
+ };
+ // ensure all necessary and no excess information is specified by the configuration file
+ validate(configuration, configurationSchema, options);
+ configuration.masterIdentifier = `${yellow(configuration.masterIdentifier)}:`;
+ configuration.workerIdentifier = `${magenta(configuration.workerIdentifier)}:`;
+ return configuration;
+ } catch (error) {
+ console.log(red("\nSession configuration failed."));
+ if (error instanceof ValidationError) {
+ console.log("The given session.config.json configuration file is invalid.");
+ console.log(`${error.instance}: ${error.stack}`);
+ } else if (error.code === "ENOENT" && error.path === "./session.config.json") {
+ console.log("Please include a session.config.json configuration file in your project root.");
+ } else {
+ console.log("The following unknown error occurred during configuration.");
+ console.log(error.stack);
+ }
+ console.log();
+ process.exit(0);
}
- console.log();
- process.exit(0);
- }
- }
+ }();
- export async function email(recipients: string[], subject: string, content: string) {
- return Promise.all(recipients.map((recipient: string) => Email.dispatch(recipient, subject, content)));
- }
+ // this sends a random guid to the configuration's recipients, allowing them alone
+ // to kill the server via the /kill/:password route
+ const key = Utils.GenerateGuid();
+ const timestamp = new Date().toUTCString();
+ const content = `The key for this session (started @ ${timestamp}) is ${key}.\n\n${signature}`;
+ await Email.dispatchAll(recipients, "Server Termination Key", content);
- function tryKillActiveWorker() {
- if (activeWorker && !activeWorker.isDead()) {
- activeWorker.process.kill();
- return true;
- }
- return false;
- }
+ console.log(masterIdentifier, "distributed session key to recipients");
- async function activeExit(error: Error) {
- if (!listening) {
- return;
- }
- listening = false;
- process.send?.({
- action: {
- message: "notify_crash",
- args: { error }
+ // handle exceptions in the master thread - there shouldn't be many of these
+ // the IPC (inter process communication) channel closed exception can't seem to be caught in a try catch, and is inconsequential, so it is ignored
+ process.on("uncaughtException", ({ message, stack }) => {
+ if (message !== "Channel closed") {
+ console.log(masterIdentifier, red(message));
+ if (stack) {
+ console.log(masterIdentifier, `\n${red(stack)}`);
+ }
}
});
- const { _socket } = WebSocket;
- if (_socket) {
- Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual");
- }
- process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) });
- process.send?.({ lifecycle: red(error.message) });
- process.exit(1);
- }
- export async function initialize(work: Function) {
- if (isMaster) {
- const {
- masterIdentifier,
- workerIdentifier,
- recipients,
- signature,
+ // determines whether or not we see the compilation / initialization / runtime output of each child server process
+ setupMaster({ silent: !showServerOutput });
+
+ // attempts to kills the active worker ungracefully
+ const tryKillActiveWorker = () => {
+ if (activeWorker && !activeWorker.isDead()) {
+ activeWorker.process.kill();
+ return true;
+ }
+ return false;
+ };
+
+ // kills the current active worker and proceeds to spawn a new worker, feeding in configuration information as environment variables
+ const spawn = () => {
+ tryKillActiveWorker();
+ activeWorker = fork({
heartbeat,
- silentChildren
- } = loadConfiguration();
- await (async function distributeKey() {
- key = Utils.GenerateGuid();
- const timestamp = new Date().toUTCString();
- const content = `The key for this session (started @ ${timestamp}) is ${key}.\n\n${signature}`;
- return email(recipients, "Server Termination Key", content);
- })();
- console.log(masterIdentifier, "distributed session key to recipients");
- process.on("uncaughtException", ({ message, stack }) => {
- if (message !== "Channel closed") {
- console.log(masterIdentifier, red(message));
- if (stack) {
- console.log(masterIdentifier, `\n${red(stack)}`);
+ pollingIntervalSeconds,
+ session_key: key
+ });
+ console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`);
+ // an IPC message handler that executes actions on the master thread when prompted by the active worker
+ activeWorker.on("message", ({ lifecycle, action }) => {
+ if (action) {
+ const { message, args } = action;
+ console.log(`${workerIdentifier} action requested (${cyan(message)})`);
+ switch (message) {
+ case "kill":
+ console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route"));
+ tryKillActiveWorker();
+ process.exit(0);
+ case "notify_crash":
+ const { error: { name, message, stack } } = args;
+ const content = [
+ "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
+ `name:\n${name}`,
+ `message:\n${message}`,
+ `stack:\n${stack}`,
+ "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
+ signature
+ ].join("\n\n");
+ Email.dispatchAll(recipients, "Dash Web Server Crash", content);
}
+ } else if (lifecycle) {
+ console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`);
}
});
- setupMaster({ silent: silentChildren });
- const spawn = () => {
- tryKillActiveWorker();
- activeWorker = fork({ heartbeat, session_key: key });
- console.log(masterIdentifier, `spawned new server worker with process id ${activeWorker.process.pid}`);
- activeWorker.on("message", ({ lifecycle, action }) => {
- if (action) {
- const { message, args } = action;
- console.log(`${workerIdentifier} action requested (${cyan(message)})`);
- switch (message) {
- case "kill":
- console.log(masterIdentifier, red("An authorized user has ended the server session from the /kill route"));
- tryKillActiveWorker();
- process.exit(0);
- case "notify_crash":
- const { error: { name, message, stack } } = args;
- email(recipients, "Dash Web Server Crash", [
- "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
- `name:\n${name}`,
- `message:\n${message}`,
- `stack:\n${stack}`,
- "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
- signature
- ].join("\n\n"));
- }
- } else if (lifecycle) {
- console.log(`${workerIdentifier} lifecycle phase (${lifecycle})`);
- }
- });
- };
+ };
+
+ // a helpful cluster event called on the master thread each time a child process exits
+ on("exit", ({ process: { pid } }, code, signal) => {
+ const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
+ console.log(masterIdentifier, cyan(prompt));
+ // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
spawn();
- on("exit", ({ process: { pid } }, code, signal) => {
- const prompt = `Server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
- console.log(masterIdentifier, cyan(prompt));
- spawn();
- });
- const { registerCommand } = new Repl({ identifier: masterIdentifier });
- registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node"));
- registerCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] }));
- registerCommand("restart", [], () => {
- listening = false;
- tryKillActiveWorker();
+ });
+
+ // builds the repl that allows the following commands to be typed into stdin of the master thread
+ const { registerCommand } = new Repl({ identifier: masterIdentifier });
+ registerCommand("exit", [], () => execSync(onWindows ? "taskkill /f /im node.exe" : "killall -9 node"));
+ registerCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] }));
+ registerCommand("restart", [], () => {
+ // indicate to the worker that we are 'expecting' this restart
+ activeWorker.send({ setListening: false });
+ tryKillActiveWorker();
+ });
+
+ // finally, set things in motion by spawning off the first child (server) process
+ spawn();
+ }
+
+ /**
+ * Effectively, each worker repairs the connection to the server by reintroducing a consistent state
+ * if its predecessor has died. It itself also polls the server heartbeat, and exits with a notification
+ * email if the server encounters an uncaught exception or if the server cannot be reached.
+ * @param work the function specifying the work to be done by each worker thread
+ */
+ export async function initializeWorker(work: Function) {
+ let listening = false;
+
+ // notify master thread (which will log update in the console) of initialization via IPC
+ process.send?.({ lifecycle: green("initializing...") });
+
+ // updates the local value of listening to the value sent from master
+ process.on("message", ({ setListening }) => listening = setListening);
+
+ // called whenever the process has a reason to terminate, either through an uncaught exception
+ // in the process (potentially inconsistent state) or the server cannot be reached
+ const activeExit = (error: Error) => {
+ if (!listening) {
+ return;
+ }
+ listening = false;
+ // communicates via IPC to the master thread that it should dispatch a crash notification email
+ process.send?.({
+ action: {
+ message: "notify_crash",
+ args: { error }
+ }
});
- } else {
- process.send?.({ lifecycle: green("initializing...") });
- process.on('uncaughtException', activeExit);
- const checkHeartbeat = async () => {
- await new Promise<void>(resolve => {
- setTimeout(async () => {
- try {
- await get(process.env.heartbeat!);
- if (!listening) {
- process.send?.({ lifecycle: green("listening...") });
- }
- listening = true;
- resolve();
- } catch (error) {
- await activeExit(error);
+ const { _socket } = WebSocket;
+ // notifies all client users of a crash event
+ if (_socket) {
+ Utils.Emit(_socket, MessageStore.ConnectionTerminated, "Manual");
+ }
+ // notify master thread (which will log update in the console) of crash event via IPC
+ process.send?.({ lifecycle: red(`Crash event detected @ ${new Date().toUTCString()}`) });
+ process.send?.({ lifecycle: red(error.message) });
+ process.exit(1);
+ };
+
+ // one reason to exit, as the process might be in an inconsistent state after such an exception
+ process.on('uncaughtException', activeExit);
+
+ const { pollingIntervalSeconds, heartbeat } = process.env;
+
+ // this monitors the health of the server by submitting a get request to whatever port / route specified
+ // by the configuration every n seconds, where n is also given by the configuration.
+ const checkHeartbeat = async () => {
+ await new Promise<void>(resolve => {
+ setTimeout(async () => {
+ try {
+ await get(heartbeat!);
+ if (!listening) {
+ // notify master thread (which will log update in the console) via IPC that the server is up and running
+ process.send?.({ lifecycle: green("listening...") });
}
- }, 1000 * 15);
- });
- checkHeartbeat();
- };
- work();
+ listening = true;
+ resolve();
+ } catch (error) {
+ // if we expect the server to be unavailable, i.e. during compilation,
+ // the listening variable is false, activeExit will return early and the child
+ // process will continue
+ activeExit(error);
+ }
+ }, 1000 * Number(pollingIntervalSeconds));
+ });
+ // controlled, asynchronous infinite recursion achieves a persistent poll that does not submit a new request until the previous has completed
checkHeartbeat();
- }
+ };
+
+ // the actual work of the process, may be asynchronous
+ // for Dash, this is the code that launches the server
+ work();
+
+ // begin polling
+ checkHeartbeat();
}
} \ No newline at end of file
diff --git a/src/server/Session/session_config_schema.ts b/src/server/Session/session_config_schema.ts
index 25d95c243..a5010055a 100644
--- a/src/server/Session/session_config_schema.ts
+++ b/src/server/Session/session_config_schema.ts
@@ -1,25 +1,31 @@
import { Schema } from "jsonschema";
-export const configurationSchema: Schema = {
- id: "/Configuration",
- type: "object",
- properties: {
- recipients: {
- type: "array",
- items: {
- type: "string",
- pattern: /[^\@]+\@[^\@]+/g
- },
- minLength: 1
- },
- heartbeat: {
+const emailPattern = /^(([a-zA-Z0-9_.-])+@([a-zA-Z0-9_.-])+\.([a-zA-Z])+([a-zA-Z])+)?$/g;
+const localPortPattern = /http\:\/\/localhost:\d+\/[a-zA-Z]+/g;
+
+const properties = {
+ recipients: {
+ type: "array",
+ items: {
type: "string",
- pattern: /http\:\/\/localhost:\d+\/[a-zA-Z]+/g
+ pattern: emailPattern
},
- signature: { type: "string" },
- masterIdentifier: { type: "string", minLength: 1 },
- workerIdentifier: { type: "string", minLength: 1 },
- silentChildren: { type: "boolean" }
+ minLength: 1
},
- required: ["heartbeat", "recipients", "signature", "masterIdentifier", "workerIdentifier", "silentChildren"]
+ heartbeat: {
+ type: "string",
+ pattern: localPortPattern
+ },
+ signature: { type: "string" },
+ masterIdentifier: { type: "string", minLength: 1 },
+ workerIdentifier: { type: "string", minLength: 1 },
+ showServerOutput: { type: "boolean" },
+ pollingIntervalSeconds: { type: "number", minimum: 1, maximum: 86400 }
+};
+
+export const configurationSchema: Schema = {
+ id: "/configuration",
+ type: "object",
+ properties,
+ required: Object.keys(properties)
}; \ No newline at end of file
diff --git a/src/server/index.ts b/src/server/index.ts
index 381496c20..0a5b4afae 100644
--- a/src/server/index.ts
+++ b/src/server/index.ts
@@ -24,6 +24,13 @@ import GooglePhotosManager from "./ApiManagers/GooglePhotosManager";
import { Logger } from "./ProcessFactory";
import { yellow } from "colors";
import { Session } from "./Session/session";
+import { isMaster } from "cluster";
+
+if (isMaster) {
+ Session.initializeMaster();
+} else {
+ Session.initializeWorker(launch);
+}
export const publicDirectory = path.resolve(__dirname, "public");
export const filesDirectory = path.resolve(publicDirectory, "files");
@@ -133,6 +140,4 @@ async function launch() {
action: preliminaryFunctions
});
await initializeServer({ serverPort: 1050, routeSetter });
-}
-
-Session.initialize(launch); \ No newline at end of file
+} \ No newline at end of file