import { blue } from 'colors'; import { createServer } from 'https'; import * as _ from 'lodash'; import { networkInterfaces } from 'os'; import { Server, Socket } from 'socket.io'; import { SecureContextOptions } from 'tls'; import { ServerUtils } from '../ServerUtils'; import { serializedDoctype, serializedFieldsType } from '../fields/ObjectField'; import { logPort } from './ActionUtilities'; import { Client } from './Client'; import { DashStats } from './DashStats'; import { DocumentsCollection } from './IDatabase'; import { Diff, GestureContent, MessageStore } from './Message'; import { resolvedPorts, socketMap, timeMap, userOperations } from './SocketData'; import { initializeGuest } from './authentication/DashUserModel'; import { Database } from './database'; export namespace WebSocket { let CurUser: string | undefined; export let _socket: Socket; export let _disconnect: () => void; export const clients: { [key: string]: Client } = {}; function processGesturePoints(socket: Socket, content: GestureContent) { socket.broadcast.emit('receiveGesturePoints', content); } export async function doDelete(onlyFields = true) { const target: string[] = []; onlyFields && target.push(DocumentsCollection); await Database.Instance.dropSchema(...target); initializeGuest(); } function printActiveUsers() { socketMap.forEach((user, socket) => !socket.disconnected && console.log(user)); } function barReceived(socket: Socket, userEmail: string) { clients[userEmail] = new Client(userEmail.toString()); const currentdate = new Date(); const datetime = currentdate.getDate() + '/' + (currentdate.getMonth() + 1) + '/' + currentdate.getFullYear() + ' @ ' + currentdate.getHours() + ':' + currentdate.getMinutes() + ':' + currentdate.getSeconds(); console.log(blue(`user ${userEmail} has connected to the web socket at: ${datetime}`)); printActiveUsers(); timeMap[userEmail] = Date.now(); socketMap.set(socket, userEmail + ' at ' + datetime); userOperations.set(userEmail, 0); DashStats.logUserLogin(userEmail); } function GetRefFieldLocal(id: string, callback: (result?: serializedDoctype | undefined) => void) { return Database.Instance.getDocument(id, callback); } function GetRefField([id, callback]: [string, (result?: serializedDoctype) => void]) { process.stdout.write(`+`); GetRefFieldLocal(id, callback); } function GetRefFields([ids, callback]: [string[], (result?: serializedDoctype[]) => void]) { process.stdout.write(`${ids.length}…`); Database.Instance.getDocuments(ids, callback); } const pendingOps = new Map(); function dispatchNextOp(id: string): unknown { const next = pendingOps.get(id)?.shift(); // eslint-disable-next-line @typescript-eslint/no-unused-vars const nextOp = (res: boolean) => dispatchNextOp(id); if (next) { const { diff, socket } = next; // ideally, we'd call the Database update method for all actions, but for now we handle list insertion/removal on our own switch (diff.diff.$addToSet ? 'add' : diff.diff.$remFromSet ? 'rem' : 'set') { case 'add': return GetRefFieldLocal(id, (result) => addToListField(socket, diff, result, nextOp)); // prettier-ignore case 'rem': return GetRefFieldLocal(id, (result) => remFromListField(socket, diff, result, nextOp)); // prettier-ignore default: return Database.Instance.update(id, diff.diff, () => nextOp(socket.broadcast.emit(MessageStore.UpdateField.Message, diff)), false ); // prettier-ignore } } return !pendingOps.get(id)?.length && pendingOps.delete(id); } function addToListField(socket: Socket, diff: Diff, listDoc: serializedDoctype | undefined, cb: (res: boolean) => void): void { const $addToSet = diff.diff.$addToSet as serializedFieldsType; const updatefield = Array.from(Object.keys($addToSet ?? {}))[0]; const newListItems = $addToSet?.[updatefield]?.fields; if (newListItems) { const length = diff.diff.$addToSet?.length; diff.diff.$set = $addToSet; // convert add to set to a query of the current fields, and then a set of the composition of the new fields with the old ones delete diff.diff.$addToSet; // can't pass $set to Mongo, or it will do that insetead of $addToSet const listItems = listDoc?.fields?.[updatefield.replace('fields.', '')]?.fields.filter(item => item) ?? []; diff.diff.$set[updatefield]!.fields = [...listItems, ...newListItems]; // , ...newListItems.filter((newItem: any) => newItem === null || !curList.some((curItem: any) => curItem.fieldId ? curItem.fieldId === newItem.fieldId : curItem.heading ? curItem.heading === newItem.heading : curItem === newItem))]; // if the client's list length is not the same as what we're writing to the server, // then we need to send the server's version back to the client so that they are in synch. // this could happen if another client made a change before the server receives the update from the first client const target = length !== diff.diff.$set[updatefield].fields.length ? socket : socket.broadcast; target === socket && console.log('Warning: SEND BACK: list modified during add update. Composite list is being returned.'); Database.Instance.update(diff.id, diff.diff, () => cb(target.emit(MessageStore.UpdateField.Message, diff)), false); } else cb(false); } /** * findClosestIndex() is a helper function that will try to find * the closest index of a list that has the same value as * a specified argument/index pair. * @param list the list to search through * @param indexesToDelete a list of indexes that are already marked for deletion * so they will be ignored * @param value the value of the item to remove * @param hintIndex the index that the element was at on the client's copy of * the data * @returns the closest index with the same value or -1 if the element was not found. */ function findClosestIndex(list: { fieldId: string; __type: string }[], indexesToDelete: number[], value: { fieldId: string; __type: string }, hintIndex: number) { let closestIndex = -1; for (let i = 0; i < list.length; i++) { if (list[i] === value && !indexesToDelete.includes(i)) { if (Math.abs(i - hintIndex) < Math.abs(closestIndex - hintIndex)) { closestIndex = i; } } } return closestIndex; } /** * remFromListField() receives the items to remove and a hint * from the client, and attempts to make the modification to the * server's copy of the data. If server's copy does not match * the client's after removal, the server will SEND BACk * its version to the client. * @param socket the socket that the client is connected on * @param diff an object containing the items to remove and a hint * (the hint contains start index and deleteCount, the number of * items to delete) * @param curListItems the server's current copy of the data */ function remFromListField(socket: Socket, diff: Diff, curListItems: serializedDoctype | undefined, cb: (res: boolean) => void): void { const $remFromSet = diff.diff.$remFromSet as serializedFieldsType; const updatefield = Array.from(Object.keys($remFromSet ?? {}))[0]; const remListItems = $remFromSet?.[updatefield]?.fields; if (remListItems) { const hint = diff.diff.$remFromSet?.hint; const length = diff.diff.$remFromSet?.length; diff.diff.$set = $remFromSet; // convert rem from set to a query of the current fields, and then a set of the old fields minus the removed ones delete diff.diff.$remFromSet; // can't pass $set to Mongo, or it will do that insetead of $remFromSet const curList = curListItems?.fields?.[updatefield.replace('fields.', '')]?.fields.filter(f => f) ?? []; if (hint) { // indexesToRemove stores the indexes that we mark for deletion, which is later used to filter the list (delete the elements) const indexesToRemove: number[] = []; for (let i = 0; i < hint.deleteCount; i++) { if (curList.length > i + hint.start && _.isEqual(curList[i + hint.start], remListItems[i])) { indexesToRemove.push(i + hint.start); } else { const closestIndex = findClosestIndex(curList, indexesToRemove, remListItems[i], i + hint.start); if (closestIndex !== -1) { indexesToRemove.push(closestIndex); } else { console.log('Item to delete was not found'); } } } diff.diff.$set[updatefield]!.fields = curList.filter((curItem, index) => !indexesToRemove.includes(index)); } else { // if we didn't get a hint, remove all matching items from the list diff.diff.$set[updatefield]!.fields = curList?.filter(curItem => !remListItems.some(remItem => (remItem.fieldId ? remItem.fieldId === curItem.fieldId : remItem.heading ? remItem.heading === curItem.heading : remItem === curItem))); } // if the client's list length is not the same as what we're writing to the server, // then we need to send the server's version back to the client so that they are in synch. // this could happen if another client made a change before the server receives the update from the first client const target = length !== diff.diff.$set[updatefield].fields.length ? socket : socket.broadcast; target === socket && console.log('Warning: SEND BACK: list modified during remove update. Composite list is being returned.'); Database.Instance.update(diff.id, diff.diff, () => cb(target.emit(MessageStore.UpdateField.Message, diff)), false); } else cb(false); } function UpdateField(socket: Socket, diff: Diff) { const curUser = socketMap.get(socket); if (curUser) { const currentUsername = curUser.split(' ')[0]; userOperations.set(currentUsername, userOperations.get(currentUsername) !== undefined ? userOperations.get(currentUsername)! + 1 : 0); if (CurUser !== socketMap.get(socket)) { CurUser = socketMap.get(socket); console.log('Switch User: ' + CurUser); } if (pendingOps.has(diff.id)) { pendingOps.get(diff.id)!.push({ diff, socket }); return true; } pendingOps.set(diff.id, [{ diff, socket }]); return dispatchNextOp(diff.id); } return false; } function DeleteField(socket: Socket, id: string) { Database.Instance.delete({ _id: id }).then(() => socket.broadcast.emit(MessageStore.DeleteField.Message, id)); } function DeleteFields(socket: Socket, ids: string[]) { Database.Instance.delete({ _id: { $in: ids } }).then(() => socket.broadcast.emit(MessageStore.DeleteFields.Message, ids)); } function CreateDocField(newValue: serializedDoctype) { Database.Instance.insert(newValue); } export async function initialize(isRelease: boolean, credentials: SecureContextOptions) { let io: Server; if (isRelease) { const { socketPort } = process.env; if (socketPort) { resolvedPorts.socket = Number(socketPort); } const httpsServer = createServer(credentials); io = new Server(httpsServer, {}); httpsServer.listen(resolvedPorts.socket); } else { io = new Server(); io.listen(resolvedPorts.socket); } logPort('websocket', resolvedPorts.socket); io.on('connection', socket => { _socket = socket; socket.use((_packet, next) => { const userEmail = socketMap.get(socket); if (userEmail) { timeMap[userEmail] = Date.now(); } next(); }); socket.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle()); socket.on('message', (message, room) => { console.log('Client said: ', message); socket.in(room).emit('message', message); }); socket.on('ipaddr', () => networkInterfaces().keys?.forEach(dev => { if (dev.family === 'IPv4' && dev.address !== '127.0.0.1') { socket.emit('ipaddr', dev.address); } }) ); socket.on('bye', () => console.log('received bye')); socket.on('disconnect', () => { const currentUser = socketMap.get(socket); if (currentUser !== undefined) { const currentUsername = currentUser.split(' ')[0]; DashStats.logUserLogout(currentUsername); delete timeMap[currentUsername]; } }); ServerUtils.Emit(socket, MessageStore.Foo, 'handshooken'); ServerUtils.AddServerHandler(socket, MessageStore.Bar, guid => barReceived(socket, guid)); if (isRelease) { ServerUtils.AddServerHandler(socket, MessageStore.DeleteAll, () => doDelete(false)); } ServerUtils.AddServerHandler(socket, MessageStore.CreateDocField, CreateDocField); ServerUtils.AddServerHandler(socket, MessageStore.UpdateField, diff => UpdateField(socket, diff)); ServerUtils.AddServerHandler(socket, MessageStore.DeleteField, id => DeleteField(socket, id)); ServerUtils.AddServerHandler(socket, MessageStore.DeleteFields, ids => DeleteFields(socket, ids)); ServerUtils.AddServerHandler(socket, MessageStore.GesturePoints, content => processGesturePoints(socket, content)); ServerUtils.AddServerHandlerCallback(socket, MessageStore.GetRefField, GetRefField); ServerUtils.AddServerHandlerCallback(socket, MessageStore.GetRefFields, GetRefFields); /** * Whenever we receive the go-ahead, invoke the import script and pass in * as an emitter and a terminator the functions that simply broadcast a result * or indicate termination to the client via the web socket */ _disconnect = () => { socket.broadcast.emit('connection_terminated', Date.now()); socket.disconnect(true); }; }); setInterval(() => { // Utils.Emit(socket, MessageStore.UpdateStats, DashStats.getUpdatedStatsBundle()); io.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle()); }, DashStats.SAMPLING_INTERVAL); } }