// Source path: src/server/DashSession/Session/agents/monitor.ts
// Blob: 6cdad46c2fda79133e3ae4ae00bc3d719d93c9d8
import { ExecOptions, exec } from 'child_process';
import * as _cluster from 'cluster';
import { Worker } from 'cluster';
import { blue, cyan, red, white, yellow } from 'colors';
import { readFileSync } from 'fs';
import { ValidationError, validate } from 'jsonschema';
import Repl, { ReplAction } from '../utilities/repl';
import { Configuration, Identifiers, colorMapping, configurationSchema, defaultConfig } from '../utilities/session_config';
import { Utilities } from '../utilities/utilities';
import { ExitHandler } from './applied_session_agent';
import IPCMessageReceiver from './process_message_router';
import { ErrorLike, MessageHandler, manage } from './promisified_ipc_manager';
import { ServerWorker } from './server_worker';

// Depending on how this module is loaded (plain CommonJS require, esModuleInterop, or native ESM),
// the cluster singleton is either the imported namespace itself or its `default` export.
const cluster = ((_cluster as any).default ?? _cluster) as any;
const { isWorker, setupMaster, fork } = cluster;
// `on` is inherited from EventEmitter and operates on its receiver, so it has to stay bound to the
// cluster object: a bare destructured reference would be invoked without a `this`.
const on = cluster.on.bind(cluster);

/**
 * The master-process side of a session. On construction it registers its internal
 * message handlers, loads and validates the session configuration, configures the
 * cluster so that a new server worker is forked every time one exits, and builds
 * the repl. The first worker is only forked once `finalize` is called.
 */
export class Monitor extends IPCMessageReceiver {
    /** Number of times `Create` has been called in this process; enforces a single monitor. */
    private static count = 0;
    /** Becomes true once `finalize` has run. */
    private finalized = false;
    /** Callbacks invoked (in parallel) by `killSession` before the process exits. */
    private readonly exitHandlers: ExitHandler[] = [];
    /** The resolved session configuration (file contents layered over the defaults). */
    private readonly config: Configuration;
    /** The most recently forked server worker, if any. */
    private activeWorker: Worker | undefined;
    /** The session key given to `finalize`, forwarded to every worker as `session_key`. */
    private key: string | undefined;
    private readonly repl: Repl;

    /**
     * The only way to obtain a monitor. Exits the process with code 1 when invoked
     * from a worker process or when a monitor has already been created.
     */
    public static Create(): Monitor {
        if (isWorker) {
            ServerWorker.IPCManager.emit('kill', {
                reason: 'cannot create a monitor on the worker process.',
                graceful: false,
                errorCode: 1,
            });
            process.exit(1);
        } else if (++Monitor.count > 1) {
            console.error(red('cannot create more than one monitor.'));
            process.exit(1);
        }
        return new Monitor();
    }

    private constructor() {
        super();
        console.log(this.timestamp(), cyan('initializing session...'));
        this.configureInternalHandlers();
        this.config = this.loadAndValidateConfiguration();
        this.initializeClusterFunctions();
        this.repl = this.initializeRepl();
    }

    /**
     * Installs the process-level exception logger and the handlers for the
     * 'kill' and 'lifecycle' messages.
     */
    protected configureInternalHandlers = () => {
        // handle exceptions in the master thread - there shouldn't be many of these
        // the IPC (inter process communication) channel closed exception can't seem
        // to be caught in a try catch, and is inconsequential, so it is ignored
        process.on('uncaughtException', ({ message, stack }): void => {
            if (message !== 'Channel closed') {
                this.mainLog(red(message));
                if (stack) {
                    this.mainLog(`uncaught exception\n${red(stack)}`);
                }
            }
        });

        this.on('kill', ({ reason, graceful, errorCode }) => this.killSession(reason, graceful, errorCode));
        this.on('lifecycle', ({ event }) => console.log(this.timestamp(), `${this.config.identifiers.worker.text} lifecycle phase (${event})`));
    };

