import * as mongodb from 'mongodb';
import * as mongoose from 'mongoose';
import { Opt } from '../fields/Doc';
import { emptyFunction, Utils } from '../Utils';
import { GoogleApiServerUtils } from './apis/google/GoogleApiServerUtils';
import { DocumentsCollection, IDatabase } from './IDatabase';
import { MemoryDatabase } from './MemoryDatabase';
import { Upload } from './SharedMediaTypes';
import { serializedDoctype } from '../fields/ObjectField';

/**
 * Server-side persistence layer: connection helpers for mongoose, a MongoDB-backed
 * implementation of IDatabase, the process-wide database Instance, and auxiliary
 * (non-document) collections.
 *
 * NOTE(review): the generic type arguments in this file (Promise<...>, collection<DocSchema>,
 * Set<string>, ...) were reconstructed; confirm them against IDatabase and the installed
 * mongodb driver typings.
 */
export namespace Database {
    /** Closes the default mongoose connection. Assigned by tryInitializeConnection. */
    export let disconnect: () => void;

    /** Shape of stored documents: string ids rather than ObjectIds. */
    class DocSchema implements mongodb.BSON.Document {
        _id!: string;
        id!: string;
    }

    const schema = 'Dash';
    const port = 27017;
    export const url = `mongodb://localhost:${port}/${schema}`;

    /** Numeric values compared against mongoose's connection.readyState. */
    enum ConnectionStates {
        disconnected = 0,
        connected = 1,
        connecting = 2,
        disconnecting = 3,
        uninitialized = 99,
    }

    /**
     * Opens the default mongoose connection if it is currently disconnected.
     * On failure the error is logged and the process exits with code 1.
     */
    export async function tryInitializeConnection() {
        try {
            const { connection } = mongoose;
            disconnect = async () => {
                await connection.close();
            };
            if (connection.readyState === ConnectionStates.disconnected) {
                await new Promise<void>((resolve, reject) => {
                    connection.on('error', reject);
                    connection.on('connected', () => {
                        console.log(`mongoose established default connection at ${url}`);
                        resolve();
                    });
                    mongoose
                        .connect(url, {
                            // useNewUrlParser: true,
                            dbName: schema,
                            // reconnectTries: Number.MAX_VALUE,
                            // reconnectInterval: 1000,
                        })
                        // Route a failed connect into the surrounding try/catch instead of
                        // leaving it as an unhandled rejection.
                        .catch(reject);
                });
            }
        } catch (e) {
            console.error(`Mongoose FAILED to establish default connection at ${url} with the following error:`);
            console.error(e);
            console.log('Since a valid database connection is required to use Dash, the server process will now exit.\nPlease try again later.');
            process.exit(1);
        }
    }

    /**
     * MongoDB-backed implementation of IDatabase.
     *
     * Operations requested before doConnect() has produced a database handle are queued
     * in onConnect and replayed once the connection is established. Writes that target the
     * same document id are chained through currentWrites so they run one after another.
     */
    // eslint-disable-next-line @typescript-eslint/no-shadow
    export class Database implements IDatabase {
        private MongoClient = mongodb.MongoClient;
        /** Tail of the pending write chain for each document id. */
        private currentWrites: { [id: string]: Promise<void> } = {};
        private db?: mongodb.Db;
        /** Operations deferred until the database handle is available. */
        private onConnect: (() => void)[] = [];

        /**
         * Connects to MongoDB at `url`, stores the database handle and replays every
         * operation that was queued while disconnected.
         * If the connection attempt fails, the error is logged and the process exits.
         */
        async doConnect() {
            console.error(`\nConnecting to Mongo with URL : ${url}\n`);
            return new Promise<void>(resolve => {
                this.MongoClient.connect(url, { connectTimeoutMS: 30000, socketTimeoutMS: 30000 }).then(
                    client => {
                        console.error('mongo connect response\n');
                        if (!client) {
                            console.error('\nMongo connect failed with the error:\n');
                            process.exit(0);
                        }
                        this.db = client.db();
                        this.onConnect.forEach(fn => fn());
                        resolve();
                    },
                    // Second argument (not .catch) so that only a failed connect lands here,
                    // not an exception thrown by a replayed operation.
                    error => {
                        console.error('\nMongo connect failed with the error:\n', error);
                        process.exit(1);
                    }
                );
            });
        }

