aboutsummaryrefslogtreecommitdiff
path: root/src/server/DashSession/Session/agents/promisified_ipc_manager.ts
blob: fc870d003bd4ec2d8da0a9edd60d25b01c612a47 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import { ChildProcess } from 'child_process';
import { Utilities } from '../utilities/utilities';

/**
 * Specifies a general message format for this API.
 *
 * - `name` selects which registered handlers run on the receiving side
 *   (it is the key looked up in the receiver's HandlerMap).
 * - `args` is the optional payload handed to each of those handlers.
 */
export type Message<T = any> = {
    name: string;
    args?: T;
};
/**
 * A callback invoked with the `args` of a received message. It may return a
 * plain value or a promise; the manager awaits the result either way.
 */
export type MessageHandler<T = any> = (args: T) => any | Promise<any>;

/**
 * Captures the logic to execute upon receiving a message
 * of a certain name. Each name maps to a list of handlers, all of which are
 * invoked (and awaited together) for a single incoming message of that name.
 */
export type HandlerMap = { [name: string]: MessageHandler[] };

/**
 * This will always literally be a child process. But, though setting
 * up a manager in the parent will indeed see the target as the ChildProcess,
 * setting up a manager in the child will just see itself as a regular NodeJS.Process.
 */
export type IPCTarget = NodeJS.Process | ChildProcess;

/**
 * Private bookkeeping attached to every message that travels through a manager.
 *
 * - `isResponse` is false on a message produced by `emit` and true on the reply
 *   sent back for it; receivers use it to avoid responding to responses.
 * - `id` is generated once per emission and copied onto the reply, which is how
 *   the emitter matches a reply to the promise that is waiting for it.
 */
interface Metadata {
    isResponse: boolean;
    id: string;
}
/**
 * When a message is emitted, it is embedded with private metadata
 * to facilitate the resolution of promises, etc.
 */
interface InternalMessage extends Message {
    metadata: Metadata;
}

/**
 * Allows for the transmission of the error's key features over IPC.
 * Mirrors the `name`, `message` and `stack` properties of a standard Error.
 */
export interface ErrorLike {
    name: string;
    message: string;
    stack?: string;
}

/**
 * The arguments returned in a message sent from the target upon completion.
 *
 * - `results` holds the values produced by the handlers registered for the
 *   message's name; it is left undefined when no handlers were registered.
 * - `error` describes why the exchange did not complete normally: a handler
 *   threw, the message could not be dispatched, or the manager was destroyed.
 */
export interface Response<T = any> {
    results?: T[];
    error?: ErrorLike;
}

// Reserved message name used internally: a manager constructed with handlers
// registers its destroy routine under this name, and `destroy()` emits it when
// the caller is not the target process itself.
const destroyEvent = '__destroy__';

/**
 * This is a wrapper utility class that allows the caller process
 * to emit an event and return a promise that resolves when it and all
 * other processes listening to its emission of this event have completed.
 *
 * A manager constructed with a handler map also acts as the receiving side:
 * it runs the handlers registered for each incoming message and sends a
 * response carrying their results (or a description of the failure).
 */
export class PromisifiedIPCManager {
    private readonly target: IPCTarget;
    /** Maps the id of every received message that is still being handled to its name. */
    private pendingMessages: { [id: string]: string } = {};
    private isDestroyed = false;
    /** True when this manager lives inside the very process it targets. */
    private get callerIsTarget() {
        return process.pid === this.target.pid;
    }

    /**
     * @param target the process whose IPC channel is used for sending and listening
     * @param handlers optional map of message name to handlers. When given, the
     * reserved destroy event is registered in this same map (the object is
     * mutated) and a listener that routes incoming messages is attached to the target.
     */
    constructor(target: IPCTarget, handlers?: HandlerMap) {
        this.target = target;
        if (handlers) {
            handlers[destroyEvent] = [this.destroyHelper];
            this.target.addListener('message', this.generateInternalHandler(handlers));
        }
    }

    /**
     * Reduces an arbitrary thrown value to a plain object that survives IPC
     * serialization. An Error's `name`, `message` and `stack` are not enumerable,
     * so sending the Error itself through a JSON-serialized channel would lose them.
     */
    private static toErrorLike(thrown: unknown): ErrorLike {
        if (typeof thrown === 'object' && thrown !== null && typeof (thrown as Partial<ErrorLike>).message === 'string') {
            const { name, message, stack } = thrown as Partial<ErrorLike>;
            return {
                name: typeof name === 'string' ? name : 'Error',
                message: message as string,
                stack: typeof stack === 'string' ? stack : undefined,
            };
        }
        return { name: 'Error', message: String(thrown) };
    }