    /**
     * Configures how workers are forked and registers the respawn-on-exit behavior.
     */
    private initializeClusterFunctions = () => {
        // determines whether or not we see the compilation / initialization / runtime output of each child server process
        const output = this.config.showServerOutput ? 'inherit' : 'ignore';
        setupMaster({ stdio: ['ignore', output, output, 'ipc'] });

        // a helpful cluster event called on the master thread each time a child process exits
        on('exit', ({ process: { pid } }: { process: { pid: any } }, code: any, signal: any) => {
            const prompt = `server worker with process id ${pid} has exited with code ${code}${signal === null ? '' : `, having encountered signal ${signal}`}.`;
            this.mainLog(cyan(prompt));
            // to make this a robust, continuous session, every time a child process dies, we immediately spawn a new one
            void this.spawn();
        });
    };

    /**
     * Records the session key and spawns the first server worker.
     * @param sessionKey passed to every spawned worker as `session_key`
     * @throws Error if the monitor has already been finalized
     */
    public finalize = (sessionKey: string): void => {
        if (this.finalized) {
            throw new Error('Session monitor is already finalized');
        }
        this.finalized = true;
        this.key = sessionKey;
        void this.spawn();
    };

    /**
     * Registration points for the intrinsic events; each forwards the given
     * listener to `this.on` under the corresponding event name.
     */
    public readonly coreHooks = Object.freeze({
        onCrashDetected: (listener: MessageHandler<{ error: ErrorLike }>) => this.on(Monitor.IntrinsicEvents.CrashDetected, listener),
        onServerRunning: (listener: MessageHandler<{ isFirstTime: boolean }>) => this.on(Monitor.IntrinsicEvents.ServerRunning, listener),
    });

    /**
     * Kill this session and its active child server process: logs the reason,
     * runs the registered exit handlers, asks the active worker to exit
     * (gracefully via a 'manualExit' message, or immediately by killing its
     * process) and finally exits the master process.
     * @param reason logged as the session exit reason
     * @param graceful whether the worker is asked to exit rather than killed
     * @param errorCode the exit code of the master process
     */
    public killSession = async (reason: string, graceful = true, errorCode = 0) => {
        this.mainLog(cyan(`exiting session ${graceful ? 'clean' : 'immediate'}ly`));
        this.mainLog(`session exit reason: ${red(reason)}`);
        await this.executeExitHandlers(true);
        await this.killActiveWorker(graceful, true);
        process.exit(errorCode);
    };

    /**
     * Registers a function to be called when the session is killed
     * (see `killSession`).
     * @returns the new number of registered exit handlers
     */
    public addExitHandler = (handler: ExitHandler) => this.exitHandlers.push(handler);

    /**
     * Extend the default repl by adding in custom commands
     * that can invoke application logic external to this module
     */
    public addReplCommand = (basename: string, argPatterns: (RegExp | string)[], action: ReplAction) => {
        this.repl.registerCommand(basename, argPatterns, action);
    };

    /**
     * Runs a shell command and logs its output line by line. On failure the
     * error message is logged; otherwise stdout and stderr are logged.
     * The returned promise always resolves once the command has finished -
     * it never rejects.
     */
    public exec = (command: string, options?: ExecOptions) =>
        new Promise<void>(resolve => {
            exec(command, { ...options, encoding: 'utf8' }, (error, stdout, stderr) => {
                const nonEmptyLines = (text: string) => text.split('\n').filter(line => line.length);
                if (error) {
                    this.execLog(red(`unable to execute ${white(command)}`));
                    nonEmptyLines(error.message).forEach(line => this.execLog(red(`(error) ${line}`)));
                } else {
                    nonEmptyLines(stdout).forEach(line => this.execLog(cyan(`(stdout) ${line}`)));
                    nonEmptyLines(stderr).forEach(line => this.execLog(yellow(`(stderr) ${line}`)));
                }
                resolve();
            });
        });