        /**
         * Runs `write` after any write already pending for `id`, and records the resulting
         * promise as the new tail of that id's chain.
         *
         * The returned promise always resolves (never rejects): a failed write is logged with
         * `errorLabel` and the chain is released so later writes to the same id still run.
         *
         * @param id document id used as the serialization key
         * @param errorLabel prefix logged together with the error when the write fails
         * @param write performs the actual database write
         * @param onSuccess invoked with the write's result after the chain has been released
         */
        private enqueueWrite<T>(id: string, errorLabel: string, write: () => Promise<T>, onSuccess?: (result: T) => void): Promise<void> {
            const previous = this.currentWrites[id];
            // eslint-disable-next-line prefer-const
            let queued: Promise<void>;
            const run = (): Promise<void> =>
                new Promise<void>(resolve => {
                    const release = () => {
                        // Only clear the entry if no newer write has been chained behind this one.
                        if (this.currentWrites[id] === queued) {
                            delete this.currentWrites[id];
                        }
                        resolve();
                    };
                    write()
                        .then(result => {
                            release();
                            onSuccess?.(result);
                        })
                        .catch(error => {
                            console.log(errorLabel, error);
                            release();
                        });
                });
            queued = previous ? previous.then(run) : run();
            this.currentWrites[id] = queued;
            return queued;
        }

        /**
         * Applies the update document `value` to the document with the given id.
         *
         * @param callback called with (undefined, result) once the update succeeds;
         *                 it is not called when the update fails (the failure is logged)
         * @param upsert whether to insert the document when it does not exist
         * @returns a promise that resolves when the write has been attempted, or undefined
         *          when the operation was deferred until the database connects
         */
        public async update(id: string, value: any, callback: (err: mongodb.MongoError, res: mongodb.UpdateResult) => void, upsert = true, collectionName = DocumentsCollection) {
            if (this.db) {
                const collection = this.db.collection<DocSchema>(collectionName);
                return this.enqueueWrite(
                    id,
                    'MOngo UPDATE ONE ERROR:',
                    () => collection.updateOne({ _id: id }, value, { upsert }),
                    res => callback(undefined as any, res)
                );
            }
            this.onConnect.push(() => this.update(id, value, callback, upsert, collectionName));
            return undefined;
        }

        /**
         * Replaces the document with the given id by `value`.
         *
         * @param callback called with (undefined, result) once the replacement succeeds;
         *                 it is not called when the replacement fails (the failure is logged)
         * @param upsert whether to insert the document when it does not exist
         */
        public replace(id: string, value: any, callback: (err: mongodb.MongoError, res: mongodb.UpdateResult) => void, upsert = true, collectionName = DocumentsCollection) {
            if (this.db) {
                const collection = this.db.collection<DocSchema>(collectionName);
                this.enqueueWrite(
                    id,
                    'Mongo REPLACE ONE ERROR:',
                    () => collection.replaceOne({ _id: id }, value, { upsert }),
                    res => callback(undefined as any, res as any)
                );
            } else {
                this.onConnect.push(() => this.replace(id, value, callback, upsert, collectionName));
            }
        }

        /**
         * Lists the names of all collections in the connected database.
         * Returns an empty list when there is no database handle yet.
         */
        public async getCollectionNames() {
            const cursor = this.db?.listCollections();
            const collectionNames: string[] = [];
            if (cursor) {
                // eslint-disable-next-line no-await-in-loop
                while (await cursor.hasNext()) {
                    // eslint-disable-next-line no-await-in-loop
                    const collection = await cursor.next();
                    collection && collectionNames.push(collection.name);
                }
            }
            return collectionNames;
        }

