1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
|
import { ExitHandler } from "./applied_session_agent";
import { Configuration, configurationSchema, defaultConfig, Identifiers, colorMapping } from "../utilities/session_config";
import Repl, { ReplAction } from "../utilities/repl";
import { isWorker, setupMaster, on, Worker, fork } from "cluster";
import { IPC } from "../utilities/ipc";
import { red, cyan, white, yellow, blue, green } from "colors";
import { exec, ExecOptions } from "child_process";
import { Utils } from "../../../Utils";
import { validate, ValidationError } from "jsonschema";
import { Utilities } from "../utilities/utilities";
import { readFileSync } from "fs";
import { EventEmitter } from "events";
/**
* Validates and reads the configuration file, accordingly builds a child process factory
* and spawns off an initial process that will respawn as predecessors die.
*/
export class Monitor extends EventEmitter {
private static count = 0;
private finalized = false;
private exitHandlers: ExitHandler[] = [];
private readonly config: Configuration;
private onMessage: { [message: string]: Monitor.ServerMessageHandler[] | undefined } = {};
private activeWorker: Worker | undefined;
private key: string | undefined;
private repl: Repl;
public static Create() {
if (isWorker) {
IPC.dispatchMessage(process, {
action: {
message: "kill",
args: {
reason: "cannot create a monitor on the worker process.",
graceful: false,
errorCode: 1
}
}
});
process.exit(1);
} else if (++Monitor.count > 1) {
console.error(red("cannot create more than one monitor."));
process.exit(1);
} else {
return new Monitor();
}
}
/**
* Kill this session and its active child
* server process, either gracefully (may wait
* indefinitely, but at least allows active networking
* requests to complete) or immediately.
*/
public killSession = async (reason: string, graceful = true, errorCode = 0) => {
this.mainLog(cyan(`exiting session ${graceful ? "clean" : "immediate"}ly`));
this.mainLog(`session exit reason: ${(red(reason))}`);
await this.executeExitHandlers(true);
this.killActiveWorker(graceful, true);
process.exit(errorCode);
}
/**
* Execute the list of functions registered to be called
* whenever the process exits.
*/
public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler);
/**
* Extend the default repl by adding in custom commands
* that can invoke application logic external to this module
*/
public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => {
this.repl.registerCommand(basename, argPatterns, action);
}
public exec = (command: string, options?: ExecOptions) => {
return new Promise<void>(resolve => {
exec(command, { ...options, encoding: "utf8" }, (error, stdout, stderr) => {
if (error) {
this.execLog(red(`unable to execute ${white(command)}`));
error.message.split("\n").forEach(line => line.length && this.execLog(red(`(error) ${line}`)));
} else {
let outLines: string[], errorLines: string[];
if ((outLines = stdout.split("\n").filter(line => line.length)).length) {
outLines.forEach(line => line.length && this.execLog(cyan(`(stdout) ${line}`)));
}
if ((errorLines = stderr.split("\n").filter(line => line.length)).length) {
errorLines.forEach(line => line.length && this.execLog(yellow(`(stderr) ${line}`)));
}
}
resolve();
});
});
}
/**
* Add a listener at this message. When the monitor process
* receives a message, it will invoke all registered functions.
*/
public addServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
const handlers = this.onMessage[message];
if (handlers) {
handlers.push(handler);
} else {
this.onMessage[message] = [handler];
}
}
/**
* Unregister a given listener at this message.
*/
public removeServerMessageListener = (message: string, handler: Monitor.ServerMessageHandler) => {
const handlers = this.onMessage[message];
if (handlers) {
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
}
/**
* Unregister all listeners at this message.
*/
public clearServerMessageListeners = (message: string) => this.onMessage[message] = undefined;
private constructor() {
super();
console.log(this.timestamp(), cyan("initializing session..."));
this.config = this.loadAndValidateConfiguration();
// determines whether or not we see the compilation / initialization / runtime output of each child server process
const output = this.config.showServerOutput ? "inherit" : "ignore";
setupMaster({ stdio: ["ignore", output, output, "ipc"] });
// handle exceptions in the master thread - there shouldn't be many of these
// the IPC (inter process communication) channel closed exception can't seem
// to be caught in a try catch, and is inconsequential, so it is ignored
process.on("uncaughtException", ({ message, stack }): void => {
if (message !== "Channel closed") {
this.mainLog(red(message));
if (stack) {
this.mainLog(`uncaught exception\n${red(stack)}`);
}
}
});
// a helpful cluster event called on the master thread each time a child process exits
on("exit", ({ process: { pid } }, code, signal) => {
const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? "" : `, having encountered signal ${signal}`}.`;
this.mainLog(cyan(prompt));
// to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
this.spawn();
});
this.repl = this.initializeRepl();
}
public finalize = (): void => {
if (this.finalized) {
throw new Error("Session monitor is already finalized");
}
this.finalized = true;
this.emit(Monitor.IntrinsicEvents.KeyGenerated, this.key = Utils.GenerateGuid());
this.spawn();
}
/**
* Generates a blue UTC string associated with the time
* of invocation.
*/
private timestamp = () => blue(`[${new Date().toUTCString()}]`);
/**
* A formatted, identified and timestamped log in color
*/
public mainLog = (...optionalParams: any[]) => {
console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams);
}
/**
* A formatted, identified and timestamped log in color for non-
*/
private execLog = (...optionalParams: any[]) => {
console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams);
}
/**
* Reads in configuration .json file only once, in the master thread
* and pass down any variables the pertinent to the child processes as environment variables.
*/
private loadAndValidateConfiguration = (): Configuration => {
let config: Configuration;
try {
console.log(this.timestamp(), cyan("validating configuration..."));
config = JSON.parse(readFileSync('./session.config.json', 'utf8'));
const options = {
throwError: true,
allowUnknownAttributes: false
};
// ensure all necessary and no excess information is specified by the configuration file
validate(config, configurationSchema, options);
config = Utilities.preciseAssign({}, defaultConfig, config);
} catch (error) {
if (error instanceof ValidationError) {
console.log(red("\nSession configuration failed."));
console.log("The given session.config.json configuration file is invalid.");
console.log(`${error.instance}: ${error.stack}`);
process.exit(0);
} else if (error.code === "ENOENT" && error.path === "./session.config.json") {
console.log(cyan("Loading default session parameters..."));
console.log("Consider including a session.config.json configuration file in your project root for customization.");
config = Utilities.preciseAssign({}, defaultConfig);
} else {
console.log(red("\nSession configuration failed."));
console.log("The following unknown error occurred during configuration.");
console.log(error.stack);
process.exit(0);
}
} finally {
const { identifiers } = config!;
Object.keys(identifiers).forEach(key => {
const resolved = key as keyof Identifiers;
const { text, color } = identifiers[resolved];
identifiers[resolved].text = (colorMapping.get(color) || white)(`${text}:`);
});
return config!;
}
}
/**
* Builds the repl that allows the following commands to be typed into stdin of the master thread.
*/
private initializeRepl = (): Repl => {
const repl = new Repl({ identifier: () => `${this.timestamp()} ${this.config.identifiers.master.text}` });
const boolean = /true|false/;
const number = /\d+/;
const letters = /[a-zA-Z]+/;
repl.registerCommand("exit", [/clean|force/], args => this.killSession("manual exit requested by repl", args[0] === "clean", 0));
repl.registerCommand("restart", [/clean|force/], args => this.killActiveWorker(args[0] === "clean"));
repl.registerCommand("set", [letters, "port", number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === "true"));
repl.registerCommand("set", [/polling/, number, boolean], async args => {
const newPollingIntervalSeconds = Math.floor(Number(args[1]));
if (newPollingIntervalSeconds < 0) {
this.mainLog(red("the polling interval must be a non-negative integer"));
} else {
if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
this.config.polling.intervalSeconds = newPollingIntervalSeconds;
if (args[2] === "true") {
return IPC.dispatchMessage(this.activeWorker!, { newPollingIntervalSeconds }, true);
}
}
}
});
return repl;
}
private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));
/**
* Attempts to kill the active worker gracefully, unless otherwise specified.
*/
private killActiveWorker = (graceful = true, isSessionEnd = false): void => {
if (this.activeWorker && !this.activeWorker.isDead()) {
if (graceful) {
IPC.dispatchMessage(this.activeWorker, { manualExit: { isSessionEnd } });
} else {
this.activeWorker.process.kill();
}
}
}
/**
* Allows the caller to set the port at which the target (be it the server,
* the websocket, some other custom port) is listening. If an immediate restart
* is specified, this monitor will kill the active child and re-launch the server
* at the port. Otherwise, the updated port won't be used until / unless the child
* dies on its own and triggers a restart.
*/
private setPort = (port: "server" | "socket" | string, value: number, immediateRestart: boolean): void => {
if (value > 1023 && value < 65536) {
this.config.ports[port] = value;
if (immediateRestart) {
this.killActiveWorker();
}
} else {
this.mainLog(red(`${port} is an invalid port number`));
}
}
/**
* Kills the current active worker and proceeds to spawn a new worker,
* feeding in configuration information as environment variables.
*/
private spawn = (): void => {
const {
polling: {
route,
failureTolerance,
intervalSeconds
},
ports
} = this.config;
this.killActiveWorker();
this.activeWorker = fork({
pollingRoute: route,
pollingFailureTolerance: failureTolerance,
serverPort: ports.server,
socketPort: ports.socket,
pollingIntervalSeconds: intervalSeconds,
session_key: this.key,
ipc_suffix: IPC.suffix
});
this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
// an IPC message handler that executes actions on the master thread when prompted by the active worker
IPC.addMessagesHandler(this.activeWorker!, async ({ lifecycle, action }) => {
if (action) {
const { message, args } = action as Monitor.Action;
console.log(this.timestamp(), `${this.config.identifiers.worker.text} action requested (${cyan(message)})`);
switch (message) {
case "kill":
const { reason, graceful, errorCode } = args;
this.killSession(reason, graceful, errorCode);
break;
case "notify_crash":
this.emit(Monitor.IntrinsicEvents.CrashDetected, args.error);
break;
case Monitor.IntrinsicEvents.ServerRunning:
this.emit(Monitor.IntrinsicEvents.ServerRunning, args.firstTime);
break;
case "set_port":
const { port, value, immediateRestart } = args;
this.setPort(port, value, immediateRestart);
break;
}
const handlers = this.onMessage[message];
if (handlers) {
handlers.forEach(handler => handler({ message, args }));
}
}
if (lifecycle) {
console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${lifecycle})`);
}
});
}
}
export namespace Monitor {
export interface Action {
message: string;
args: any;
}
export type ServerMessageHandler = (action: Action) => void | Promise<void>;
export enum IntrinsicEvents {
KeyGenerated = "key_generated",
CrashDetected = "crash_detected",
ServerRunning = "server_running"
}
}
|