import { blue } from 'colors';
import * as express from 'express';
import { createServer } from 'https';
import { Server, Socket } from '../../node_modules/socket.io/dist/index';
import { networkInterfaces } from 'os';
import { Utils } from '../Utils';
import { logPort } from './ActionUtilities';
import { timeMap } from './ApiManagers/UserManager';
import { GoogleCredentialsLoader, SSL } from './apis/google/CredentialsLoader';
import YoutubeApi from './apis/youtube/youtubeApiSample';
import { initializeGuest } from './authentication/DashUserModel';
import { Client } from './Client';
import { DashStats } from './DashStats';
import { Database } from './database';
import { DocumentsCollection } from './IDatabase';
import { Diff, GestureContent, MessageStore, MobileDocumentUploadContent, MobileInkOverlayContent, Transferable, Types, UpdateMobileInkOverlayPositionContent, YoutubeQueryInput, YoutubeQueryTypes } from './Message';
import { Search } from './Search';
import { resolvedPorts } from './server_Initialization';
import * as _ from 'lodash';

/**
 * Web socket layer of the server: accepts socket.io connections, registers the
 * per-socket message handlers, and forwards document reads/writes to the
 * database and the search index.
 */
export namespace WebSocket {
    /** The most recently connected socket (overwritten on every new connection). */
    export let _socket: Socket;
    /** Clients keyed by the identifier received with the `Bar` message. */
    export const clients: { [key: string]: Client } = {};
    /** Maps a connected socket to the string `"<userEmail> at <datetime>"` built in `barReceived`. */
    export const socketMap = new Map();
    /** Number of `UpdateField` messages received, keyed by user name. */
    export const userOperations = new Map();
    /** Disconnects the most recently connected socket after notifying all the others. */
    export let disconnect: Function;