        /**
         * Deletes every document matching the query. A string argument is treated as a
         * document id and converted to the filter `{ _id: id }`.
         * When disconnected, the deletion is deferred and the returned promise settles
         * once the deferred deletion has run.
         */
        public delete(query: any, collectionName?: string): Promise<mongodb.DeleteResult>;
        public delete(id: string, collectionName?: string): Promise<mongodb.DeleteResult>;
        public delete(idIn: any, collectionName = DocumentsCollection) {
            const filter = typeof idIn === 'string' ? { _id: idIn } : idIn;
            if (this.db) {
                // Return the driver's promise directly so a failure rejects instead of
                // leaving the caller with a promise that never settles.
                return this.db.collection(collectionName).deleteMany(filter);
            }
            return new Promise<mongodb.DeleteResult>(res => {
                this.onConnect.push(() => res(this.delete(filter, collectionName)));
            });
        }

        /**
         * Drops the named collections that currently exist, or every existing collection
         * when no names are given.
         *
         * @returns true when every drop reported success; undefined when the operation
         *          was deferred until the database connects
         */
        public async dropSchema(...targetSchemas: string[]): Promise<any> {
            const executor = async (database: mongodb.Db) => {
                // eslint-disable-next-line no-use-before-define
                const existing = await Instance.getCollectionNames();
                let valid: string[];
                if (targetSchemas.length) {
                    valid = targetSchemas.filter(collection => existing.includes(collection));
                } else {
                    valid = existing;
                }
                const pending = Promise.all(valid.map(schemaName => database.dropCollection(schemaName)));
                return (await pending).every(dropOutcome => dropOutcome);
            };
            if (this.db) {
                return executor(this.db);
            }
            this.onConnect.push(() => this.db && executor(this.db));
            return undefined;
        }

        /**
         * Inserts `valueIn` into the collection. An `id` property on the value is moved to
         * `_id` (the passed object itself is modified). A null value is ignored.
         *
         * @returns a promise that resolves when the write has been attempted, or undefined
         *          when the value is null or the operation was deferred
         */
        public async insert(valueIn: any, collectionName = DocumentsCollection) {
            const value = valueIn;
            if (this.db && value !== null) {
                if ('id' in value) {
                    value._id = value.id;
                    delete value.id;
                }
                const collection = this.db.collection<DocSchema>(collectionName);
                return this.enqueueWrite(value._id, 'Mongo INSERT ERROR: ', () => collection.insertOne(value));
            }
            if (value !== null) {
                this.onConnect.push(() => this.insert(value, collectionName));
            }
            return undefined;
        }

        /**
         * Looks up one document by id and passes it to `fn` with `id` set to its `_id`
         * (the `_id` field is left in place). `fn` receives undefined when nothing matches.
         */
        public getDocument(id: string, fn: (result?: serializedDoctype) => void, collectionName = DocumentsCollection) {
            if (this.db) {
                const collection = this.db.collection<DocSchema>(collectionName);
                collection.findOne({ _id: id }).then(resultIn => {
                    const result = resultIn;
                    if (result) {
                        result.id = result._id;
                        // delete result._id;
                        fn(result as any);
                    } else {
                        fn(undefined);
                    }
                });
            } else {
                this.onConnect.push(() => this.getDocument(id, fn, collectionName));
            }
        }

        /**
         * Fetches all documents whose `_id` is in `ids` and passes them to `fn`,
         * each with `_id` renamed to `id`.
         */
        public async getDocuments(ids: string[], fn: (result: serializedDoctype[]) => void, collectionName = DocumentsCollection) {
            if (this.db) {
                const found = await this.db
                    .collection<DocSchema>(collectionName)
                    .find({ _id: { $in: ids } })
                    .toArray();
                fn(
                    found.map((docIn: any) => {
                        const doc = docIn;
                        doc.id = doc._id;
                        delete doc._id;
                        return doc;
                    })
                );
            } else {
                this.onConnect.push(() => this.getDocuments(ids, fn, collectionName));
            }
        }