    /**
     * Generates a blue UTC string associated with the time
     * of invocation.
     */
    private timestamp = () => blue(`[${new Date().toUTCString()}]`);

    /**
     * A formatted, timestamped log in color, prefixed with the
     * master identifier from the configuration.
     */
    public mainLog = (...optionalParams: any[]) => {
        console.log(this.timestamp(), this.config.identifiers.master.text, ...optionalParams);
    };

    /**
     * A formatted, timestamped log in color, prefixed with the
     * exec identifier from the configuration (used for `exec` output).
     */
    private execLog = (...optionalParams: any[]) => {
        console.log(this.timestamp(), this.config.identifiers.exec.text, ...optionalParams);
    };

    /**
     * Reads ./session.config.json, validates it against the configuration schema
     * and layers it over the default configuration. A missing file falls back to
     * the defaults; an invalid file or any other error aborts the process with a
     * non-zero exit code. The identifier texts of the resulting configuration are
     * then colorized and suffixed with a colon.
     */
    private loadAndValidateConfiguration = (): Configuration => {
        const configPath = './session.config.json';
        let config: Configuration | undefined;
        try {
            console.log(this.timestamp(), cyan('validating configuration...'));
            const parsed = JSON.parse(readFileSync(configPath, 'utf8'));
            // ensure all necessary and no excess information is specified by the configuration file
            validate(parsed, configurationSchema, {
                throwError: true,
                allowUnknownAttributes: false,
            });
            config = Utilities.preciseAssign({}, defaultConfig, parsed);
        } catch (error: any) {
            if (error instanceof ValidationError) {
                console.log(red('\nSession configuration failed.'));
                console.log('The given session.config.json configuration file is invalid.');
                console.log(`${error.instance}: ${error.stack}`);
                // a failed configuration is an error: report it through the exit code
                return process.exit(1);
            }
            if (error.code === 'ENOENT' && error.path === configPath) {
                console.log(cyan('Loading default session parameters...'));
                console.log('Consider including a session.config.json configuration file in your project root for customization.');
                config = Utilities.preciseAssign({}, defaultConfig);
            } else {
                console.log(red('\nSession configuration failed.'));
                console.log('The following unknown error occurred during configuration.');
                console.log(error.stack);
                return process.exit(1);
            }
        }
        if (!config) {
            // fail with a descriptive error rather than dereferencing an undefined configuration below
            throw new Error('Session configuration could not be resolved.');
        }
        // done after (not inside a `finally` of) the try / catch, so that nothing thrown above can be swallowed
        const { identifiers } = config;
        Object.keys(identifiers).forEach(key => {
            const resolved = key as keyof Identifiers;
            const { text, color } = identifiers[resolved];
            identifiers[resolved].text = (colorMapping.get(color) ?? white)(`${text}:`);
        });
        return config;
    };

    /**
     * Builds the repl that allows the following commands to be typed into stdin of the master thread:
     * exit (clean|force), restart (clean|force), set <target> port <number> <restart>,
     * and set polling <seconds> <notify worker>.
     */
    private initializeRepl = (): Repl => {
        const repl = new Repl({ identifier: () => `${this.timestamp()} ${this.config.identifiers.master.text}` });
        const boolean = /true|false/;
        const number = /\d+/;
        const letters = /[a-zA-Z]+/;
        repl.registerCommand('exit', [/clean|force/], args => this.killSession('manual exit requested by repl', args[0] === 'clean', 0));
        repl.registerCommand('restart', [/clean|force/], args => this.killActiveWorker(args[0] === 'clean'));
        repl.registerCommand('set', [letters, 'port', number, boolean], args => this.setPort(args[0], Number(args[2]), args[3] === 'true'));
        repl.registerCommand('set', [/polling/, number, boolean], args => {
            const newPollingIntervalSeconds = Math.floor(Number(args[1]));
            if (newPollingIntervalSeconds < 0) {
                this.mainLog(red('the polling interval must be a non-negative integer'));
            } else if (newPollingIntervalSeconds !== this.config.polling.intervalSeconds) {
                this.config.polling.intervalSeconds = newPollingIntervalSeconds;
                // the active worker is only notified of the change when requested
                if (args[2] === 'true') {
                    Monitor.IPCManager.emit('updatePollingInterval', { newPollingIntervalSeconds });
                }
            }
        });
        return repl;
    };