    /**
     * Creates the socket.io server, starts listening on `resolvedPorts.socket`,
     * and registers every message handler on each incoming connection.
     * @param isRelease when true the server is created over an HTTPS server built
     * from `SSL.Credentials`, and the port may be overridden by `process.env.socketPort`
     * @param app the express application handed to the HTTPS server in release mode
     */
    export async function initialize(isRelease: boolean, app: express.Express) {
        let io: Server;
        if (isRelease) {
            const { socketPort } = process.env;
            if (socketPort) {
                resolvedPorts.socket = Number(socketPort);
            }
            io = new Server(createServer(SSL.Credentials, app), SSL.Credentials as any);
            io.listen(resolvedPorts.socket);
        } else {
            io = new Server();
            io.listen(resolvedPorts.socket);
        }
        logPort('websocket', resolvedPorts.socket);

        io.on('connection', socket => {
            _socket = socket;

            // refresh the user's last-activity timestamp on every incoming packet
            socket.use((_packet, next) => {
                const userEmail = socketMap.get(socket);
                if (userEmail) {
                    timeMap[userEmail] = Date.now();
                }
                next();
            });

            socket.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle());

            // convenience function to log server messages on the client
            function log(message?: any, ...optionalParams: any[]) {
                socket.emit('log', ['Message from server:', message, ...optionalParams]);
            }

            socket.on('message', function (message, room) {
                console.log('Client said: ', message);
                socket.in(room).emit('message', message);
            });

            socket.on('create or join', function (room) {
                console.log('Received request to create or join room ' + room);

                // Count the sockets currently in the room. `room` is only the room's name, so
                // occupancy has to be read from the adapter (a Map of room name -> Set of socket
                // ids); `socket.rooms` would only say whether THIS socket is already a member.
                const numClients = io.sockets.adapter.rooms.get(room)?.size ?? 0;
                console.log('Room ' + room + ' now has ' + numClients + ' client(s)');

                if (numClients === 0) {
                    socket.join(room);
                    console.log('Client ID ' + socket.id + ' created room ' + room);
                    socket.emit('created', room, socket.id);
                } else if (numClients === 1) {
                    console.log('Client ID ' + socket.id + ' joined room ' + room);
                    socket.in(room).emit('join', room);
                    socket.join(room);
                    socket.emit('joined', room, socket.id);
                    socket.in(room).emit('ready');
                } else {
                    // max two clients
                    socket.emit('full', room);
                }
            });

            socket.on('ipaddr', function () {
                // report every IPv4 address of this machine other than 127.0.0.1
                const ifaces = networkInterfaces();
                for (const dev in ifaces) {
                    ifaces[dev]?.forEach(function (details) {
                        if (details.family === 'IPv4' && details.address !== '127.0.0.1') {
                            socket.emit('ipaddr', details.address);
                        }
                    });
                }
            });

            socket.on('bye', function () {
                console.log('received bye');
            });

            socket.on('disconnect', function () {
                const currentUser = socketMap.get(socket);
                if (currentUser !== undefined) {
                    // the map value is "<userEmail> at <datetime>"; the first token is the user
                    const currentUsername = currentUser.split(' ')[0];
                    DashStats.logUserLogout(currentUsername, socket);
                    delete timeMap[currentUsername];
                    // drop the entry so the closed socket is not retained as a map key
                    socketMap.delete(socket);
                }
            });

            Utils.Emit(socket, MessageStore.Foo, 'handshooken');

            Utils.AddServerHandler(socket, MessageStore.Bar, guid => barReceived(socket, guid));
            Utils.AddServerHandler(socket, MessageStore.SetField, args => setField(socket, args));
            Utils.AddServerHandlerCallback(socket, MessageStore.GetField, getField);
            Utils.AddServerHandlerCallback(socket, MessageStore.GetFields, getFields);
            if (isRelease) {
                Utils.AddServerHandler(socket, MessageStore.DeleteAll, () => doDelete(false));
            }

            Utils.AddServerHandler(socket, MessageStore.CreateField, CreateField);
            Utils.AddServerHandlerCallback(socket, MessageStore.YoutubeApiQuery, HandleYoutubeQuery);
            Utils.AddServerHandler(socket, MessageStore.UpdateField, diff => UpdateField(socket, diff));
            Utils.AddServerHandler(socket, MessageStore.DeleteField, id => DeleteField(socket, id));
            Utils.AddServerHandler(socket, MessageStore.DeleteFields, ids => DeleteFields(socket, ids));
            Utils.AddServerHandler(socket, MessageStore.GesturePoints, content => processGesturePoints(socket, content));
            Utils.AddServerHandler(socket, MessageStore.MobileInkOverlayTrigger, content => processOverlayTrigger(socket, content));
            Utils.AddServerHandler(socket, MessageStore.UpdateMobileInkOverlayPosition, content => processUpdateOverlayPosition(socket, content));
            Utils.AddServerHandler(socket, MessageStore.MobileDocumentUpload, content => processMobileDocumentUpload(socket, content));
            Utils.AddServerHandlerCallback(socket, MessageStore.GetRefField, GetRefField);
            Utils.AddServerHandlerCallback(socket, MessageStore.GetRefFields, GetRefFields);

            /**
             * Tells every other socket that the connection was terminated (with the
             * current timestamp) and then closes this socket. Reassigned on every new
             * connection, so it always targets the most recently connected socket.
             */
            disconnect = () => {
                socket.broadcast.emit('connection_terminated', Date.now());
                socket.disconnect(true);
            };
        });

