aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/server/DashSession.ts61
-rw-r--r--src/server/Session/session.ts125
-rw-r--r--src/server/index.ts57
3 files changed, 157 insertions, 86 deletions
diff --git a/src/server/DashSession.ts b/src/server/DashSession.ts
new file mode 100644
index 000000000..9c36fa17f
--- /dev/null
+++ b/src/server/DashSession.ts
@@ -0,0 +1,61 @@
+import { Session } from "./Session/session";
+import { Email } from "./ActionUtilities";
+import { red, yellow } from "colors";
+import { SolrManager } from "./ApiManagers/SearchManager";
+import { execSync } from "child_process";
+import { isMaster } from "cluster";
+import { Utils } from "../Utils";
+import { WebSocket } from "./Websocket/Websocket";
+import { MessageStore } from "./Message";
+import { launchServer } from ".";
+
+const notificationRecipients = ["samuel_wilkins@brown.edu"];
+const signature = "-Dash Server Session Manager";
+
+const monitorHooks: Session.MonitorNotifierHooks = {
+ key: async (key, masterLog) => {
+ const content = `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${signature}`;
+ const failures = await Email.dispatchAll(notificationRecipients, "Server Termination Key", content);
+ if (failures) {
+ failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`)));
+ return false;
+ }
+ return true;
+ },
+ crash: async ({ name, message, stack }, masterLog) => {
+ const body = [
+ "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
+ `name:\n${name}`,
+ `message:\n${message}`,
+ `stack:\n${stack}`,
+ "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
+ ].join("\n\n");
+ const content = `${body}\n\n${signature}`;
+ const failures = await Email.dispatchAll(notificationRecipients, "Dash Web Server Crash", content);
+ if (failures) {
+ failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`)));
+ return false;
+ }
+ return true;
+ }
+};
+
+export class DashSessionAgent extends Session.AppliedSessionAgent {
+
+ /**
+ * If we're the monitor (master) thread, we should launch the monitor logic for the session.
+ * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus
+ * our job should be to run the server.
+ */
+ protected async launchImplementation() {
+ if (isMaster) {
+ this.sessionMonitor = await Session.initializeMonitorThread(monitorHooks);
+ this.sessionMonitor.addReplCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] }));
+ this.sessionMonitor.addReplCommand("solr", [/start|stop/g], args => SolrManager.SetRunning(args[0] === "start"));
+ } else {
+ this.serverWorker = await Session.initializeWorkerThread(launchServer); // server initialization delegated to worker
+ this.serverWorker.addExitHandler(() => Utils.Emit(WebSocket._socket, MessageStore.ConnectionTerminated, "Manual"));
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/server/Session/session.ts b/src/server/Session/session.ts
index 2a483fbab..b22b6404d 100644
--- a/src/server/Session/session.ts
+++ b/src/server/Session/session.ts
@@ -1,5 +1,5 @@
import { red, cyan, green, yellow, magenta, blue } from "colors";
-import { on, fork, setupMaster, Worker } from "cluster";
+import { on, fork, setupMaster, Worker, isMaster } from "cluster";
import { get } from "request-promise";
import { Utils } from "../../Utils";
import Repl, { ReplAction } from "../repl";
@@ -20,6 +20,51 @@ import { configurationSchema } from "./session_config_schema";
*/
export namespace Session {
+ export abstract class AppliedSessionAgent {
+
+ private launched = false;
+
+ protected sessionMonitorRef: Session.Monitor | undefined;
+ public get sessionMonitor(): Session.Monitor {
+ if (!isMaster) {
+ throw new Error("Cannot access the session monitor directly from the server worker thread");
+ }
+ return this.sessionMonitorRef!;
+ }
+ public set sessionMonitor(monitor: Session.Monitor) {
+ if (!isMaster) {
+ throw new Error("Cannot set the session monitor directly from the server worker thread");
+ }
+ this.sessionMonitorRef = monitor;
+ }
+
+ protected serverWorkerRef: Session.ServerWorker | undefined;
+ public get serverWorker(): Session.ServerWorker {
+ if (isMaster) {
+ throw new Error("Cannot access the server worker directly from the session monitor thread");
+ }
+ return this.serverWorkerRef!;
+ }
+ public set serverWorker(worker: Session.ServerWorker) {
+ if (isMaster) {
+ throw new Error("Cannot set the server worker directly from the session monitor thread");
+ }
+ this.serverWorkerRef = worker;
+ }
+
+ public async launch(): Promise<void> {
+ if (!this.launched) {
+ this.launched = true;
+ await this.launchImplementation();
+ } else {
+ throw new Error("Cannot launch a session thread more than once per process.");
+ }
+ }
+
+ protected abstract async launchImplementation(): Promise<void>;
+
+ }
+
interface Configuration {
showServerOutput: boolean;
masterIdentifier: string;
@@ -41,12 +86,21 @@ export namespace Session {
pollingFailureTolerance: 1
};
- interface MasterExtensions {
+ export interface Monitor {
+ log: (...optionalParams: any[]) => void;
+ restartServer: () => void;
+ setPort: (port: "server" | "socket" | string, value: number, immediateRestart: boolean) => void;
+ killSession: (graceful?: boolean) => never;
addReplCommand: (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => void;
addChildMessageHandler: (message: string, handler: ActionHandler) => void;
}
- export interface NotifierHooks {
+ export interface ServerWorker {
+ killSession: () => void;
+ addExitHandler: (handler: ExitHandler) => void;
+ }
+
+ export interface MonitorNotifierHooks {
key?: (key: string, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>;
crash?: (error: Error, masterLog: (...optionalParams: any[]) => void) => boolean | Promise<boolean>;
}
@@ -56,14 +110,8 @@ export namespace Session {
args: any;
}
- export interface SessionHooks {
- masterLog: (...optionalParams: any[]) => void;
- killSession: (graceful?: boolean) => never;
- restartServer: () => void;
- }
-
- export type ExitHandler = (error: Error) => void | Promise<void>;
- export type ActionHandler = (action: SessionAction, hooks: SessionHooks) => void | Promise<void>;
+ export type ExitHandler = (reason: Error | null) => void | Promise<void>;
+ export type ActionHandler = (action: SessionAction) => void | Promise<void>;
export interface EmailTemplate {
subject: string;
body: string;
@@ -125,7 +173,7 @@ export namespace Session {
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
- export async function initializeMonitorThread(notifiers?: NotifierHooks): Promise<MasterExtensions> {
+ export async function initializeMonitorThread(notifiers?: MonitorNotifierHooks): Promise<Monitor> {
console.log(timestamp(), cyan("initializing session..."));
let activeWorker: Worker;
const childMessageHandlers: { [message: string]: ActionHandler } = {};
@@ -143,26 +191,26 @@ export namespace Session {
} = configuration;
let { pollingIntervalSeconds } = configuration;
- const masterLog = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams);
+ const log = (...optionalParams: any[]) => console.log(timestamp(), masterIdentifier, ...optionalParams);
// this sends a pseudorandomly generated guid to the configuration's recipients, allowing them alone
// to kill the server via the /kill/:key route
let key: string | undefined;
if (notifiers && notifiers.key) {
key = Utils.GenerateGuid();
- const success = await notifiers.key(key, masterLog);
+ const success = await notifiers.key(key, log);
const statement = success ? green("distributed session key to recipients") : red("distribution of session key failed");
- masterLog(statement);
+ log(statement);
}
// handle exceptions in the master thread - there shouldn't be many of these
// the IPC (inter process communication) channel closed exception can't seem
// to be caught in a try catch, and is inconsequential, so it is ignored
- process.on("uncaughtException", ({ message, stack }) => {
+ process.on("uncaughtException", ({ message, stack }): void => {
if (message !== "Channel closed") {
- masterLog(red(message));
+ log(red(message));
if (stack) {
- masterLog(`uncaught exception\n${red(stack)}`);
+ log(`uncaught exception\n${red(stack)}`);
}
}
});
@@ -183,26 +231,26 @@ export namespace Session {
return false;
};
- const restartServer = () => {
+ const restartServer = (): void => {
// indicate to the worker that we are 'expecting' this restart
activeWorker.send({ setResponsiveness: false });
tryKillActiveWorker(true);
};
- const killSession = (graceful = true) => {
- masterLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
+ const killSession = (graceful = true): never => {
+ log(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
tryKillActiveWorker(graceful);
process.exit(0);
};
- const setPort = (port: string, value: number, immediateRestart: boolean) => {
+ const setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => {
if (value > 1023 && value < 65536) {
ports[port] = value;
if (immediateRestart) {
restartServer();
}
} else {
- masterLog(red(`${port} is an invalid port number`));
+ log(red(`${port} is an invalid port number`));
}
};
@@ -218,7 +266,7 @@ export namespace Session {
pollingIntervalSeconds,
session_key: key
});
- masterLog(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`));
+ log(cyan(`spawned new server worker with process id ${activeWorker.process.pid}`));
// an IPC message handler that executes actions on the master thread when prompted by the active worker
activeWorker.on("message", async ({ lifecycle, action }) => {
if (action) {
@@ -226,14 +274,14 @@ export namespace Session {
console.log(timestamp(), `${workerIdentifier} action requested (${cyan(message)})`);
switch (message) {
case "kill":
- masterLog(red("an authorized user has manually ended the server session"));
+ log(red("an authorized user has manually ended the server session"));
killSession();
case "notify_crash":
if (notifiers && notifiers.crash) {
const { error } = args;
- const success = await notifiers.crash(error, masterLog);
+ const success = await notifiers.crash(error, log);
const statement = success ? green("distributed crash notification to recipients") : red("distribution of crash notification failed");
- masterLog(statement);
+ log(statement);
}
case "set_port":
const { port, value, immediateRestart } = args;
@@ -241,7 +289,7 @@ export namespace Session {
default:
const handler = childMessageHandlers[message];
if (handler) {
- handler({ message, args }, { restartServer, killSession, masterLog });
+ handler({ message, args });
}
}
} else if (lifecycle) {
@@ -253,7 +301,7 @@ export namespace Session {
// a helpful cluster event called on the master thread each time a child process exits
on("exit", ({ process: { pid } }, code, signal) => {
const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
- masterLog(cyan(prompt));
+ log(cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
spawn();
});
@@ -269,7 +317,7 @@ export namespace Session {
repl.registerCommand("set", [/polling/, number, boolean], args => {
const newPollingIntervalSeconds = Math.floor(Number(args[2]));
if (newPollingIntervalSeconds < 0) {
- masterLog(red("the polling interval must be a non-negative integer"));
+ log(red("the polling interval must be a non-negative integer"));
} else {
if (newPollingIntervalSeconds !== pollingIntervalSeconds) {
pollingIntervalSeconds = newPollingIntervalSeconds;
@@ -285,7 +333,11 @@ export namespace Session {
// returned to allow the caller to add custom commands
return {
addReplCommand: repl.registerCommand,
- addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; }
+ addChildMessageHandler: (message: string, handler: ActionHandler) => { childMessageHandlers[message] = handler; },
+ restartServer,
+ killSession,
+ setPort,
+ log
};
}
@@ -295,7 +347,7 @@ export namespace Session {
* email if the server encounters an uncaught exception or if the server cannot be reached.
* @param work the function specifying the work to be done by each worker thread
*/
- export async function initializeWorkerThread(work: Function): Promise<(handler: ExitHandler) => void> {
+ export async function initializeWorkerThread(work: Function): Promise<ServerWorker> {
let shouldServerBeResponsive = false;
const exitHandlers: ExitHandler[] = [];
let pollingFailureCount = 0;
@@ -315,6 +367,8 @@ export namespace Session {
}
});
+ const executeExitHandlers = async (reason: Error | null) => Promise.all(exitHandlers.map(handler => handler(reason)));
+
// called whenever the process has a reason to terminate, either through an uncaught exception
// in the process (potentially inconsistent state) or the server cannot be reached
const activeExit = async (error: Error): Promise<void> => {
@@ -326,7 +380,7 @@ export namespace Session {
args: { error }
}
});
- await Promise.all(exitHandlers.map(handler => handler(error)));
+ await executeExitHandlers(error);
// notify master thread (which will log update in the console) of crash event via IPC
lifecycleNotification(red(`crash event detected @ ${new Date().toUTCString()}`));
lifecycleNotification(red(error.message));
@@ -375,7 +429,10 @@ export namespace Session {
work();
pollServer(); // begin polling
- return (handler: ExitHandler) => exitHandlers.push(handler);
+ return {
+ addExitHandler: (handler: ExitHandler) => exitHandlers.push(handler),
+ killSession: () => process.send!({ action: { message: "kill" } })
+ };
}
} \ No newline at end of file
diff --git a/src/server/index.ts b/src/server/index.ts
index 9bc3c7617..27e9a677b 100644
--- a/src/server/index.ts
+++ b/src/server/index.ts
@@ -3,7 +3,6 @@ import { GoogleApiServerUtils } from "./apis/google/GoogleApiServerUtils";
import * as mobileDetect from 'mobile-detect';
import * as path from 'path';
import { Database } from './database';
-const serverPort = 4321;
import { DashUploadUtils } from './DashUploadUtils';
import RouteSubscriber from './RouteSubscriber';
import initializeServer from './server_initialization';
@@ -24,10 +23,7 @@ import GooglePhotosManager from "./ApiManagers/GooglePhotosManager";
import { Logger } from "./ProcessFactory";
import { yellow, red } from "colors";
import { Session } from "./Session/session";
-import { isMaster } from "cluster";
-import { execSync } from "child_process";
-import { Utils } from "../Utils";
-import { MessageStore } from "./Message";
+import { DashSessionAgent } from "./DashSession";
export const publicDirectory = path.resolve(__dirname, "public");
export const filesDirectory = path.resolve(publicDirectory, "files");
@@ -96,7 +92,7 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }:
secureHandler: ({ req, res }) => {
if (req.params.key === process.env.session_key) {
res.send("<img src='https://media.giphy.com/media/NGIfqtcS81qi4/giphy.gif' style='width:100%;height:100%;'/>");
- process.send!({ action: { message: "kill" } });
+ sessionAgent.serverWorker.killSession();
} else {
res.redirect("/home");
}
@@ -136,7 +132,7 @@ function routeSetter({ isRelease, addSupervisedRoute, logRegistrationOutcome }:
* however, this becomes the logic invoked by a single worker thread spawned by
* the main monitor (master) thread.
*/
-async function launchServer() {
+export async function launchServer() {
await log_execution({
startMessage: "\nstarting execution of preliminary functions",
endMessage: "completed preliminary functions\n",
@@ -145,50 +141,7 @@ async function launchServer() {
await initializeServer(routeSetter);
}
-/**
- * If we're the monitor (master) thread, we should launch the monitor logic for the session.
- * Otherwise, we must be on a worker thread that was spawned *by* the monitor (master) thread, and thus
- * our job should be to run the server.
- */
-async function launchMonitoredSession() {
- if (isMaster) {
- const notificationRecipients = ["samuel_wilkins@brown.edu"];
- const signature = "-Dash Server Session Manager";
- const extensions = await Session.initializeMonitorThread({
- key: async (key, masterLog) => {
- const content = `The key for this session (started @ ${new Date().toUTCString()}) is ${key}.\n\n${signature}`;
- const failures = await Email.dispatchAll(notificationRecipients, "Server Termination Key", content);
- if (failures) {
- failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`)));
- return false;
- }
- return true;
- },
- crash: async ({ name, message, stack }, masterLog) => {
- const body = [
- "You, as a Dash Administrator, are being notified of a server crash event. Here's what we know:",
- `name:\n${name}`,
- `message:\n${message}`,
- `stack:\n${stack}`,
- "The server is already restarting itself, but if you're concerned, use the Remote Desktop Connection to monitor progress.",
- ].join("\n\n");
- const content = `${body}\n\n${signature}`;
- const failures = await Email.dispatchAll(notificationRecipients, "Dash Web Server Crash", content);
- if (failures) {
- failures.map(({ recipient, error: { message } }) => masterLog(red(`dispatch failure @ ${recipient} (${yellow(message)})`)));
- return false;
- }
- return true;
- }
- });
- extensions.addReplCommand("pull", [], () => execSync("git pull", { stdio: ["ignore", "inherit", "inherit"] }));
- extensions.addReplCommand("solr", [/start|stop/g], args => SolrManager.SetRunning(args[0] === "start"));
- } else {
- const addExitHandler = await Session.initializeWorkerThread(launchServer); // server initialization delegated to worker
- addExitHandler(() => Utils.Emit(WebSocket._socket, MessageStore.ConnectionTerminated, "Manual"));
- }
-}
-
+export const sessionAgent: Session.AppliedSessionAgent = new DashSessionAgent();
/**
* If you're in development mode, you won't need to run a session.
* The session spawns off new server processes each time an error is encountered, and doesn't
@@ -196,7 +149,7 @@ async function launchMonitoredSession() {
* So, the 'else' clause is exactly what we've always run when executing npm start.
*/
if (process.env.RELEASE) {
- launchMonitoredSession();
+ sessionAgent.launch();
} else {
launchServer();
} \ No newline at end of file