        /**
         * Traverses documents starting from `ids`: each fetched document is passed to `fn`,
         * and the ids `fn` returns are added to the work list. Documents are fetched in
         * batches of at most 1000 and ids already visited are not fetched again.
         * The `ids` array is consumed (emptied) by the traversal.
         *
         * @returns a promise that resolves when the work list is exhausted
         */
        public async visit(ids: string[], fn: (result: any) => string[] | Promise<string[]>, collectionName = DocumentsCollection): Promise<void> {
            if (this.db) {
                const visited = new Set<string>();
                while (ids.length) {
                    const count = Math.min(ids.length, 1000);
                    const index = ids.length - count;
                    const fetchIds = ids.splice(index, count).filter(id => !visited.has(id));
                    if (fetchIds.length) {
                        // eslint-disable-next-line no-await-in-loop
                        const docs = await new Promise<{ [key: string]: any }[]>(res => {
                            this.getDocuments(fetchIds, res, collectionName);
                        });
                        // Sequential on purpose: every fn result must be pushed onto `ids`
                        // before the enclosing while condition is evaluated again.
                        // eslint-disable-next-line no-restricted-syntax
                        for (const doc of docs) {
                            visited.add(doc.id);
                            // eslint-disable-next-line no-await-in-loop
                            ids.push(...(await fn(doc)));
                        }
                    }
                }
                return undefined;
            }
            return new Promise<void>(res => {
                this.onConnect.push(() => {
                    // Resolve with the traversal itself so callers wait for it to finish.
                    res(this.visit(ids, fn, collectionName));
                });
            });
        }

        /**
         * Creates a find cursor for `query`, optionally restricted by `projection`.
         * When disconnected, the returned promise resolves once the database connects.
         */
        public query(query: { [key: string]: any }, projection?: { [key: string]: 0 | 1 }, collectionName = DocumentsCollection): Promise<mongodb.FindCursor> {
            if (this.db) {
                let cursor = this.db.collection(collectionName).find(query);
                if (projection) {
                    cursor = cursor.project(projection);
                }
                return Promise.resolve(cursor);
            }
            return new Promise<mongodb.FindCursor>(res => {
                this.onConnect.push(() => {
                    res(this.query(query, projection, collectionName));
                });
            });
        }

        /**
         * Applies `update` to every document matching `query`.
         * A failure is logged; the returned promise is only resolved on success.
         */
        public updateMany(query: any, update: any, collectionName = DocumentsCollection) {
            if (this.db) {
                const { db } = this;
                return new Promise<mongodb.UpdateResult>(res => {
                    db.collection(collectionName)
                        .updateMany(query, update)
                        .then(result => res(result))
                        .catch(error => console.log('Mongo UPDATE MANY ERROR:', error));
                });
            }
            return new Promise<mongodb.UpdateResult>(res => {
                this.onConnect.push(() =>
                    this.updateMany(query, update, collectionName)
                        .then(res)
                        .catch(error => console.log('Mongo UPDATAE MANY ERROR: ', error))
                );
            });
        }

        public print() {
            console.log('db says hi!');
        }
    }

    /** Chooses the database implementation: in-memory when the DB environment variable is 'MEM', MongoDB otherwise. */
    function getDatabase() {
        switch (process.env.DB) {
            case 'MEM':
                return new MemoryDatabase();
            default:
                return new Database();
        }
    }

    /** The process-wide database instance. */
    export const Instance = getDatabase();

    /**
     * Provides definitions and apis for working with
     * portions of the database not dedicated to storing documents
     * or Dash-internal user data.
     */
    export namespace Auxiliary {
        /**
         * All the auxiliary MongoDB collections (schemas)
         */
        export enum AuxiliaryCollections {
            GooglePhotosUploadHistory = 'uploadedFromGooglePhotos',
            GoogleAccess = 'googleAuthentication',
        }