    /**
     * Invokes every registered exit handler with the given reason and
     * resolves once all of them have completed.
     */
    private executeExitHandlers = async (reason: Error | boolean) => Promise.all(this.exitHandlers.map(handler => handler(reason)));

    /**
     * Attempts to kill the active worker gracefully, unless otherwise specified.
     * Does nothing when there is no live worker.
     * @param graceful when true, the worker is sent a 'manualExit' message; otherwise
     * the IPC channel to it is destroyed and its process is killed
     * @param isSessionEnd forwarded to the worker with the 'manualExit' message
     */
    private killActiveWorker = async (graceful = true, isSessionEnd = false): Promise<void> => {
        if (this.activeWorker && !this.activeWorker.isDead()) {
            if (graceful) {
                Monitor.IPCManager.emit('manualExit', { isSessionEnd });
            } else {
                // This code runs in the master process, where the channel to the active worker is
                // Monitor.IPCManager (assigned in spawn). ServerWorker.IPCManager is only used from
                // within a worker process (see Create), so it is not the channel to tear down here.
                await Monitor.IPCManager.destroy();
                this.activeWorker.process.kill();
            }
        }
    };

    /**
     * Allows the caller to set the port at which the target (be it the server,
     * the websocket, some other custom port) is listening. If an immediate restart
     * is specified, this monitor will kill the active child and re-launch the server
     * at the port. Otherwise, the updated port won't be used until / unless the child
     * dies on its own and triggers a restart.
     * @param port the name of the port entry to update
     * @param value the new port number, an integer between 1024 and 65535
     * @param immediateRestart whether to kill the active worker right away
     */
    private setPort = (port: 'server' | 'socket' | string, value: number, immediateRestart: boolean): void => {
        if (Number.isInteger(value) && value > 1023 && value < 65536) {
            this.config.ports[port] = value;
            if (immediateRestart) {
                // the cluster 'exit' listener spawns the replacement worker
                void this.killActiveWorker();
            }
        } else {
            // report the rejected value, not the name of the port it was meant for
            this.mainLog(red(`${value} is an invalid port number for ${port} (expected an integer between 1024 and 65535)`));
        }
    };

    /**
     * Kills the current active worker and proceeds to spawn a new worker,
     * feeding in configuration information as environment variables.
     */
    private spawn = async (): Promise<void> => {
        await this.killActiveWorker();
        const {
            config: { polling, ports },
            key,
        } = this;
        this.activeWorker = fork({
            pollingRoute: polling.route,
            pollingFailureTolerance: polling.failureTolerance,
            serverPort: ports.server,
            socketPort: ports.socket,
            pollingIntervalSeconds: polling.intervalSeconds,
            session_key: key,
        });
        if (this.activeWorker) {
            // re-target the IPC manager at the freshly forked worker
            Monitor.IPCManager = manage(this.activeWorker.process, this.handlers);
        }
        this.mainLog(cyan(`spawned new server worker with process id ${this.activeWorker?.process.pid}`));
    };
}

/**
 * Namespace sharing its name with the Monitor class, so that the enum below
 * is reachable as `Monitor.IntrinsicEvents`.
 */
// eslint-disable-next-line no-redeclare
export namespace Monitor {
    /**
     * String names of the built-in session events. CrashDetected and
     * ServerRunning are the event names that the class's `coreHooks`
     * register listeners under.
     */
    export enum IntrinsicEvents {
        // NOTE(review): not referenced anywhere in this file - presumably consumed elsewhere; confirm
        KeyGenerated = 'key_generated',
        CrashDetected = 'crash_detected',
        ServerRunning = 'server_running',
    }
}