        // push a fresh stats bundle to every connected client at the sampling interval
        setInterval(function () {
            io.emit(MessageStore.UpdateStats.Message, DashStats.getUpdatedStatsBundle());
        }, DashStats.SAMPLING_INTERVAL);
    }

    /** Relays gesture points to every socket except the sender. */
    function processGesturePoints(socket: Socket, content: GestureContent) {
        socket.broadcast.emit('receiveGesturePoints', content);
    }

    /** Relays a mobile ink overlay trigger to every socket except the sender. */
    function processOverlayTrigger(socket: Socket, content: MobileInkOverlayContent) {
        socket.broadcast.emit('receiveOverlayTrigger', content);
    }

    /** Relays a mobile ink overlay position update to every socket except the sender. */
    function processUpdateOverlayPosition(socket: Socket, content: UpdateMobileInkOverlayPositionContent) {
        socket.broadcast.emit('receiveUpdateOverlayPosition', content);
    }

    /** Relays a mobile document upload to every socket except the sender. */
    function processMobileDocumentUpload(socket: Socket, content: MobileDocumentUploadContent) {
        socket.broadcast.emit('receiveMobileDocumentUpload', content);
    }

    /**
     * Dispatches a YouTube query to the matching `YoutubeApi` call.
     * Each case ends in `break`, so exactly one API call is made per query.
     * @param query the query type plus its `userInput` / `videoIds`
     * @param callback handed to the API call for search and details queries
     */
    function HandleYoutubeQuery([query, callback]: [YoutubeQueryInput, (result?: any[]) => void]) {
        const { ProjectCredentials } = GoogleCredentialsLoader;
        switch (query.type) {
            case YoutubeQueryTypes.Channels:
                YoutubeApi.authorizedGetChannel(ProjectCredentials);
                break;
            case YoutubeQueryTypes.SearchVideo:
                YoutubeApi.authorizedGetVideos(ProjectCredentials, query.userInput, callback);
                break;
            case YoutubeQueryTypes.VideoDetails:
                YoutubeApi.authorizedGetVideoDetails(ProjectCredentials, query.videoIds, callback);
                break;
        }
    }

    /**
     * Drops database contents, clears the search index (unless the
     * `DISABLE_SEARCH` environment variable is `'true'`), and re-initializes the guest.
     * @param onlyFields when true only `DocumentsCollection` is passed to
     * `dropSchema`; when false `dropSchema` is called with no arguments
     * (presumably dropping everything - confirm against `Database.dropSchema`)
     */
    export async function doDelete(onlyFields = true) {
        const target: string[] = [];
        if (onlyFields) {
            target.push(DocumentsCollection);
        }
        await Database.Instance.dropSchema(...target);
        if (process.env.DISABLE_SEARCH !== 'true') {
            await Search.clear();
        }
        initializeGuest();
    }

    /**
     * Handles the `Bar` message a client sends after connecting: registers the
     * client, records its activity time and operation counter, and associates the
     * socket with `"<userEmail> at <datetime>"`.
     */
    function barReceived(socket: Socket, userEmail: string) {
        clients[userEmail] = new Client(userEmail.toString());
        const now = new Date();
        const datetime = now.getDate() + '/' + (now.getMonth() + 1) + '/' + now.getFullYear() + ' @ ' + now.getHours() + ':' + now.getMinutes() + ':' + now.getSeconds();
        console.log(blue(`user ${userEmail} has connected to the web socket at: ${datetime}`));
        printActiveUsers();

        timeMap[userEmail] = Date.now();
        socketMap.set(socket, userEmail + ' at ' + datetime);
        userOperations.set(userEmail, 0);
        DashStats.logUserLogin(userEmail, socket);
    }

    /** Looks up one document and passes it (or `undefined` for any falsy result) to `callback`. */
    function getField([id, callback]: [string, (result?: Transferable) => void]) {
        Database.Instance.getDocument(id, (result?: Transferable) => callback(result ? result : undefined));
    }

    /** Looks up several documents and passes the result to `callback`. */
    function getFields([ids, callback]: [string[], (result: Transferable[]) => void]) {
        Database.Instance.getDocuments(ids, callback);
    }

    /**
     * Writes a whole field to the database, broadcasts it to all other clients once
     * the write completes, and forwards text fields to the search index.
     */
    function setField(socket: Socket, newValue: Transferable) {
        // broadcast set value to all other clients
        Database.Instance.update(newValue.id, newValue, () => socket.broadcast.emit(MessageStore.SetField.Message, newValue));
        if (newValue.type === Types.Text) {
            // if the newValue has string type, then it's suitable for searching -- pass it to SOLR
            Search.updateDocument({ id: newValue.id, data: { set: (newValue as any).data } });
        }
    }

    /** Reads one document from the database without writing anything to stdout. */
    function GetRefFieldLocal([id, callback]: [string, (result?: Transferable) => void]) {
        return Database.Instance.getDocument(id, callback);
    }

    /** Client-facing single document read; writes a `+` to stdout per request. */
    function GetRefField([id, callback]: [string, (result?: Transferable) => void]) {
        process.stdout.write(`+`);
        GetRefFieldLocal([id, callback]);
    }

    /** Client-facing multi document read; writes the number of requested ids to stdout. */
    function GetRefFields([ids, callback]: [string[], (result?: Transferable[]) => void]) {
        process.stdout.write(`${ids.length}…`);
        Database.Instance.getDocuments(ids, callback);
    }

    /**
     * Maps a value's type (its `__type`, or `typeof` for primitives) to the suffix
     * of the search field it is stored under. A tuple entry additionally names the
     * property to read, or a function to call, to obtain the value that is indexed.
     */
    const suffixMap: { [type: string]: string | [string, string | ((json: any) => any)] } = {
        number: '_n',
        string: '_t',
        boolean: '_b',
        image: ['_t', 'url'],
        video: ['_t', 'url'],
        pdf: ['_t', 'url'],
        audio: ['_t', 'url'],
        web: ['_t', 'url'],
        map: ['_t', 'url'],
        script: ['_t', value => value.script.originalScript],
        RichTextField: ['_t', value => value.Text],
        date: ['_d', value => new Date(value.date).toISOString()],
        proxy: ['_i', 'fieldId'],
        list: [
            '_l',
            list => {
                // index the search value of every element that has one
                const results = [];
                for (const value of list.fields) {
                    const term = ToSearchTerm(value);
                    if (term) {
                        results.push(term.value);
                    }
                }
                return results.length ? results : null;
            },
        ],
    };

    /**
     * Converts a field value into the suffix/value pair used to index it.
     * @returns `undefined` for null/undefined values and for types missing from `suffixMap`
     */
    function ToSearchTerm(val: any): { suffix: string; value: any } | undefined {
        if (val === null || val === undefined) {
            return;
        }
        const type = val.__type || typeof val;
        const entry = suffixMap[type];
        if (!entry) {
            return;
        }
        if (!Array.isArray(entry)) {
            return { suffix: entry, value: val };
        }
        const [suffix, accessor] = entry;
        return { suffix, value: typeof accessor === 'function' ? accessor(val) : val[accessor] };
    }

    /** Returns the suffix part of a `suffixMap` entry. */
    function getSuffix(value: string | [string, any]): string {
        return typeof value === 'string' ? value : value[0];
    }

    /**
     * Broadcasts a field update to the other clients and, when `includeSender` is
     * set, to the originating client as well (used when the server's resulting
     * list differs from what the sender expects).
     */
    function broadcastFieldUpdate(socket: Socket, diff: Diff, includeSender: boolean) {
        if (includeSender) {
            socket.emit(MessageStore.UpdateField.Message, diff);
        }
        socket.broadcast.emit(MessageStore.UpdateField.Message, diff);
    }

    /**
     * Applies an `$addToSet` update by rewriting it into a `$set` of the server's
     * current list followed by the new items, then writes and broadcasts it.
     * If the resulting length differs from `diff.diff.length` (the length the
     * client reported), the update is also sent back to the sender.
     * @param socket the socket the update arrived on
     * @param diff the update; it is mutated in place
     * @param curListItems the server's current copy of the document
     */
    function addToListField(socket: Socket, diff: Diff, curListItems?: Transferable): void {
        // convert add to set to a query of the current fields, and then a set of the composition of the new fields with the old ones
        diff.diff.$set = diff.diff.$addToSet;
        delete diff.diff.$addToSet;
        const updatefield = Object.keys(diff.diff.$set)[0];
        const newListItems = diff.diff.$set[updatefield]?.fields;
        if (!newListItems) {
            console.log('Error: addToListField - no new list items');
            // release the per-document queue, otherwise every later update to this document would wait forever
            dispatchNextOp(diff.id);
            return;
        }
        const curList = (curListItems as any)?.fields?.[updatefield.replace('fields.', '')]?.fields.filter((item: any) => item !== undefined) || [];
        diff.diff.$set[updatefield].fields = [...curList, ...newListItems];
        const sendBack = diff.diff.length !== diff.diff.$set[updatefield].fields.length;
        delete diff.diff.length;
        Database.Instance.update(
            diff.id,
            diff.diff,
            () => {
                if (sendBack) {
                    console.log('Warning: list modified during update. Composite list is being returned.');
                }
                broadcastFieldUpdate(socket, diff, sendBack);
                dispatchNextOp(diff.id);
            },
            false
        );
    }

    /**
     * findClosestIndex() is a helper function that will try to find
     * the closest index of a list that has the same value as
     * a specified argument/index pair. Values are compared with `_.isEqual`,
     * the same comparison `remFromListField` uses at the hinted position.
     * @param list the list to search through
     * @param indexesToDelete a list of indexes that are already marked for deletion
     * so they will be ignored
     * @param value the value of the item to remove
     * @param hintIndex the index that the element was at on the client's copy of
     * the data
     * @returns the closest index with the same value (the lower index on a tie)
     * or -1 if the element was not found.
     */
    function findClosestIndex(list: any, indexesToDelete: number[], value: any, hintIndex: number) {
        let closestIndex = -1;
        // tracked separately so the -1 "not found" sentinel never takes part in the distance comparison
        let closestDistance = Infinity;
        for (let i = 0; i < list.length; i++) {
            if (indexesToDelete.includes(i) || !_.isEqual(list[i], value)) {
                continue;
            }
            const distance = Math.abs(i - hintIndex);
            if (distance < closestDistance) {
                closestDistance = distance;
                closestIndex = i;
            }
        }
        return closestIndex;
    }

    /**
     * remFromListField() receives the items to remove and a hint
     * from the client, and attempts to make the modification to the
     * server's copy of the data. If server's copy does not match
     * the client's after removal, the server will SEND BACK
     * its version to the client.
     * @param socket the socket that the client is connected on
     * @param diff an object containing the items to remove and a hint
     * (the hint contains start index and deleteCount, the number of
     * items to delete); it is mutated in place
     * @param curListItems the server's current copy of the data
     */
    function remFromListField(socket: Socket, diff: Diff, curListItems?: Transferable): void {
        diff.diff.$set = diff.diff.$remFromSet;
        delete diff.diff.$remFromSet;
        const updatefield = Object.keys(diff.diff.$set)[0];
        const remListItems = diff.diff.$set[updatefield]?.fields;
        if (!remListItems) {
            console.log('Error: remFromListField - no items to remove');
            // release the per-document queue, otherwise every later update to this document would wait forever
            dispatchNextOp(diff.id);
            return;
        }
        const curList = (curListItems as any)?.fields?.[updatefield.replace('fields.', '')]?.fields.filter((f: any) => f !== null) || [];
        // NOTE(review): `hint` is read from inside `$set` and is not removed before the database update - confirm that is intended
        const hint = diff.diff.$set.hint;

        if (hint) {
            // indexesToRemove stores the indexes that we mark for deletion, which is later used to filter the list (delete the elements)
            const indexesToRemove: number[] = [];
            for (let i = 0; i < hint.deleteCount; i++) {
                const expectedIndex = i + hint.start;
                // first try the position the item had on the client
                if (curList.length > expectedIndex && _.isEqual(curList[expectedIndex], remListItems[i])) {
                    indexesToRemove.push(expectedIndex);
                    continue;
                }
                // otherwise fall back to the nearest equal item that is not already marked
                const closestIndex = findClosestIndex(curList, indexesToRemove, remListItems[i], expectedIndex);
                if (closestIndex !== -1) {
                    indexesToRemove.push(closestIndex);
                } else {
                    console.log('Item to delete was not found - index = -1');
                }
            }
            diff.diff.$set[updatefield].fields = curList.filter((curItem: any, index: number) => !indexesToRemove.includes(index));
        } else {
            // go back to the original way to delete if we didn't receive
            // a hint from the client
            diff.diff.$set[updatefield].fields = curList.filter(
                (curItem: any) => !remListItems.some((remItem: any) => (remItem.fieldId ? remItem.fieldId === curItem.fieldId : remItem.heading ? remItem.heading === curItem.heading : remItem === curItem))
            );
        }

        // if the client and server have different versions of the data after
        // deletion, they will have different lengths and the server will
        // send its version of the data to the client
        const sendBack = diff.diff.length !== diff.diff.$set[updatefield].fields.length;
        delete diff.diff.length;
        Database.Instance.update(
            diff.id,
            diff.diff,
            () => {
                if (sendBack) {
                    // the two copies are different, so the server sends its copy.
                    console.log('SEND BACK');
                }
                broadcastFieldUpdate(socket, diff, sendBack);
                dispatchNextOp(diff.id);
            },
            false
        );
    }

    /**
     * Updates waiting behind an in-flight update, keyed by document id. A key is
     * present exactly while an update for that document is being processed; its
     * array holds only the updates that arrived in the meantime.
     */
    const pendingOps = new Map<string, { diff: Diff; socket: Socket }[]>();

    /**
     * Reads the server's current copy of the document and hands it, together with
     * the update, to the handler matching the update's kind.
     */
    function runOp(socket: Socket, diff: Diff) {
        // would prefer to have Mongo handle list additions/removals directly, but for now handle it on our own
        if (diff.diff.$addToSet) {
            return GetRefFieldLocal([diff.id, (result?: Transferable) => addToListField(socket, diff, result)]);
        }
        if (diff.diff.$remFromSet) {
            return GetRefFieldLocal([diff.id, (result?: Transferable) => remFromListField(socket, diff, result)]);
        }
        return GetRefFieldLocal([diff.id, (result?: Transferable) => SetField(socket, diff, result)]);
    }

    /**
     * Called when the in-flight update for a document has been handled: starts the
     * next waiting update, or clears the in-flight marker when none is waiting.
     */
    function dispatchNextOp(id: string) {
        const next = pendingOps.get(id)?.shift();
        if (next) {
            runOp(next.socket, next.diff);
            return;
        }
        pendingOps.delete(id);
    }

    /** Logs the `socketMap` entry of every socket that is not disconnected. */
    function printActiveUsers() {
        socketMap.forEach((user, socket) => {
            if (!socket.disconnected) {
                console.log(user);
            }
        });
    }

    /** The `socketMap` entry of the user whose update was processed last; used only for the 'Switch User' log line. */
    let CurUser: string | undefined = undefined;

    /**
     * Entry point for `UpdateField` messages. Updates to the same document are
     * processed one at a time: if one is already in flight the new one is queued,
     * otherwise it is started immediately.
     * @returns `undefined` when the socket is not in `socketMap`, `true` when the
     * update was queued, otherwise the result of the document read
     */
    function UpdateField(socket: Socket, diff: Diff) {
        const curUser = socketMap.get(socket);
        if (!curUser) return;
        const currentUsername = curUser.split(' ')[0];
        const opCount = userOperations.get(currentUsername);
        userOperations.set(currentUsername, opCount !== undefined ? opCount + 1 : 0);

        if (CurUser !== curUser) {
            CurUser = curUser;
            console.log('Switch User: ' + CurUser);
        }

        const queue = pendingOps.get(diff.id);
        if (queue) {
            queue.push({ diff, socket });
            return true;
        }
        // Mark the document as busy with an EMPTY queue: the update that is about to run must not
        // be queued as well, or dispatchNextOp would pick it up and run it a second time.
        pendingOps.set(diff.id, []);
        return runOp(socket, diff);
    }

    /**
     * Applies a plain `$set` / `$unset` update: writes it to the database,
     * broadcasts it to the other clients once written, and mirrors every changed
     * `fields.*` entry into the search index.
     * @param curListItems unused; present so the signature matches the list handlers
     */
    function SetField(socket: Socket, diff: Diff, curListItems?: Transferable) {
        Database.Instance.update(diff.id, diff.diff, () => socket.broadcast.emit(MessageStore.UpdateField.Message, diff), false);
        const docfield = diff.diff.$set || diff.diff.$unset;
        if (docfield) {
            const update: any = { id: diff.id };
            let dynfield = false;
            for (const rawKey in docfield) {
                if (!rawKey.startsWith('fields.')) continue;
                dynfield = true;
                const val = docfield[rawKey];
                const key = rawKey.substring(7); // strip the 'fields.' prefix
                // clear the field under every possible suffix, since its type may have changed
                Object.values(suffixMap).forEach(suf => {
                    update[key + getSuffix(suf)] = { set: null };
                });
                const term = ToSearchTerm(val);
                if (term !== undefined) {
                    const { suffix, value } = term;
                    update[key + suffix] = { set: value };
                    if (key.endsWith('modificationDate')) {
                        update['modificationDate' + suffix] = value;
                    }
                }
            }
            if (dynfield) {
                Search.updateDocument(update);
            }
        }
        // NOTE(review): unlike the list handlers, the next update is started without waiting for the database write to finish
        dispatchNextOp(diff.id);
    }

    /** Deletes one document, broadcasts the deletion once done, and removes it from the search index. */
    function DeleteField(socket: Socket, id: string) {
        Database.Instance.delete({ _id: id }).then(() => {
            socket.broadcast.emit(MessageStore.DeleteField.Message, id);
        });

        Search.deleteDocuments([id]);
    }

    /** Deletes several documents, broadcasts the deletion once done, and removes them from the search index. */
    function DeleteFields(socket: Socket, ids: string[]) {
        Database.Instance.delete({ _id: { $in: ids } }).then(() => {
            socket.broadcast.emit(MessageStore.DeleteFields.Message, ids);
        });
        Search.deleteDocuments(ids);
    }

    /** Inserts a new document into the database. */
    function CreateField(newValue: any) {
        Database.Instance.insert(newValue);
    }
}