1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
|
/* eslint-disable @typescript-eslint/no-namespace */
import { action } from 'mobx';
import { Socket, io } from 'socket.io-client';
import { ClientUtils } from '../ClientUtils';
import { Utils, emptyFunction } from '../Utils';
import { Doc, FieldType, Opt, SetObjGetRefField, SetObjGetRefFields } from '../fields/Doc';
import { UpdatingFromServer } from '../fields/DocSymbols';
import { FieldLoader } from '../fields/FieldLoader';
import { HandleUpdate, Id, Parent } from '../fields/FieldSymbols';
import { ObjectField, serverOpType } from '../fields/ObjectField';
import { Message, MessageStore } from '../server/Message';
import { SerializationHelper } from './util/SerializationHelper';
/**
* This class encapsulates the transfer and cross-client synchronization of
* data stored only in documents (RefFields). In the process, it also
* creates and maintains a cache of documents so that they can be accessed
* more efficiently. Currently, there is no cache eviction scheme in place.
*
* NOTE: while this class is technically abstracted to work with any [RefField], because
* [Doc] instances are the only [RefField] we need / have implemented at the moment, the documentation
* will treat all data used here as [Doc]s
*
* Any time we want to write a new field to the database (via the server)
* or update ourselves based on the server's update message, that occurs here
*/
export namespace DocServer {
// Client-side document cache keyed by document id. An entry is either a loaded Doc
// or a promise standing in for a Doc that is still being fetched / deserialized.
let _cache: Record<string, Doc | Promise<Opt<Doc>>> = {};
/**
 * Exposes the live cache object (not a copy).
 * @returns the id -> Doc / pending-promise map currently in use
 */
export function Cache(): typeof _cache {
    const liveCache = _cache;
    return liveCache;
}
/**
 * Placeholder used for every swappable implementation slot until init() installs the real one.
 * @throws Error always, telling the caller that init has not been called yet
 */
function errorFunc(): never {
    const message = "Can't use DocServer without calling init first";
    throw new Error(message);
}
// Swappable implementations behind UpdateField() / CreateDocField(); both throw until init() replaces them.
let _UpdateField: (id: string, diff: serverOpType) => void = errorFunc;
let _CreateDocField: (field: Doc) => void = errorFunc;
/**
 * Registers a handler for an incoming server message on the given socket.
 * The handler is wrapped with Utils.loggingCallback (tagged 'Incoming' and the message's name).
 * @param socket the socket to listen on
 * @param message the message descriptor; its Message value is the socket event name
 * @param handler invoked with the message's arguments
 */
export function AddServerHandler<T>(socket: Socket, message: Message<T>, handler: (args: T) => void) {
    const loggedHandler = Utils.loggingCallback('Incoming', handler, message.Name);
    socket.on(message.Message, loggedHandler);
}
/**
 * Sends a fire-and-forget message to the server.
 * @param socket the socket to emit on
 * @param message the message descriptor; its Message value is the socket event name
 * @param args the payload to send
 */
export function Emit<T>(socket: Socket, message: Message<T>, args: T) {
    // log('Emit', message.Name, args, false);
    const { Message: eventName } = message;
    socket.emit(eventName, args);
}
/**
 * Sends a message to the server and receives its acknowledgement.
 * With a callback, the (logging-wrapped) callback receives the server's reply and nothing is returned.
 * Without one, a promise is returned that resolves with the server's reply.
 * @param socket the socket to emit on
 * @param message the message descriptor; its Message value is the socket event name
 * @param args the payload to send
 * @param fn optional callback for the server's reply
 */
export function EmitCallback<T>(socket: Socket, message: Message<T>, args: T): Promise<unknown>;
export function EmitCallback<T>(socket: Socket, message: Message<T>, args: T, fn: (args: unknown) => unknown): void;
export function EmitCallback<T>(socket: Socket, message: Message<T>, args: T, fn?: (args: unknown) => unknown): void | Promise<unknown> {
    // log('Emit', message.Name, args, false);
    const send = (ack: (args: unknown) => unknown) => {
        socket.emit(message.Message, args, Utils.loggingCallback('Receiving', ack, message.Name));
    };
    if (!fn) {
        // promise form: the acknowledgement resolves the promise
        return new Promise<unknown>(resolve => {
            send(resolve);
        });
    }
    send(fn);
    return undefined;
}
// the socket used for all communication with the server; assigned in init()
let _socket: Socket;
// this client's distinct GUID; assigned in init() from its `identifier` argument
let USER_ID: string;
// Write modes that can be assigned per field name (see setFieldWriteMode / getFieldWriteMode).
// The note after each member summarizes what that mode is meant to write and read.
export enum WriteMode {
Default = 0, // Anything goes
Playground = 1, // Playground (write own/no read other updates)
LiveReadonly = 2, // Live Readonly (no write/read others)
LivePlayground = 3, // Live Playground (write own/read others)
}
// write mode assigned to each field name via setFieldWriteMode(); read by getFieldWriteMode()
const fieldWriteModes: { [field: string]: WriteMode } = {};
// per field name, the Docs registered through registerDocWithCachedUpdate(); flushed by setFieldWriteMode()
const docsWithUpdates: { [field: string]: Set<Doc> } = {};
// field names added by setPlaygroundFields() / setLivePlaygroundFields(); queried by IsPlaygroundField()
export const PlaygroundFields: string[] = [];
/**
 * Adds the given field names to PlaygroundFields and puts each of them in LivePlayground write mode.
 * @param livePlaygroundFields the field names to register
 */
export function setLivePlaygroundFields(livePlaygroundFields: string[]) {
    DocServer.PlaygroundFields.push(...livePlaygroundFields);
    for (const fieldName of livePlaygroundFields) {
        DocServer.setFieldWriteMode(fieldName, DocServer.WriteMode.LivePlayground);
    }
}
/**
 * Adds the given field names to PlaygroundFields and puts each of them in Playground write mode.
 * @param playgroundFields the field names to register
 */
export function setPlaygroundFields(playgroundFields: string[]) {
    DocServer.PlaygroundFields.push(...playgroundFields);
    for (const fieldName of playgroundFields) {
        DocServer.setFieldWriteMode(fieldName, DocServer.WriteMode.Playground);
    }
}
/**
 * Tests whether a field name has been registered as a playground field.
 * A single leading underscore on the name is ignored for the lookup.
 * @param field the field name to test
 */
export function IsPlaygroundField(field: string) {
    const unprefixed = field.replace(/^_/, '');
    return DocServer.PlaygroundFields?.includes(unprefixed);
}
/**
 * Records the write mode for a field name. Unless the new mode is Playground, any Docs that were
 * registered with a cached update for that field have the cached update run, and the registration is cleared.
 * @param field the field name
 * @param writeMode the mode to assign
 */
export function setFieldWriteMode(field: string, writeMode: WriteMode) {
    fieldWriteModes[field] = writeMode;
    if (writeMode === WriteMode.Playground) {
        return; // cached updates stay pending while in Playground mode
    }
    const pendingDocs = docsWithUpdates[field];
    if (!pendingDocs) {
        return;
    }
    for (const pendingDoc of pendingDocs) {
        Doc.RunCachedUpdate(pendingDoc, field);
    }
    delete docsWithUpdates[field];
}
/**
 * Looks up the write mode for a field name.
 * The 'guest' user always gets LivePlayground; otherwise the assigned mode, falling back to Default.
 * @param field the field name
 */
export function getFieldWriteMode(field: string) {
    if (ClientUtils.CurrentUserEmail() === 'guest') {
        return WriteMode.LivePlayground;
    }
    return fieldWriteModes[field] || WriteMode.Default;
}
/**
 * Registers a Doc as having a cached update for a field. Only the first registration of a given
 * Doc/field pair has any effect: it hands oldValue to Doc.AddCachedUpdate and remembers the Doc.
 * @param doc the Doc being registered
 * @param field the field name the cached update belongs to
 * @param oldValue the value passed on to Doc.AddCachedUpdate
 */
export function registerDocWithCachedUpdate(doc: Doc, field: string, oldValue: FieldType) {
    const registered = docsWithUpdates[field] ?? (docsWithUpdates[field] = new Set<Doc>());
    if (registered.has(doc)) {
        return;
    }
    Doc.AddCachedUpdate(doc, field, oldValue);
    registered.add(doc);
}
const instructions = 'This page will automatically refresh after this alert is closed. Expect to reconnect after about 30 seconds.';
/**
 * Handles a ConnectionTerminated message: shows an alert for the known reasons
 * ('crash', 'temporary', 'exit'), logs any other reason, and then reloads the page in every case.
 * @param connectionTerminationReason the reason string sent by the server
 */
function alertUser(connectionTerminationReason: string) {
    const alertsByReason = new Map<string, string>([
        ['crash', `Dash has temporarily crashed. Administrators have been notified and the server is restarting itself. ${instructions}`],
        ['temporary', `An administrator has chosen to restart the server. ${instructions}`],
        ['exit', 'An administrator has chosen to kill the server. Do not expect to reconnect until administrators start the server.'],
    ]);
    const alertText = alertsByReason.get(connectionTerminationReason);
    if (alertText !== undefined) {
        alert(alertText);
    } else {
        console.log(`Received an unknown ConnectionTerminated message: ${connectionTerminationReason}`);
    }
    window.location.reload();
}
export namespace Control {
    // true once makeReadOnly() has switched this client into read-only mode
    let _isReadOnly = false;
    /**
     * @returns whether makeReadOnly() has been applied
     */
    export function isReadOnly() {
        return _isReadOnly;
    }
    /**
     * Switches the client into read-only mode (a no-op if already read-only):
     * newly created Docs are only placed in the local cache, and field updates are dropped.
     */
    export function makeReadOnly() {
        if (_isReadOnly) {
            return;
        }
        _isReadOnly = true;
        _CreateDocField = (field: Doc) => {
            _cache[field[Id]] = field;
        };
        _UpdateField = emptyFunction;
        // _RespondToUpdate = emptyFunction; // bcz: option: don't clear RespondToUpdate to continue to receive updates as others change the DB
    }
    /**
     * Leaves read-only mode by reloading the page (does nothing when not read-only).
     */
    export function makeEditable() {
        if (!_isReadOnly) {
            return;
        }
        location.reload();
        // _isReadOnly = false;
        // _CreateField = _CreateFieldImpl;
        // _UpdateField = _UpdateFieldImpl;
        // _respondToUpdate = _respondToUpdateImpl;
        // _cache = {};
    }
}
/**
 * Replies to the server's handshake by emitting the Bar message
 * carrying this client's unique GUID (USER_ID).
 */
function onConnection() {
    const replyEvent = MessageStore.Bar.Message;
    _socket.emit(replyEvent, USER_ID);
}
export namespace Util {
    /**
     * Emits the DeleteAll message (with an empty payload) to the server,
     * asking it to wipe all documents in the database.
     */
    export function deleteDatabase(): void {
        const emptyPayload = {};
        DocServer.Emit(_socket, MessageStore.DeleteAll, emptyPayload);
    }
}
// RETRIEVE DOCS FROM SERVER
/**
* Given a single Doc GUID, this utility function will asynchronously attempt to fetch the id's associated
* field, first looking in the RefField cache and then communicating with
* the server if the document has not been cached (or if `force` is set).
* @param id the id of the requested document
* @param force when true, the document is requested from the server even if it is already cached. If a Doc
* is already cached, the fetched values are copied onto that existing Doc once they arrive.
* @returns a promise for the document. Without `force`, it resolves to the cached or newly deserialized Doc.
* With `force`, it is the cache entry as it was at call time (a pending promise is returned as-is, anything else is
* wrapped in a resolved promise); the refresh itself completes asynchronously.
*/
const _GetRefFieldImpl = (id: string, force: boolean = false): Promise<Opt<Doc>> => {
// an initial pass through the cache to determine whether the document needs to be fetched,
// is already in the process of being fetched or already exists in the
// cache
const cached = _cache[id];
if (cached === undefined || force) {
// NOT CACHED (or a forced refresh) => we'll have to send a request to the server.
// Synchronously, we emit a single callback to the server requesting the serialized
// field for the given id. This returns a promise which, when resolved, indicates that the serialized version of
// the field has been returned from the server
const getSerializedField = DocServer.EmitCallback(_socket, MessageStore.GetRefField, id);
// when the serialized RefField has been received, go ahead and begin deserializing it into an object.
const deserializeField = getSerializedField.then(async fieldJson => {
// deserialize
const field = (await SerializationHelper.Deserialize(fieldJson)) as Doc;
if (force && field && cached instanceof Doc) {
// forced refresh of a Doc we already hold: copy every key of the freshly deserialized Doc onto the
// existing instance, with the UpdatingFromServer flag set on it for the duration of the copy
cached[UpdatingFromServer] = true;
Array.from(Object.keys(field)).forEach(key => {
const fieldval = field[key];
if (fieldval instanceof ObjectField) {
// clear the value's Parent before it is assigned to the cached Doc
// (presumably so it can be re-parented to the cached Doc — confirm)
fieldval[Parent] = undefined;
}
cached[key] = field[key];
});
cached[UpdatingFromServer] = false;
return cached;
}
// either way, overwrite or delete any promise cached at this id (that we inserted as a flag
// to indicate that the field was in the process of being fetched). Now the cache entry
// is either an actual value or entirely absent.
if (field !== undefined) {
_cache[id] = field;
} else {
delete _cache[id];
}
return field;
});
// here, indicate that the document associated with this id is currently
// being retrieved and cached (a forced refresh leaves the existing cache entry in place)
!force && (_cache[id] = deserializeField);
// NOTE(review): with `force` set and nothing cached, this resolves to undefined even though a fetch
// is in flight and will populate the cache later — confirm this is intended
return force ? (cached instanceof Promise ? cached : new Promise<Doc>(res => res(cached))) : deserializeField;
}
if (cached instanceof Promise) {
// BEING RETRIEVED AND CACHED => some other caller previously (likely recently) called GetRefField(s),
// and requested the document I'm looking for. Shouldn't fetch again, just
// return this promise which will resolve to the field itself
return cached;
}
// CACHED => great, let's just return the cached field we have
return Promise.resolve(cached).then(
field => field
// (field instanceof Doc) && fetchProto(field);
);
};
/**
 * Synchronous cache lookup that never contacts the server.
 * @param id the id of the requested document
 * @returns the cached value, or undefined when the id is absent or its entry is still a pending promise
 */
const _GetCachedRefFieldImpl = (id: string): Opt<Doc> => {
    const entry = _cache[id];
    return (entry === undefined || entry instanceof Promise) ? undefined : entry;
};
// Swappable implementations behind GetRefField() / GetCachedRefField(); both throw until init() installs them.
let _GetRefField: (id: string, force: boolean) => Promise<Opt<Doc>> = errorFunc;
let _GetCachedRefField: (id: string) => Opt<Doc> = errorFunc;
/**
 * Fetches a document by id through the currently installed implementation.
 * @param id the id of the requested document
 * @param force forwarded to the implementation; defaults to false
 * @returns a promise for the document, or for undefined
 */
export function GetRefField(id: string, force = false): Promise<Opt<Doc>> {
    const pending = _GetRefField(id, force);
    return pending;
}
/**
 * Synchronously looks a document up through the currently installed implementation.
 * @param id the id of the requested document
 * @returns the document, or undefined
 */
export function GetCachedRefField(id: string): Opt<Doc> {
    const hit = _GetCachedRefField(id);
    return hit;
}
/**
* Given a list of Doc GUIDs, this utility function will asynchronously attempt to fetch each id's associated
* field, first looking in the RefField cache and then communicating with
* the server (in one batched request) for the documents that have not been cached.
* @param ids the ids that map to the requested documents (falsy ids are never requested from the server)
* @returns a Map from every entry of `ids` to its Doc, or to undefined when the cache holds no Doc
* for that id once this batch has finished
*/
const _GetRefFieldsImpl = async (ids: string[]): Promise<Map<string, Opt<Doc>>> => {
const uncachedRequestedIds: string[] = [];
const deserializeDocPromises: Promise<Opt<Doc>>[] = [];
// setup a Promise that we will resolve after all cached Docs have been acquired.
let allCachesFilledResolver!: (value: Opt<Doc> | PromiseLike<Opt<Doc>>) => void;
const allCachesFilledPromise = new Promise<Opt<Doc>>(res => {
allCachesFilledResolver = res;
});
const fetchDocPromises: Map<string, Promise<Opt<Doc>>> = new Map(); // placeholder promises, by id, for requested Docs whose data has not yet come back from the server
// Determine which requested documents need to be fetched
for (const id of ids.filter(filterid => filterid)) {
if (_cache[id] === undefined) {
// EMPTY CACHE - make promise that we resolve after all batch-requested Docs have been fetched and deserialized and we know we have this Doc
// eslint-disable-next-line no-loop-func
_cache[id] = new Promise<Opt<Doc>>(res =>
allCachesFilledPromise.then(() => {
// if all Docs have been cached, then we can be sure the fetched Doc has been found and cached. So return it to anyone who had been awaiting it.
const cache = _cache[id];
if (!(cache instanceof Doc)) console.log('CACHE WAS NEVER FILLED!!');
res(cache instanceof Doc ? cache : undefined);
})
);
fetchDocPromises.set(id, _cache[id]);
uncachedRequestedIds.push(id); // add to list of Doc requests from server
}
// else CACHED => do nothing, Doc or promise of Doc is already in cache
}
if (uncachedRequestedIds.length) {
console.log('Requesting ' + uncachedRequestedIds.length);
// publish the number of requested Docs to the load-status object from a timeout, inside a mobx action
setTimeout(action(() => { FieldLoader.ServerLoadStatus.requested = uncachedRequestedIds.length; })); // prettier-ignore
// Synchronously emit a single server request for the serialized Doc ids
// This returns a promise that resolves when all the serialized Docs have been retrieved
const serializedFields = (await DocServer.EmitCallback(_socket, MessageStore.GetRefFields, uncachedRequestedIds)) as { id: string; fields: unknown[]; __type: string }[];
let processed = 0;
console.log('Retrieved ' + serializedFields.length + ' fields');
// After the serialized Docs have been received, deserialize them into objects.
for (const field of serializedFields) {
// every 150th field, pause on a timeout that also records the running count in the load status
++processed % 150 === 0 &&
// eslint-disable-next-line no-await-in-loop
(await new Promise<number>(
// eslint-disable-next-line no-loop-func
res =>
setTimeout(action(() => res(FieldLoader.ServerLoadStatus.retrieved = processed))) // prettier-ignore
)); // force loading to yield to splash screen rendering to update progress
if (fetchDocPromises.has(field.id)) {
// Doc hasn't started deserializing yet - the cache still has the fetch promise
// eslint-disable-next-line no-loop-func
const deserializePromise = SerializationHelper.Deserialize(field).then((deserialized: unknown) => {
const doc = deserialized as Doc;
// overwrite any fetch or deserialize cache promise with the deserialized value.
// fetch promises wait to resolve until after all deserializations; deserialize promises resolve upon deserialization
if (deserialized !== undefined) _cache[field.id] = doc;
else delete _cache[field.id];
return doc;
});
deserializeDocPromises.push((_cache[field.id] = deserializePromise)); // replace the cache's placeholder fetch promise with the deserializePromise
fetchDocPromises.delete(field.id);
} else if (_cache[field.id] instanceof Promise) {
// the server returned a field that has no outstanding placeholder in this batch while the cache still
// holds a promise for it; nothing is done beyond logging a marker
console.log('.');
}
}
}
await Promise.all(deserializeDocPromises); // promise resolves when cache is up-to-date with all requested Docs
// any placeholder still left was requested but never returned by the server: drop it from the cache
Array.from(fetchDocPromises).forEach(([id]) => delete _cache[id]);
allCachesFilledResolver(undefined); // notify anyone who was promised a Doc from when it was just being fetched (since all requested Docs have now been fetched and deserialized)
console.log('Deserialized ' + (uncachedRequestedIds.length - fetchDocPromises.size) + ' fields');
return new Map<string, Opt<Doc>>(ids.map(id => [id, _cache[id] instanceof Doc ? (_cache[id] as Doc) : undefined]) as [string, Opt<Doc>][]);
};
// Swappable implementation behind GetRefFields(); throws until init() installs the real one.
let _GetRefFields: (ids: string[]) => Promise<Map<string, Opt<Doc>>> = errorFunc;
/**
 * Fetches a batch of documents by id through the currently installed implementation.
 * @param ids the ids of the requested documents
 * @returns a promise for a Map from id to document (or undefined)
 */
export function GetRefFields(ids: string[]) {
    const pendingBatch = _GetRefFields(ids);
    return pendingBatch;
}
// WRITE A NEW DOCUMENT TO THE SERVER
// becomes true the first time CreateDocField() is called
let _cacheNeedsUpdate = false;
/**
 * @returns whether CreateDocField() has been called since this flag was initialized
 */
export function CacheNeedsUpdate(): boolean {
    return _cacheNeedsUpdate;
}
/**
 * A wrapper around the namespace-local variable _CreateDocField.
 * This allows different implementations to be swapped in (for example by
 * Control.makeReadOnly()) while the rest of the code base keeps calling the same function.
 * Also marks the cache as needing an update.
 * @param field the [Doc] handed to the currently installed implementation
 */
export function CreateDocField(field: Doc): void {
    // raise the flag before delegating, exactly as the delegate may throw before init()
    _cacheNeedsUpdate = true;
    _CreateDocField(field);
}
/**
 * Default CreateDocField implementation: caches the Doc under its id, serializes it,
 * and (unless the current user is 'guest') emits CreateDocField with the serialized state.
 * @param field the Doc to cache and send
 */
function _CreateDocFieldImpl(field: Doc) {
    _cache[field[Id]] = field;
    const serialized = SerializationHelper.Serialize(field);
    if (ClientUtils.CurrentUserEmail() !== 'guest') {
        DocServer.Emit(_socket, MessageStore.CreateDocField, serialized);
    }
}
// NOTIFY THE SERVER OF AN UPDATE TO A DOC'S STATE
/**
 * A wrapper around the namespace-local variable _UpdateField.
 * This allows different implementations to be swapped in (for example by
 * Control.makeReadOnly()) while the rest of the code base keeps calling the same function.
 * @param id the id of the [Doc] whose state has been updated in our client
 * @param updatedState the change to forward to the currently installed implementation
 */
export function UpdateField(id: string, updatedState: serverOpType): void {
    const forward = _UpdateField;
    forward(id, updatedState);
}
/**
 * Default UpdateField implementation: emits UpdateField with `{ id, diff }`,
 * unless the client is read-only or the current user is 'guest'.
 * @param id the id of the updated Doc
 * @param diff the change to send
 */
function _UpdateFieldImpl(id: string, diff: serverOpType) {
    if (DocServer.Control.isReadOnly()) {
        return;
    }
    if (ClientUtils.CurrentUserEmail() === 'guest') {
        return;
    }
    DocServer.Emit(_socket, MessageStore.UpdateField, { id, diff });
}
/**
 * Applies an update message from the server to the matching cached Doc.
 * If the cache entry is still a pending promise, the update is applied once it resolves.
 * Nothing happens when the change has no id, the Doc is not available, or the Doc has no update handler.
 * @param change the id of the Doc and the diff to apply
 */
function _respondToUpdateImpl(change: { id: string; diff: serverOpType }) {
    const docId = change.id;
    // a change without a document id cannot be routed anywhere
    if (docId === undefined) {
        return;
    }
    const applyDiff = (target: Opt<Doc>) => {
        // absent from the cache, or its promise resolved to undefined: nothing to update
        if (target === undefined) {
            return;
        }
        // hand the diff to this Doc's own update handler, if it has one
        const handler = target[HandleUpdate];
        if (handler) {
            handler.call(target, change.diff as { $set: { [key: string]: FieldType } } | { $unset: unknown });
        }
    };
    const entry = _cache[docId];
    if (entry instanceof Promise) {
        // still being retrieved: apply the update when the promise resolves
        entry.then(applyDiff);
        return;
    }
    applyDiff(entry);
}
/**
 * Emits DeleteField for the given id, unless the current user is 'guest'.
 * @param id the id of the document to delete
 */
export function DeleteDocument(id: string) {
    if (ClientUtils.CurrentUserEmail() !== 'guest') {
        DocServer.Emit(_socket, MessageStore.DeleteField, id);
    }
}
/**
 * Emits DeleteFields for the given ids, unless the current user is 'guest'.
 * @param ids the ids of the documents to delete
 */
export function DeleteDocuments(ids: string[]) {
    if (ClientUtils.CurrentUserEmail() !== 'guest') {
        DocServer.Emit(_socket, MessageStore.DeleteFields, ids);
    }
}
/**
 * Removes documents from the local cache in response to a delete message.
 * @param ids a single id or an array of ids; any other value is ignored
 */
function _respondToDeleteImpl(ids: string | string[]) {
    // normalize to a list so one loop handles both message shapes
    const idList: string[] = typeof ids === 'string' ? [ids] : Array.isArray(ids) ? ids : [];
    // plain iteration: these deletions are side effects, so no mapped array is built and discarded
    for (const id of idList) {
        delete _cache[id];
    }
}
// the handlers currently used for incoming update / delete messages
const _RespondToUpdate = _respondToUpdateImpl;
const _respondToDelete = _respondToDeleteImpl;
/**
 * Socket handler for UpdateField messages; forwards to the installed update responder.
 * @param change the id of the Doc and the diff to apply
 */
function respondToUpdate(change: { id: string; diff: serverOpType }): void {
    _RespondToUpdate(change);
}
/**
 * Socket handler for DeleteField / DeleteFields messages; forwards to the installed delete responder.
 * @param ids a single id or an array of ids
 */
function respondToDelete(ids: string | string[]): void {
    _respondToDelete(ids);
}
/**
 * Initializes the DocServer: resets the cache, records this client's id, opens the websocket,
 * installs the real implementations of the swappable functions, and registers the server message handlers.
 * @param protocol the page protocol; anything starting with 'https' selects 'wss', otherwise 'ws'
 * @param hostname the server host
 * @param port the server port
 * @param identifier this client's unique id, sent back to the server during the handshake
 */
export function init(protocol: string, hostname: string, port: number, identifier: string) {
    _cache = {};
    USER_ID = identifier;
    const scheme = protocol.startsWith('https') ? 'wss' : 'ws';
    const serverUrl = `${scheme}://${hostname}:${port}`;
    _socket = io(serverUrl, { transports: ['websocket'], rejectUnauthorized: false });
    _socket.on('connect_error', (err: unknown) => console.log(err));
    // io.connect(`https://7f079dda.ngrok.io`);// if using ngrok, create a special address for the websocket

    // install the real implementations in place of errorFunc
    _GetCachedRefField = _GetCachedRefFieldImpl;
    _GetRefField = _GetRefFieldImpl;
    SetObjGetRefField(_GetRefFieldImpl);
    _GetRefFields = _GetRefFieldsImpl;
    SetObjGetRefFields(_GetRefFieldsImpl);
    _CreateDocField = _CreateDocFieldImpl;
    _UpdateField = _UpdateFieldImpl;

    /**
     * Whenever the server sends us its handshake message (Foo) on our
     * websocket, onConnection returns the handshake.
     */
    DocServer.AddServerHandler(_socket, MessageStore.Foo, onConnection);
    DocServer.AddServerHandler(_socket, MessageStore.UpdateField, respondToUpdate);
    DocServer.AddServerHandler(_socket, MessageStore.DeleteField, respondToDelete);
    DocServer.AddServerHandler(_socket, MessageStore.DeleteFields, respondToDelete);
    DocServer.AddServerHandler(_socket, MessageStore.ConnectionTerminated, alertUser);
}
}
|