aboutsummaryrefslogtreecommitdiff
path: root/src/client/views/nodes/chatbot/vectorstore
diff options
context:
space:
mode:
authorbobzel <zzzman@gmail.com>2025-03-10 16:13:04 -0400
committerbobzel <zzzman@gmail.com>2025-03-10 16:13:04 -0400
commitb7989dded8bb001876de6cbca59bf77935f0daf7 (patch)
tree0dba0665674db7bb84770833df0a4100d0520701 /src/client/views/nodes/chatbot/vectorstore
parent4979415d4604d280e81a162bf9a9d39c731d3738 (diff)
parent5bf944035c0ba94ad15245416f51ca0329a51bde (diff)
Merge branch 'master' into alyssa-starter
Diffstat (limited to 'src/client/views/nodes/chatbot/vectorstore')
-rw-r--r--src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts254
1 files changed, 162 insertions, 92 deletions
diff --git a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts
index f96f55997..afd34f28d 100644
--- a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts
+++ b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts
@@ -1,37 +1,40 @@
/**
* @file Vectorstore.ts
- * @description This file defines the Vectorstore class, which integrates with Pinecone for vector-based document indexing and Cohere for text embeddings.
- * It handles tasks such as AI document management, document chunking, and retrieval of relevant document sections based on user queries.
- * The class supports adding documents to the vectorstore, managing document status, and querying Pinecone for document chunks matching a query.
+ * @description This file defines the Vectorstore class, which integrates with Pinecone for vector-based document indexing and OpenAI text-embedding-3-large for text embeddings.
+ * It manages AI document handling, including adding documents, processing media files, combining document chunks, indexing documents,
+ * and retrieving relevant sections based on user queries.
*/
import { Index, IndexList, Pinecone, PineconeRecord, QueryResponse, RecordMetadata } from '@pinecone-database/pinecone';
-import { CohereClient } from 'cohere-ai';
-import { EmbedResponse } from 'cohere-ai/api';
import dotenv from 'dotenv';
+import path from 'path';
+import { v4 as uuidv4 } from 'uuid';
import { Doc } from '../../../../../fields/Doc';
-import { CsvCast, PDFCast, StrCast } from '../../../../../fields/Types';
+import { AudioCast, CsvCast, PDFCast, StrCast, VideoCast } from '../../../../../fields/Types';
import { Networking } from '../../../../Network';
import { AI_Document, CHUNK_TYPE, RAGChunk } from '../types/types';
+import OpenAI from 'openai';
+import { Embedding } from 'openai/resources';
+import { PineconeEnvironmentVarsNotSupportedError } from '@pinecone-database/pinecone/dist/errors';
dotenv.config();
/**
* The Vectorstore class integrates with Pinecone for vector-based document indexing and retrieval,
- * and Cohere for text embedding. It handles AI document management, uploads, and query-based retrieval.
+ * and OpenAI text-embedding-3-large for text embedding. It handles AI document management, uploads, and query-based retrieval.
*/
export class Vectorstore {
private pinecone: Pinecone; // Pinecone client for managing the vector index.
private index!: Index; // The specific Pinecone index used for document chunks.
- private cohere: CohereClient; // Cohere client for generating embeddings.
+ private openai: OpenAI; // OpenAI client for generating embeddings.
private indexName: string = 'pdf-chatbot'; // Default name for the index.
private _id: string; // Unique ID for the Vectorstore instance.
- private _doc_ids: string[] = []; // List of document IDs handled by this instance.
+ private _doc_ids: () => string[]; // List of document IDs handled by this instance.
documents: AI_Document[] = []; // Store the documents indexed in the vectorstore.
/**
- * Constructor initializes the Pinecone and Cohere clients, sets up the document ID list,
+ * Initializes the Pinecone and OpenAI clients, sets up the document ID list,
* and initializes the Pinecone index.
* @param id The unique identifier for the vectorstore instance.
* @param doc_ids A function that returns a list of document IDs.
@@ -42,17 +45,17 @@ export class Vectorstore {
throw new Error('PINECONE_API_KEY is not defined.');
}
- // Initialize Pinecone and Cohere clients with API keys from the environment.
+ // Initialize Pinecone and OpenAI clients with API keys from the environment.
this.pinecone = new Pinecone({ apiKey: pineconeApiKey });
- this.cohere = new CohereClient({ token: process.env.COHERE_API_KEY });
+ this.openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY, dangerouslyAllowBrowser: true });
this._id = id;
- this._doc_ids = doc_ids();
+ this._doc_ids = doc_ids;
this.initializeIndex();
}
/**
- * Initializes the Pinecone index by checking if it exists, and creating it if not.
- * The index is set to use the cosine metric for vector similarity.
+ * Initializes the Pinecone index by checking if it exists and creating it if necessary.
+ * Sets the index to use cosine similarity for vector similarity calculations.
*/
private async initializeIndex() {
const indexList: IndexList = await this.pinecone.listIndexes();
@@ -61,7 +64,7 @@ export class Vectorstore {
if (!indexList.indexes?.some(index => index.name === this.indexName)) {
await this.pinecone.createIndex({
name: this.indexName,
- dimension: 1024,
+ dimension: 3072,
metric: 'cosine',
spec: {
serverless: {
@@ -77,91 +80,120 @@ export class Vectorstore {
}
/**
- * Adds an AI document to the vectorstore. This method handles document chunking, uploading to the
- * vectorstore, and updating the progress for long-running tasks like file uploads.
- * @param doc The document to be added to the vectorstore.
- * @param progressCallback Callback to update the progress of the upload.
+ * Adds an AI document to the vectorstore. Handles media file processing for audio/video,
+ * and text embedding for all document types. Updates document metadata during processing.
+ * @param doc The document to add.
+ * @param progressCallback Callback to track the progress of the addition process.
*/
async addAIDoc(doc: Doc, progressCallback: (progress: number, step: string) => void) {
- console.log('Adding AI Document:', doc);
const ai_document_status: string = StrCast(doc.ai_document_status);
// Skip if the document is already in progress or completed.
if (ai_document_status !== undefined && ai_document_status.trim() !== '' && ai_document_status !== '{}') {
- if (ai_document_status === 'IN PROGRESS') {
+ if (ai_document_status === 'PROGRESS') {
console.log('Already in progress.');
return;
- }
- if (!this._doc_ids.includes(StrCast(doc.ai_doc_id))) {
- this._doc_ids.push(StrCast(doc.ai_doc_id));
+ } else if (ai_document_status === 'COMPLETED') {
+ console.log('Already completed.');
+ return;
}
} else {
// Start processing the document.
doc.ai_document_status = 'PROGRESS';
- console.log(doc);
+ const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname ?? VideoCast(doc.data)?.url?.pathname ?? AudioCast(doc.data)?.url?.pathname;
+
+ if (!local_file_path) {
+ console.log('Invalid file path.');
+ return;
+ }
+
+ const isAudioOrVideo = local_file_path.endsWith('.mp3') || local_file_path.endsWith('.mp4');
+ let result: AI_Document & { doc_id: string };
+ if (isAudioOrVideo) {
+ console.log('Processing media file...');
+ const response = await Networking.PostToServer('/processMediaFile', { fileName: path.basename(local_file_path) });
+ const segmentedTranscript = response.condensed;
+ console.log(segmentedTranscript);
+ const summary = response.summary;
+ doc.summary = summary;
+ // Generate embeddings for each chunk
+ const texts = segmentedTranscript.map((chunk: any) => chunk.text);
- // Get the local file path (CSV or PDF).
- const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname;
- console.log('Local File Path:', local_file_path);
+ try {
+ const embeddingsResponse = await this.openai.embeddings.create({
+ model: 'text-embedding-3-large',
+ input: texts,
+ encoding_format: 'float',
+ });
- if (local_file_path) {
- console.log('Creating AI Document...');
- // Start the document creation process by sending the file to the server.
+ doc.original_segments = JSON.stringify(response.full);
+ doc.ai_type = local_file_path.endsWith('.mp3') ? 'audio' : 'video';
+ const doc_id = uuidv4();
+
+ // Add transcript and embeddings to metadata
+ result = {
+ doc_id,
+ purpose: '',
+ file_name: local_file_path,
+ num_pages: 0,
+ summary: '',
+ chunks: segmentedTranscript.map((chunk: any, index: number) => ({
+ id: uuidv4(),
+ values: (embeddingsResponse.data as Embedding[])[index].embedding, // Assign embedding
+ metadata: {
+ indexes: chunk.indexes,
+ original_document: local_file_path,
+ doc_id: doc_id,
+ file_path: local_file_path,
+ start_time: chunk.start,
+ end_time: chunk.end,
+ text: chunk.text,
+ type: CHUNK_TYPE.VIDEO,
+ },
+ })),
+ type: 'media',
+ };
+ } catch (error) {
+ console.error('Error generating embeddings:', error);
+ throw new Error('Embedding generation failed');
+ }
+
+ doc.segmented_transcript = JSON.stringify(segmentedTranscript);
+ // Simplify chunks for storage
+ const simplifiedChunks = result.chunks.map(chunk => ({
+ chunkId: chunk.id,
+ start_time: chunk.metadata.start_time,
+ end_time: chunk.metadata.end_time,
+ indexes: chunk.metadata.indexes,
+ chunkType: CHUNK_TYPE.VIDEO,
+ text: chunk.metadata.text,
+ }));
+ doc.chunk_simpl = JSON.stringify({ chunks: simplifiedChunks });
+ } else {
+ // Existing document processing logic remains unchanged
+ console.log('Processing regular document...');
const { jobId } = await Networking.PostToServer('/createDocument', { file_path: local_file_path });
- // Poll the server for progress updates.
- const inProgress = true;
- let result: (AI_Document & { doc_id: string }) | null = null; // bcz: is this the correct type??
- while (inProgress) {
- // Polling interval for status updates.
+ while (true) {
await new Promise(resolve => setTimeout(resolve, 2000));
-
- // Check if the job is completed.
const resultResponse = await Networking.FetchFromServer(`/getResult/${jobId}`);
const resultResponseJson = JSON.parse(resultResponse);
if (resultResponseJson.status === 'completed') {
- console.log('Result here:', resultResponseJson);
result = resultResponseJson;
break;
}
-
- // Fetch progress information and update the progress callback.
const progressResponse = await Networking.FetchFromServer(`/getProgress/${jobId}`);
const progressResponseJson = JSON.parse(progressResponse);
if (progressResponseJson) {
- const progress = progressResponseJson.progress;
- const step = progressResponseJson.step;
- progressCallback(progress, step);
+ progressCallback(progressResponseJson.progress, progressResponseJson.step);
}
}
- if (!result) {
- console.error('Error processing document.');
- return;
- }
-
- // Once completed, process the document and add it to the vectorstore.
- console.log('Document JSON:', result);
- this.documents.push(result);
- await this.indexDocument(result);
- console.log(`Document added: ${result.file_name}`);
-
- // Update document metadata such as summary, purpose, and vectorstore ID.
- doc.summary = result.summary;
- doc.ai_doc_id = result.doc_id;
- this._doc_ids.push(result.doc_id);
- doc.ai_purpose = result.purpose;
-
- if (!doc.vectorstore_id) {
- doc.vectorstore_id = JSON.stringify([this._id]);
- } else {
- doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id]));
- }
-
if (!doc.chunk_simpl) {
doc.chunk_simpl = JSON.stringify({ chunks: [] });
}
+ doc.summary = result.summary;
+ doc.ai_purpose = result.purpose;
- // Process each chunk of the document and update the document's chunk_simpl field.
result.chunks.forEach((chunk: RAGChunk) => {
const chunkToAdd = {
chunkId: chunk.id,
@@ -175,15 +207,28 @@ export class Vectorstore {
new_chunk_simpl.chunks = new_chunk_simpl.chunks.concat(chunkToAdd);
doc.chunk_simpl = JSON.stringify(new_chunk_simpl);
});
+ }
+
+ // Index the document
+ await this.indexDocument(result);
- // Mark the document status as completed.
- doc.ai_document_status = 'COMPLETED';
+ // Preserve existing metadata updates
+ if (!doc.vectorstore_id) {
+ doc.vectorstore_id = JSON.stringify([this._id]);
+ } else {
+ doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id]));
}
+
+ doc.ai_doc_id = result.doc_id;
+
+ console.log(`Document added: ${result.file_name}`);
+ doc.ai_document_status = 'COMPLETED';
}
}
/**
- * Indexes the processed document by uploading the document's vector chunks to the Pinecone index.
+ * Uploads the document's vector chunks to the Pinecone index.
+ * Prepares the metadata for each chunk and uses Pinecone's upsert operation.
* @param document The processed document containing its chunks and metadata.
*/
private async indexDocument(document: AI_Document) {
@@ -201,8 +246,42 @@ export class Vectorstore {
}
/**
- * Retrieves the top K document chunks relevant to the user's query.
- * This involves embedding the query using Cohere, then querying Pinecone for matching vectors.
+ * Combines document chunks until their combined text reaches a minimum word count.
+ * This is used to optimize retrieval and indexing processes.
+ * @param chunks The original chunks to combine.
+ * @returns Combined chunks with updated text and metadata.
+ */
+ private combineChunks(chunks: RAGChunk[]): RAGChunk[] {
+ const combinedChunks: RAGChunk[] = [];
+ let currentChunk: RAGChunk | null = null;
+ let wordCount = 0;
+
+ chunks.forEach(chunk => {
+ const textWords = chunk.metadata.text.split(' ').length;
+
+ if (!currentChunk) {
+ currentChunk = { ...chunk, metadata: { ...chunk.metadata, text: chunk.metadata.text } };
+ wordCount = textWords;
+ } else if (wordCount + textWords >= 500) {
+ combinedChunks.push(currentChunk);
+ currentChunk = { ...chunk, metadata: { ...chunk.metadata, text: chunk.metadata.text } };
+ wordCount = textWords;
+ } else {
+ currentChunk.metadata.text += ` ${chunk.metadata.text}`;
+ wordCount += textWords;
+ }
+ });
+
+ if (currentChunk) {
+ combinedChunks.push(currentChunk);
+ }
+
+ return combinedChunks;
+ }
+
+ /**
+ * Retrieves the most relevant document chunks for a given query.
+ * Uses OpenAI for embedding the query and Pinecone for vector similarity matching.
* @param query The search query string.
* @param topK The number of top results to return (default is 10).
* @returns A list of document chunks that match the query.
@@ -210,38 +289,29 @@ export class Vectorstore {
async retrieve(query: string, topK: number = 10): Promise<RAGChunk[]> {
console.log(`Retrieving chunks for query: ${query}`);
try {
- // Generate an embedding for the query using Cohere.
- const queryEmbeddingResponse: EmbedResponse = await this.cohere.embed({
- texts: [query],
- model: 'embed-english-v3.0',
- inputType: 'search_query',
+ // Generate an embedding for the query using OpenAI.
+ const queryEmbeddingResponse = await this.openai.embeddings.create({
+ model: 'text-embedding-3-large',
+ input: query,
+ encoding_format: 'float',
});
- let queryEmbedding: number[];
+ let queryEmbedding = queryEmbeddingResponse.data[0].embedding;
// Extract the embedding from the response.
- if (Array.isArray(queryEmbeddingResponse.embeddings)) {
- queryEmbedding = queryEmbeddingResponse.embeddings[0];
- } else if (queryEmbeddingResponse.embeddings && 'embeddings' in queryEmbeddingResponse.embeddings) {
- queryEmbedding = (queryEmbeddingResponse.embeddings as { embeddings: number[][] }).embeddings[0];
- } else {
- throw new Error('Invalid embedding response format');
- }
-
- if (!Array.isArray(queryEmbedding)) {
- throw new Error('Query embedding is not an array');
- }
+ console.log(this._doc_ids());
// Query the Pinecone index using the embedding and filter by document IDs.
const queryResponse: QueryResponse = await this.index.query({
vector: queryEmbedding,
filter: {
- doc_id: { $in: this._doc_ids },
+ doc_id: { $in: this._doc_ids() },
},
topK,
includeValues: true,
includeMetadata: true,
});
+ console.log(queryResponse);
// Map the results into RAGChunks and return them.
return queryResponse.matches.map(