    /**
     * This routine uniquely identifies each message, then adds a general
     * message listener that waits for a response with the same id before resolving
     * the promise.
     *
     * The returned promise never rejects: failures are reported through the
     * `error` field of the resolved response.
     *
     * @param name the name of the message, used by the receiver to pick handlers
     * @param args optional payload passed to each handler
     * @returns the response sent back by the receiver, or a `FailedDispatch`
     * error response if this manager is destroyed or the message could not be sent
     */
    public emit = async <T = any>(name: string, args?: any): Promise<Response<T>> => {
        if (this.isDestroyed) {
            const error = { name: 'FailedDispatch', message: 'Cannot use a destroyed IPC manager to emit a message.' };
            return { error };
        }
        return new Promise<Response<T>>(resolve => {
            const messageId = Utilities.guid();
            const responseHandler = (incoming: any): void => {
                // The channel is shared, so anything may arrive here: tolerate
                // messages that carry no metadata (or are not objects at all).
                const metadata = incoming?.metadata;
                if (metadata?.isResponse && metadata.id === messageId) {
                    this.target.removeListener('message', responseHandler);
                    resolve(incoming.args);
                }
            };
            this.target.addListener('message', responseHandler);
            const message = { name, args, metadata: { id: messageId, isResponse: false } };
            if (!(this.target.send && this.target.send(message))) {
                const error: ErrorLike = { name: 'FailedDispatch', message: "Either the target's send method was undefined or the act of sending failed." };
                resolve({ error });
                this.target.removeListener('message', responseHandler);
            }
        });
    };

    /**
     * Invoked from either the parent or the child process, this allows
     * any unresolved promises to continue in the target process, but dispatches a dummy
     * completion response for each of the pending messages, allowing their
     * promises in the caller to resolve.
     *
     * When called inside the target process the teardown runs directly;
     * otherwise the reserved destroy event is emitted and awaited.
     */
    public destroy = async (): Promise<void> => {
        if (this.callerIsTarget) {
            this.destroyHelper();
        } else {
            await this.emit(destroyEvent);
        }
    };

    /**
     * Dispatches the dummy responses and sets the isDestroyed flag to true.
     */
    private destroyHelper = () => {
        const { pendingMessages } = this;
        this.isDestroyed = true;
        Object.keys(pendingMessages).forEach(id => {
            const error: ErrorLike = { name: 'ManagerDestroyed', message: 'The IPC manager was destroyed before the response could be returned.' };
            const message: InternalMessage = { name: pendingMessages[id], args: { error }, metadata: { id, isResponse: true } };
            this.target.send?.(message);
        });
        this.pendingMessages = {};
    };

    /**
     * This routine receives a uniquely identified message. If the message is itself a response,
     * it is ignored to avoid infinite mutual responses. Otherwise, the routine awaits its completion using whatever
     * router the caller has installed, and then sends a response containing the original message id,
     * which will ultimately invoke the responseHandler of the original emission and resolve the
     * sender's promise.
     *
     * A failure in any handler is reported in the response's `error` field as a
     * plain ErrorLike so that its details are not lost in transit.
     */
    private generateInternalHandler =
        (handlers: HandlerMap): MessageHandler =>
        async (message: InternalMessage) => {
            // A null payload is a legal IPC message; there is nothing to route in that case.
            if (!message) {
                return;
            }
            const { name, args, metadata } = message;
            if (name && metadata && !metadata.isResponse) {
                const { id } = metadata;
                // Recorded before the handlers run so that a destroy occurring
                // meanwhile can still answer this message with a dummy response.
                this.pendingMessages[id] = name;
                let error: ErrorLike | undefined;
                let results: any[] | undefined;
                try {
                    const registered = handlers[name];
                    if (registered) {
                        results = await Promise.all(registered.map(handler => handler(args)));
                    }
                } catch (e: unknown) {
                    error = PromisifiedIPCManager.toErrorLike(e);
                }
                // Once destroyed, the dummy response has already been sent for this id.
                if (!this.isDestroyed && this.target.send) {
                    const metadataRes = { id, isResponse: true };
                    const response: Response = { results, error };
                    const messageRes = { name, args: response, metadata: metadataRes };
                    delete this.pendingMessages[id];
                    this.target.send(messageRes);
                }
            }
        };
}

/**
 * Factory shorthand for building a PromisifiedIPCManager.
 * @param target the process / worker to which to attach the specialized listeners
 * @param handlers optional map of message name to the handlers to run for it
 * @returns a manager bound to the given target
 */
export function manage(target: IPCTarget, handlers?: HandlerMap) {
    const manager = new PromisifiedIPCManager(target, handlers);
    return manager;
}