diff options
| author | bobzel <zzzman@gmail.com> | 2025-03-10 16:13:04 -0400 |
|---|---|---|
| committer | bobzel <zzzman@gmail.com> | 2025-03-10 16:13:04 -0400 |
| commit | b7989dded8bb001876de6cbca59bf77935f0daf7 (patch) | |
| tree | 0dba0665674db7bb84770833df0a4100d0520701 /src/client/views/nodes/chatbot/vectorstore | |
| parent | 4979415d4604d280e81a162bf9a9d39c731d3738 (diff) | |
| parent | 5bf944035c0ba94ad15245416f51ca0329a51bde (diff) | |
Merge branch 'master' into alyssa-starter
Diffstat (limited to 'src/client/views/nodes/chatbot/vectorstore')
| -rw-r--r-- | src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts | 254 |
1 files changed, 162 insertions, 92 deletions
diff --git a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts index f96f55997..afd34f28d 100644 --- a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts +++ b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts @@ -1,37 +1,40 @@ /** * @file Vectorstore.ts - * @description This file defines the Vectorstore class, which integrates with Pinecone for vector-based document indexing and Cohere for text embeddings. - * It handles tasks such as AI document management, document chunking, and retrieval of relevant document sections based on user queries. - * The class supports adding documents to the vectorstore, managing document status, and querying Pinecone for document chunks matching a query. + * @description This file defines the Vectorstore class, which integrates with Pinecone for vector-based document indexing and OpenAI text-embedding-3-large for text embeddings. + * It manages AI document handling, including adding documents, processing media files, combining document chunks, indexing documents, + * and retrieving relevant sections based on user queries. */ import { Index, IndexList, Pinecone, PineconeRecord, QueryResponse, RecordMetadata } from '@pinecone-database/pinecone'; -import { CohereClient } from 'cohere-ai'; -import { EmbedResponse } from 'cohere-ai/api'; import dotenv from 'dotenv'; +import path from 'path'; +import { v4 as uuidv4 } from 'uuid'; import { Doc } from '../../../../../fields/Doc'; -import { CsvCast, PDFCast, StrCast } from '../../../../../fields/Types'; +import { AudioCast, CsvCast, PDFCast, StrCast, VideoCast } from '../../../../../fields/Types'; import { Networking } from '../../../../Network'; import { AI_Document, CHUNK_TYPE, RAGChunk } from '../types/types'; +import OpenAI from 'openai'; +import { Embedding } from 'openai/resources'; +import { PineconeEnvironmentVarsNotSupportedError } from '@pinecone-database/pinecone/dist/errors'; dotenv.config(); /** * The Vectorstore class integrates with Pinecone for vector-based document indexing and retrieval, - * and Cohere for text embedding. It handles AI document management, uploads, and query-based retrieval. + * and OpenAI text-embedding-3-large for text embedding. It handles AI document management, uploads, and query-based retrieval. */ export class Vectorstore { private pinecone: Pinecone; // Pinecone client for managing the vector index. private index!: Index; // The specific Pinecone index used for document chunks. - private cohere: CohereClient; // Cohere client for generating embeddings. + private openai: OpenAI; // OpenAI client for generating embeddings. private indexName: string = 'pdf-chatbot'; // Default name for the index. private _id: string; // Unique ID for the Vectorstore instance. - private _doc_ids: string[] = []; // List of document IDs handled by this instance. + private _doc_ids: () => string[]; // List of document IDs handled by this instance. documents: AI_Document[] = []; // Store the documents indexed in the vectorstore. /** - * Constructor initializes the Pinecone and Cohere clients, sets up the document ID list, + * Initializes the Pinecone and OpenAI clients, sets up the document ID list, * and initializes the Pinecone index. * @param id The unique identifier for the vectorstore instance. * @param doc_ids A function that returns a list of document IDs. @@ -42,17 +45,17 @@ export class Vectorstore { throw new Error('PINECONE_API_KEY is not defined.'); } - // Initialize Pinecone and Cohere clients with API keys from the environment. + // Initialize Pinecone and OpenAI clients with API keys from the environment. this.pinecone = new Pinecone({ apiKey: pineconeApiKey }); - this.cohere = new CohereClient({ token: process.env.COHERE_API_KEY }); + this.openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY, dangerouslyAllowBrowser: true }); this._id = id; - this._doc_ids = doc_ids(); + this._doc_ids = doc_ids; this.initializeIndex(); } /** - * Initializes the Pinecone index by checking if it exists, and creating it if not. - * The index is set to use the cosine metric for vector similarity. + * Initializes the Pinecone index by checking if it exists and creating it if necessary. + * Sets the index to use cosine similarity for vector similarity calculations. */ private async initializeIndex() { const indexList: IndexList = await this.pinecone.listIndexes(); @@ -61,7 +64,7 @@ export class Vectorstore { if (!indexList.indexes?.some(index => index.name === this.indexName)) { await this.pinecone.createIndex({ name: this.indexName, - dimension: 1024, + dimension: 3072, metric: 'cosine', spec: { serverless: { @@ -77,91 +80,120 @@ export class Vectorstore { } /** - * Adds an AI document to the vectorstore. This method handles document chunking, uploading to the - * vectorstore, and updating the progress for long-running tasks like file uploads. - * @param doc The document to be added to the vectorstore. - * @param progressCallback Callback to update the progress of the upload. + * Adds an AI document to the vectorstore. Handles media file processing for audio/video, + * and text embedding for all document types. Updates document metadata during processing. + * @param doc The document to add. + * @param progressCallback Callback to track the progress of the addition process. */ async addAIDoc(doc: Doc, progressCallback: (progress: number, step: string) => void) { - console.log('Adding AI Document:', doc); const ai_document_status: string = StrCast(doc.ai_document_status); // Skip if the document is already in progress or completed. if (ai_document_status !== undefined && ai_document_status.trim() !== '' && ai_document_status !== '{}') { - if (ai_document_status === 'IN PROGRESS') { + if (ai_document_status === 'PROGRESS') { console.log('Already in progress.'); return; - } - if (!this._doc_ids.includes(StrCast(doc.ai_doc_id))) { - this._doc_ids.push(StrCast(doc.ai_doc_id)); + } else if (ai_document_status === 'COMPLETED') { + console.log('Already completed.'); + return; } } else { // Start processing the document. doc.ai_document_status = 'PROGRESS'; - console.log(doc); + const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname ?? VideoCast(doc.data)?.url?.pathname ?? AudioCast(doc.data)?.url?.pathname; + + if (!local_file_path) { + console.log('Invalid file path.'); + return; + } + + const isAudioOrVideo = local_file_path.endsWith('.mp3') || local_file_path.endsWith('.mp4'); + let result: AI_Document & { doc_id: string }; + if (isAudioOrVideo) { + console.log('Processing media file...'); + const response = await Networking.PostToServer('/processMediaFile', { fileName: path.basename(local_file_path) }); + const segmentedTranscript = response.condensed; + console.log(segmentedTranscript); + const summary = response.summary; + doc.summary = summary; + // Generate embeddings for each chunk + const texts = segmentedTranscript.map((chunk: any) => chunk.text); - // Get the local file path (CSV or PDF). - const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname; - console.log('Local File Path:', local_file_path); + try { + const embeddingsResponse = await this.openai.embeddings.create({ + model: 'text-embedding-3-large', + input: texts, + encoding_format: 'float', + }); - if (local_file_path) { - console.log('Creating AI Document...'); - // Start the document creation process by sending the file to the server. + doc.original_segments = JSON.stringify(response.full); + doc.ai_type = local_file_path.endsWith('.mp3') ? 'audio' : 'video'; + const doc_id = uuidv4(); + + // Add transcript and embeddings to metadata + result = { + doc_id, + purpose: '', + file_name: local_file_path, + num_pages: 0, + summary: '', + chunks: segmentedTranscript.map((chunk: any, index: number) => ({ + id: uuidv4(), + values: (embeddingsResponse.data as Embedding[])[index].embedding, // Assign embedding + metadata: { + indexes: chunk.indexes, + original_document: local_file_path, + doc_id: doc_id, + file_path: local_file_path, + start_time: chunk.start, + end_time: chunk.end, + text: chunk.text, + type: CHUNK_TYPE.VIDEO, + }, + })), + type: 'media', + }; + } catch (error) { + console.error('Error generating embeddings:', error); + throw new Error('Embedding generation failed'); + } + + doc.segmented_transcript = JSON.stringify(segmentedTranscript); + // Simplify chunks for storage + const simplifiedChunks = result.chunks.map(chunk => ({ + chunkId: chunk.id, + start_time: chunk.metadata.start_time, + end_time: chunk.metadata.end_time, + indexes: chunk.metadata.indexes, + chunkType: CHUNK_TYPE.VIDEO, + text: chunk.metadata.text, + })); + doc.chunk_simpl = JSON.stringify({ chunks: simplifiedChunks }); + } else { + // Existing document processing logic remains unchanged + console.log('Processing regular document...'); const { jobId } = await Networking.PostToServer('/createDocument', { file_path: local_file_path }); - // Poll the server for progress updates. - const inProgress = true; - let result: (AI_Document & { doc_id: string }) | null = null; // bcz: is this the correct type?? - while (inProgress) { - // Polling interval for status updates. + while (true) { await new Promise(resolve => setTimeout(resolve, 2000)); - - // Check if the job is completed. const resultResponse = await Networking.FetchFromServer(`/getResult/${jobId}`); const resultResponseJson = JSON.parse(resultResponse); if (resultResponseJson.status === 'completed') { - console.log('Result here:', resultResponseJson); result = resultResponseJson; break; } - - // Fetch progress information and update the progress callback. const progressResponse = await Networking.FetchFromServer(`/getProgress/${jobId}`); const progressResponseJson = JSON.parse(progressResponse); if (progressResponseJson) { - const progress = progressResponseJson.progress; - const step = progressResponseJson.step; - progressCallback(progress, step); + progressCallback(progressResponseJson.progress, progressResponseJson.step); } } - if (!result) { - console.error('Error processing document.'); - return; - } - - // Once completed, process the document and add it to the vectorstore. - console.log('Document JSON:', result); - this.documents.push(result); - await this.indexDocument(result); - console.log(`Document added: ${result.file_name}`); - - // Update document metadata such as summary, purpose, and vectorstore ID. - doc.summary = result.summary; - doc.ai_doc_id = result.doc_id; - this._doc_ids.push(result.doc_id); - doc.ai_purpose = result.purpose; - - if (!doc.vectorstore_id) { - doc.vectorstore_id = JSON.stringify([this._id]); - } else { - doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id])); - } - if (!doc.chunk_simpl) { doc.chunk_simpl = JSON.stringify({ chunks: [] }); } + doc.summary = result.summary; + doc.ai_purpose = result.purpose; - // Process each chunk of the document and update the document's chunk_simpl field. result.chunks.forEach((chunk: RAGChunk) => { const chunkToAdd = { chunkId: chunk.id, @@ -175,15 +207,28 @@ export class Vectorstore { new_chunk_simpl.chunks = new_chunk_simpl.chunks.concat(chunkToAdd); doc.chunk_simpl = JSON.stringify(new_chunk_simpl); }); + } + + // Index the document + await this.indexDocument(result); - // Mark the document status as completed. - doc.ai_document_status = 'COMPLETED'; + // Preserve existing metadata updates + if (!doc.vectorstore_id) { + doc.vectorstore_id = JSON.stringify([this._id]); + } else { + doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id])); } + + doc.ai_doc_id = result.doc_id; + + console.log(`Document added: ${result.file_name}`); + doc.ai_document_status = 'COMPLETED'; } } /** - * Indexes the processed document by uploading the document's vector chunks to the Pinecone index. + * Uploads the document's vector chunks to the Pinecone index. + * Prepares the metadata for each chunk and uses Pinecone's upsert operation. * @param document The processed document containing its chunks and metadata. */ private async indexDocument(document: AI_Document) { @@ -201,8 +246,42 @@ export class Vectorstore { } /** - * Retrieves the top K document chunks relevant to the user's query. - * This involves embedding the query using Cohere, then querying Pinecone for matching vectors. + * Combines document chunks until their combined text reaches a minimum word count. + * This is used to optimize retrieval and indexing processes. + * @param chunks The original chunks to combine. + * @returns Combined chunks with updated text and metadata. + */ + private combineChunks(chunks: RAGChunk[]): RAGChunk[] { + const combinedChunks: RAGChunk[] = []; + let currentChunk: RAGChunk | null = null; + let wordCount = 0; + + chunks.forEach(chunk => { + const textWords = chunk.metadata.text.split(' ').length; + + if (!currentChunk) { + currentChunk = { ...chunk, metadata: { ...chunk.metadata, text: chunk.metadata.text } }; + wordCount = textWords; + } else if (wordCount + textWords >= 500) { + combinedChunks.push(currentChunk); + currentChunk = { ...chunk, metadata: { ...chunk.metadata, text: chunk.metadata.text } }; + wordCount = textWords; + } else { + currentChunk.metadata.text += ` ${chunk.metadata.text}`; + wordCount += textWords; + } + }); + + if (currentChunk) { + combinedChunks.push(currentChunk); + } + + return combinedChunks; + } + + /** + * Retrieves the most relevant document chunks for a given query. + * Uses OpenAI for embedding the query and Pinecone for vector similarity matching. * @param query The search query string. * @param topK The number of top results to return (default is 10). * @returns A list of document chunks that match the query. @@ -210,38 +289,29 @@ export class Vectorstore { async retrieve(query: string, topK: number = 10): Promise<RAGChunk[]> { console.log(`Retrieving chunks for query: ${query}`); try { - // Generate an embedding for the query using Cohere. - const queryEmbeddingResponse: EmbedResponse = await this.cohere.embed({ - texts: [query], - model: 'embed-english-v3.0', - inputType: 'search_query', + // Generate an embedding for the query using OpenAI. + const queryEmbeddingResponse = await this.openai.embeddings.create({ + model: 'text-embedding-3-large', + input: query, + encoding_format: 'float', }); - let queryEmbedding: number[]; + let queryEmbedding = queryEmbeddingResponse.data[0].embedding; // Extract the embedding from the response. - if (Array.isArray(queryEmbeddingResponse.embeddings)) { - queryEmbedding = queryEmbeddingResponse.embeddings[0]; - } else if (queryEmbeddingResponse.embeddings && 'embeddings' in queryEmbeddingResponse.embeddings) { - queryEmbedding = (queryEmbeddingResponse.embeddings as { embeddings: number[][] }).embeddings[0]; - } else { - throw new Error('Invalid embedding response format'); - } - - if (!Array.isArray(queryEmbedding)) { - throw new Error('Query embedding is not an array'); - } + console.log(this._doc_ids()); // Query the Pinecone index using the embedding and filter by document IDs. const queryResponse: QueryResponse = await this.index.query({ vector: queryEmbedding, filter: { - doc_id: { $in: this._doc_ids }, + doc_id: { $in: this._doc_ids() }, }, topK, includeValues: true, includeMetadata: true, }); + console.log(queryResponse); // Map the results into RAGChunks and return them. return queryResponse.matches.map( |