        /**
         * Searches for the @param query in the specified @param collection,
         * and returns at most the first @param cap results. If @param removeId is true,
         * as it is by default, each object will be stripped of its database id.
         */
        const SanitizedCappedQuery = async (query: { [key: string]: any }, collection: string, cap: number, removeId = true) => {
            const cursor = await Instance.query(query, undefined, collection);
            const results = await cursor.toArray();
            const slice = results.slice(0, cap);
            return removeId
                ? slice.map((result: any) => {
                      delete result._id;
                      return result;
                  })
                : slice;
        };

        /**
         * Searches for the @param query in the specified @param collection,
         * and returns at most the first result. If @param removeId is true,
         * as it is by default, each object will be stripped of its database id.
         * Worth the special case since it converts the Array return type to a single
         * object of the specified type.
         */
        const SanitizedSingletonQuery = async <T>(query: { [key: string]: any }, collection: string, removeId = true): Promise<Opt<T>> => {
            const results = await SanitizedCappedQuery(query, collection, 1, removeId);
            return results.length ? results[0] : undefined;
        };

        /**
         * Checks to see if an image with the given @param contentSize
         * already exists in the aux database, i.e. has already been downloaded from Google Photos.
         */
        export const QueryUploadHistory = async (contentSize: number) => SanitizedSingletonQuery<Upload.ImageInformation>({ contentSize }, AuxiliaryCollections.GooglePhotosUploadHistory);

        /**
         * Records the uploading of the image with the given @param information,
         * using the given content size as a seed for the database id.
         */
        export const LogUpload = async (information: Upload.ImageInformation) => {
            const bundle = {
                _id: Utils.GenerateDeterministicGuid(String(information.contentSize)),
                ...information,
            };
            return Instance.insert(bundle, AuxiliaryCollections.GooglePhotosUploadHistory);
        };

        /**
         * Manages the storage, retrieval and updating of the access token that
         * facilitates interactions with all their APIs for a given account.
         */
        export namespace GoogleAccessToken {
            /**
             * Format stored in database.
             */
            type StoredCredentials = GoogleApiServerUtils.EnrichedCredentials & { _id: string };

            /**
             * Retrieves the credentials associated with @param userId
             * and optionally removes their database id according to @param removeId.
             */
            export const Fetch = async (userId: string, removeId = true): Promise<Opt<StoredCredentials>> => SanitizedSingletonQuery<StoredCredentials>({ userId }, AuxiliaryCollections.GoogleAccess, removeId);

            /**
             * Writes the @param enrichedCredentials to the database, associated
             * with @param userId for later retrieval and updating.
             */
            export const Write = async (userId: string, enrichedCredentials: GoogleApiServerUtils.EnrichedCredentials) => Instance.insert({ userId, canAccess: [], ...enrichedCredentials }, AuxiliaryCollections.GoogleAccess);

            /**
             * Updates the @param accessToken and @param expiryDate fields
             * in the stored credentials associated with @param userId.
             */
            export const Update = async (userId: string, accessToken: string, expiryDate: number) => {
                const entry = await Fetch(userId, false);
                if (entry) {
                    const parameters = { $set: { access_token: accessToken, expiry_date: expiryDate } };
                    return Instance.update(entry._id, parameters, emptyFunction, true, AuxiliaryCollections.GoogleAccess);
                }
                return undefined;
            };

            /**
             * Revokes the credentials associated with @param userId.
             * Resolves once the stored entry, if any, has been deleted.
             */
            export const Revoke = async (userId: string) => {
                const entry = await Fetch(userId, false);
                if (entry) {
                    await Instance.delete({ _id: entry._id }, AuxiliaryCollections.GoogleAccess);
                }
            };
        }
    }
}