diff options
| author | A.J. Shulman <Shulman.aj@gmail.com> | 2024-12-18 11:46:14 -0500 |
|---|---|---|
| committer | A.J. Shulman <Shulman.aj@gmail.com> | 2024-12-18 11:46:14 -0500 |
| commit | ad1e0cf62187e0f8bbb19b4720b7681585361de9 (patch) | |
| tree | 673dd63ddc1808e6e89dab5021c2136cbbe843c8 /src/client/views/nodes/chatbot/vectorstore | |
| parent | 9e447814b551c352709296ae562f1f50480320f5 (diff) | |
better
Diffstat (limited to 'src/client/views/nodes/chatbot/vectorstore')
| -rw-r--r-- | src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts | 247 |
1 files changed, 155 insertions, 92 deletions
diff --git a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts index f96f55997..af27ebe80 100644 --- a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts +++ b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts @@ -10,9 +10,11 @@ import { CohereClient } from 'cohere-ai'; import { EmbedResponse } from 'cohere-ai/api'; import dotenv from 'dotenv'; import { Doc } from '../../../../../fields/Doc'; -import { CsvCast, PDFCast, StrCast } from '../../../../../fields/Types'; +import { AudioCast, Cast, CsvCast, DocCast, PDFCast, StrCast, VideoCast } from '../../../../../fields/Types'; import { Networking } from '../../../../Network'; import { AI_Document, CHUNK_TYPE, RAGChunk } from '../types/types'; +import path from 'path'; +import { v4 as uuidv4 } from 'uuid'; dotenv.config(); @@ -77,109 +79,137 @@ export class Vectorstore { } /** - * Adds an AI document to the vectorstore. This method handles document chunking, uploading to the - * vectorstore, and updating the progress for long-running tasks like file uploads. - * @param doc The document to be added to the vectorstore. - * @param progressCallback Callback to update the progress of the upload. + * Adds an AI document to the vectorstore, handling media files separately. + * Preserves all existing document processing logic. + * @param doc The document to add. + * @param progressCallback Callback to track progress. */ async addAIDoc(doc: Doc, progressCallback: (progress: number, step: string) => void) { - console.log('Adding AI Document:', doc); - const ai_document_status: string = StrCast(doc.ai_document_status); - - // Skip if the document is already in progress or completed. - if (ai_document_status !== undefined && ai_document_status.trim() !== '' && ai_document_status !== '{}') { - if (ai_document_status === 'IN PROGRESS') { - console.log('Already in progress.'); - return; - } - if (!this._doc_ids.includes(StrCast(doc.ai_doc_id))) { - this._doc_ids.push(StrCast(doc.ai_doc_id)); + const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname ?? VideoCast(doc.data)?.url?.pathname ?? AudioCast(doc.data)?.url?.pathname; + + if (!local_file_path) { + throw new Error('Invalid file path.'); + } + + const isAudioOrVideo = local_file_path.endsWith('.mp3') || local_file_path.endsWith('.mp4'); + let result: AI_Document & { doc_id: string }; + + if (isAudioOrVideo) { + console.log('Processing media file...'); + const response = await Networking.PostToServer('/processMediaFile', { fileName: path.basename(local_file_path) }); + const segmentedTranscript = response; + + // Generate embeddings for each chunk + const texts = segmentedTranscript.map((chunk: any) => chunk.text); + + try { + const embeddingsResponse = await this.cohere.v2.embed({ + model: 'embed-english-v3.0', + inputType: 'classification', + embeddingTypes: ['float'], // Specify that embeddings should be floats + texts, // Pass the array of chunk texts + }); + + if (!embeddingsResponse.embeddings.float || embeddingsResponse.embeddings.float.length !== texts.length) { + throw new Error('Mismatch between embeddings and the number of chunks'); + } + + // Assign embeddings to each chunk + segmentedTranscript.forEach((chunk: any, index: number) => { + if (!embeddingsResponse.embeddings || !embeddingsResponse.embeddings.float) { + throw new Error('Invalid embeddings response'); + } + //chunk.embedding = embeddingsResponse.embeddings.float[index]; + }); + + // Add transcript and embeddings to metadata + result = { + purpose: '', + file_name: path.basename(local_file_path), + num_pages: 0, + summary: '', + chunks: segmentedTranscript.map((chunk: any, index: number) => ({ + id: uuidv4(), + values: (embeddingsResponse.embeddings.float as number[][])[index], // Assign embedding + metadata: { + ...chunk, + original_document: doc.id, + doc_id: doc.id, + file_path: local_file_path, + start_time: chunk.start, + end_time: chunk.end, + text: chunk.text, + }, + })), + type: 'media', + doc_id: StrCast(doc.id), + }; + } catch (error) { + console.error('Error generating embeddings:', error); + throw new Error('Embedding generation failed'); } + + doc.segmented_transcript = JSON.stringify(segmentedTranscript); } else { - // Start processing the document. - doc.ai_document_status = 'PROGRESS'; - console.log(doc); - - // Get the local file path (CSV or PDF). - const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname; - console.log('Local File Path:', local_file_path); - - if (local_file_path) { - console.log('Creating AI Document...'); - // Start the document creation process by sending the file to the server. - const { jobId } = await Networking.PostToServer('/createDocument', { file_path: local_file_path }); - - // Poll the server for progress updates. - const inProgress = true; - let result: (AI_Document & { doc_id: string }) | null = null; // bcz: is this the correct type?? - while (inProgress) { - // Polling interval for status updates. - await new Promise(resolve => setTimeout(resolve, 2000)); - - // Check if the job is completed. - const resultResponse = await Networking.FetchFromServer(`/getResult/${jobId}`); - const resultResponseJson = JSON.parse(resultResponse); - if (resultResponseJson.status === 'completed') { - console.log('Result here:', resultResponseJson); - result = resultResponseJson; - break; - } + // Existing document processing logic remains unchanged + console.log('Processing regular document...'); + const { jobId } = await Networking.PostToServer('/createDocument', { file_path: local_file_path }); - // Fetch progress information and update the progress callback. - const progressResponse = await Networking.FetchFromServer(`/getProgress/${jobId}`); - const progressResponseJson = JSON.parse(progressResponse); - if (progressResponseJson) { - const progress = progressResponseJson.progress; - const step = progressResponseJson.step; - progressCallback(progress, step); - } + while (true) { + await new Promise(resolve => setTimeout(resolve, 2000)); + const resultResponse = await Networking.FetchFromServer(`/getResult/${jobId}`); + const resultResponseJson = JSON.parse(resultResponse); + if (resultResponseJson.status === 'completed') { + result = resultResponseJson; + break; } - if (!result) { - console.error('Error processing document.'); - return; + const progressResponse = await Networking.FetchFromServer(`/getProgress/${jobId}`); + const progressResponseJson = JSON.parse(progressResponse); + if (progressResponseJson) { + progressCallback(progressResponseJson.progress, progressResponseJson.step); } + } + } - // Once completed, process the document and add it to the vectorstore. - console.log('Document JSON:', result); - this.documents.push(result); - await this.indexDocument(result); - console.log(`Document added: ${result.file_name}`); - - // Update document metadata such as summary, purpose, and vectorstore ID. - doc.summary = result.summary; - doc.ai_doc_id = result.doc_id; - this._doc_ids.push(result.doc_id); - doc.ai_purpose = result.purpose; - - if (!doc.vectorstore_id) { - doc.vectorstore_id = JSON.stringify([this._id]); - } else { - doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id])); - } + // Index the document + await this.indexDocument(result); - if (!doc.chunk_simpl) { - doc.chunk_simpl = JSON.stringify({ chunks: [] }); - } + // Simplify chunks for storage + const simplifiedChunks = result.chunks.map(chunk => ({ + chunkId: chunk.id, + start_time: chunk.metadata.start_time, + end_time: chunk.metadata.end_time, + chunkType: CHUNK_TYPE.TEXT, + text: chunk.metadata.text, + })); + doc.chunk_simpl = JSON.stringify({ chunks: simplifiedChunks }); - // Process each chunk of the document and update the document's chunk_simpl field. - result.chunks.forEach((chunk: RAGChunk) => { - const chunkToAdd = { - chunkId: chunk.id, - startPage: chunk.metadata.start_page, - endPage: chunk.metadata.end_page, - location: chunk.metadata.location, - chunkType: chunk.metadata.type as CHUNK_TYPE, - text: chunk.metadata.text, - }; - const new_chunk_simpl = JSON.parse(StrCast(doc.chunk_simpl)); - new_chunk_simpl.chunks = new_chunk_simpl.chunks.concat(chunkToAdd); - doc.chunk_simpl = JSON.stringify(new_chunk_simpl); - }); + // Preserve existing metadata updates + if (!doc.vectorstore_id) { + doc.vectorstore_id = JSON.stringify([this._id]); + } else { + doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id])); + } - // Mark the document status as completed. - doc.ai_document_status = 'COMPLETED'; - } + if (!doc.chunk_simpl) { + doc.chunk_simpl = JSON.stringify({ chunks: [] }); } + + result.chunks.forEach((chunk: RAGChunk) => { + const chunkToAdd = { + chunkId: chunk.id, + startPage: chunk.metadata.start_page, + endPage: chunk.metadata.end_page, + location: chunk.metadata.location, + chunkType: chunk.metadata.type as CHUNK_TYPE, + text: chunk.metadata.text, + }; + const new_chunk_simpl = JSON.parse(StrCast(doc.chunk_simpl)); + new_chunk_simpl.chunks = new_chunk_simpl.chunks.concat(chunkToAdd); + doc.chunk_simpl = JSON.stringify(new_chunk_simpl); + }); + + console.log(`Document added: ${result.file_name}`); } /** @@ -201,6 +231,39 @@ export class Vectorstore { } /** + * Combines chunks until their combined text is at least 500 words. + * @param chunks The original chunks. + * @returns Combined chunks. + */ + private combineChunks(chunks: RAGChunk[]): RAGChunk[] { + const combinedChunks: RAGChunk[] = []; + let currentChunk: RAGChunk | null = null; + let wordCount = 0; + + chunks.forEach(chunk => { + const textWords = chunk.metadata.text.split(' ').length; + + if (!currentChunk) { + currentChunk = { ...chunk, metadata: { ...chunk.metadata, text: chunk.metadata.text } }; + wordCount = textWords; + } else if (wordCount + textWords >= 500) { + combinedChunks.push(currentChunk); + currentChunk = { ...chunk, metadata: { ...chunk.metadata, text: chunk.metadata.text } }; + wordCount = textWords; + } else { + currentChunk.metadata.text += ` ${chunk.metadata.text}`; + wordCount += textWords; + } + }); + + if (currentChunk) { + combinedChunks.push(currentChunk); + } + + return combinedChunks; + } + + /** * Retrieves the top K document chunks relevant to the user's query. * This involves embedding the query using Cohere, then querying Pinecone for matching vectors. * @param query The search query string. |
