import { ChildProcess } from 'child_process'; import { Utilities } from '../utilities/utilities'; /** * Specifies a general message format for this API */ export type Message = { name: string; args?: T; }; export type MessageHandler = (args: T) => any | Promise; /** * Captures the logic to execute upon receiving a message * of a certain name. */ export type HandlerMap = { [name: string]: MessageHandler[] }; /** * This will always literally be a child process. But, though setting * up a manager in the parent will indeed see the target as the ChildProcess, * setting up a manager in the child will just see itself as a regular NodeJS.Process. */ export type IPCTarget = NodeJS.Process | ChildProcess; interface Metadata { isResponse: boolean; id: string; } /** * When a message is emitted, it is embedded with private metadata * to facilitate the resolution of promises, etc. */ interface InternalMessage extends Message { metadata: Metadata; } /** * Allows for the transmission of the error's key features over IPC. */ export interface ErrorLike { name: string; message: string; stack?: string; } /** * The arguments returned in a message sent from the target upon completion. */ export interface Response { results?: T[]; error?: ErrorLike; } const destroyEvent = '__destroy__'; /** * This is a wrapper utility class that allows the caller process * to emit an event and return a promise that resolves when it and all * other processes listening to its emission of this event have completed. */ export class PromisifiedIPCManager { private readonly target: IPCTarget; private pendingMessages: { [id: string]: string } = {}; private isDestroyed = false; private get callerIsTarget() { return process.pid === this.target.pid; } constructor(target: IPCTarget, handlers?: HandlerMap) { this.target = target; if (handlers) { handlers[destroyEvent] = [this.destroyHelper]; this.target.addListener('message', this.generateInternalHandler(handlers)); } } /** * This routine uniquely identifies each message, then adds a general * message listener that waits for a response with the same id before resolving * the promise. */ public emit = async (name: string, args?: any): Promise> => { if (this.isDestroyed) { const error = { name: 'FailedDispatch', message: 'Cannot use a destroyed IPC manager to emit a message.' }; return { error }; } return new Promise>(resolve => { const messageId = Utilities.guid(); type InternalMessageHandler = (message: any /* MessageListener */) => any | Promise; const responseHandler: InternalMessageHandler = ({ metadata: { id, isResponse }, args: hargs }) => { if (isResponse && id === messageId) { this.target.removeListener('message', responseHandler); resolve(hargs); } }; this.target.addListener('message', responseHandler); const message = { name, args, metadata: { id: messageId, isResponse: false } }; if (!(this.target.send && this.target.send(message))) { const error: ErrorLike = { name: 'FailedDispatch', message: "Either the target's send method was undefined or the act of sending failed." }; resolve({ error }); this.target.removeListener('message', responseHandler); } }); }; /** * Invoked from either the parent or the child process, this allows * any unresolved promises to continue in the target process, but dispatches a dummy * completion response for each of the pending messages, allowing their * promises in the caller to resolve. */ public destroy = () => // eslint-disable-next-line no-async-promise-executor new Promise(async resolve => { if (this.callerIsTarget) { this.destroyHelper(); } else { await this.emit(destroyEvent); } resolve(); }); /** * Dispatches the dummy responses and sets the isDestroyed flag to true. */ private destroyHelper = () => { const { pendingMessages } = this; this.isDestroyed = true; Object.keys(pendingMessages).forEach(id => { const error: ErrorLike = { name: 'ManagerDestroyed', message: 'The IPC manager was destroyed before the response could be returned.' }; const message: InternalMessage = { name: pendingMessages[id], args: { error }, metadata: { id, isResponse: true } }; this.target.send?.(message); }); this.pendingMessages = {}; }; /** * This routine receives a uniquely identified message. If the message is itself a response, * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever * router the caller has installed, and then sends a response containing the original message id, * which will ultimately invoke the responseHandler of the original emission and resolve the * sender's promise. */ private generateInternalHandler = (handlers: HandlerMap): MessageHandler => async (message: InternalMessage) => { const { name, args, metadata } = message; if (name && metadata && !metadata.isResponse) { const { id } = metadata; this.pendingMessages[id] = name; let error: Error | undefined; let results: any[] | undefined; try { const registered = handlers[name]; if (registered) { results = await Promise.all(registered.map(handler => handler(args))); } } catch (e: any) { error = e; } if (!this.isDestroyed && this.target.send) { const metadataRes = { id, isResponse: true }; const response: Response = { results, error }; const messageRes = { name, args: response, metadata: metadataRes }; delete this.pendingMessages[id]; this.target.send(messageRes); } } }; } /** * Convenience constructor * @param target the process / worker to which to attach the specialized listeners */ export function manage(target: IPCTarget, handlers?: HandlerMap) { return new PromisifiedIPCManager(target, handlers); }