aboutsummaryrefslogtreecommitdiff
path: root/src/client/views/nodes/chatbot/vectorstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/client/views/nodes/chatbot/vectorstore')
-rw-r--r--src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts258
1 files changed, 258 insertions, 0 deletions
diff --git a/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts
new file mode 100644
index 000000000..07a2b73bc
--- /dev/null
+++ b/src/client/views/nodes/chatbot/vectorstore/Vectorstore.ts
@@ -0,0 +1,258 @@
+import { Pinecone, Index, IndexList, PineconeRecord, RecordMetadata, QueryResponse } from '@pinecone-database/pinecone';
+import { CohereClient } from 'cohere-ai';
+import { EmbedResponse } from 'cohere-ai/api';
+import dotenv from 'dotenv';
+import { RAGChunk, AI_Document, CHUNK_TYPE } from '../types/types';
+import { Doc } from '../../../../../fields/Doc';
+import { CsvCast, PDFCast, StrCast } from '../../../../../fields/Types';
+import { Networking } from '../../../../Network';
+
+dotenv.config();
+
+/**
+ * The Vectorstore class integrates with Pinecone for vector-based document indexing and retrieval,
+ * and Cohere for text embedding. It handles AI document management, uploads, and query-based retrieval.
+ */
+export class Vectorstore {
+ private pinecone: Pinecone; // Pinecone client for managing the vector index.
+ private index!: Index; // The specific Pinecone index used for document chunks.
+ private cohere: CohereClient; // Cohere client for generating embeddings.
+ private indexName: string = 'pdf-chatbot'; // Default name for the index.
+ private _id: string; // Unique ID for the Vectorstore instance.
+ private _doc_ids: string[] = []; // List of document IDs handled by this instance.
+
+ documents: AI_Document[] = []; // Store the documents indexed in the vectorstore.
+
+ /**
+ * Constructor initializes the Pinecone and Cohere clients, sets up the document ID list,
+ * and initializes the Pinecone index.
+ * @param id The unique identifier for the vectorstore instance.
+ * @param doc_ids A function that returns a list of document IDs.
+ */
+ constructor(id: string, doc_ids: () => string[]) {
+ const pineconeApiKey = process.env.PINECONE_API_KEY;
+ if (!pineconeApiKey) {
+ throw new Error('PINECONE_API_KEY is not defined.');
+ }
+
+ // Initialize Pinecone and Cohere clients with API keys from the environment.
+ this.pinecone = new Pinecone({ apiKey: pineconeApiKey });
+ this.cohere = new CohereClient({ token: process.env.COHERE_API_KEY });
+ this._id = id;
+ this._doc_ids = doc_ids();
+ this.initializeIndex();
+ }
+
+ /**
+ * Initializes the Pinecone index by checking if it exists, and creating it if not.
+ * The index is set to use the cosine metric for vector similarity.
+ */
+ private async initializeIndex() {
+ const indexList: IndexList = await this.pinecone.listIndexes();
+
+ // Check if the index already exists, otherwise create it.
+ if (!indexList.indexes?.some(index => index.name === this.indexName)) {
+ await this.pinecone.createIndex({
+ name: this.indexName,
+ dimension: 1024,
+ metric: 'cosine',
+ spec: {
+ serverless: {
+ cloud: 'aws',
+ region: 'us-east-1',
+ },
+ },
+ });
+ }
+
+ // Set the index for future use.
+ this.index = this.pinecone.Index(this.indexName);
+ }
+
+ /**
+ * Adds an AI document to the vectorstore. This method handles document chunking, uploading to the
+ * vectorstore, and updating the progress for long-running tasks like file uploads.
+ * @param doc The document to be added to the vectorstore.
+ * @param progressCallback Callback to update the progress of the upload.
+ */
+ async addAIDoc(doc: Doc, progressCallback: (progress: number, step: string) => void) {
+ console.log('Adding AI Document:', doc);
+ const ai_document_status: string = StrCast(doc.ai_document_status);
+
+ // Skip if the document is already in progress or completed.
+ if (ai_document_status !== undefined && ai_document_status.trim() !== '' && ai_document_status !== '{}') {
+ if (ai_document_status === 'IN PROGRESS') {
+ console.log('Already in progress.');
+ return;
+ }
+ if (!this._doc_ids.includes(StrCast(doc.ai_doc_id))) {
+ this._doc_ids.push(StrCast(doc.ai_doc_id));
+ }
+ } else {
+ // Start processing the document.
+ doc.ai_document_status = 'PROGRESS';
+ console.log(doc);
+
+ // Get the local file path (CSV or PDF).
+ const local_file_path: string = CsvCast(doc.data)?.url?.pathname ?? PDFCast(doc.data)?.url?.pathname;
+ console.log('Local File Path:', local_file_path);
+
+ if (local_file_path) {
+ console.log('Creating AI Document...');
+ // Start the document creation process by sending the file to the server.
+ const { jobId } = await Networking.PostToServer('/createDocument', { file_path: local_file_path });
+
+ // Poll the server for progress updates.
+ let inProgress: boolean = true;
+ let result: any = null;
+ while (inProgress) {
+ // Polling interval for status updates.
+ await new Promise(resolve => setTimeout(resolve, 2000));
+
+ // Check if the job is completed.
+ const resultResponse = await Networking.FetchFromServer(`/getResult/${jobId}`);
+ const resultResponseJson = JSON.parse(resultResponse);
+ if (resultResponseJson.status === 'completed') {
+ console.log('Result here:', resultResponseJson);
+ result = resultResponseJson;
+ break;
+ }
+
+ // Fetch progress information and update the progress callback.
+ const progressResponse = await Networking.FetchFromServer(`/getProgress/${jobId}`);
+ const progressResponseJson = JSON.parse(progressResponse);
+ if (progressResponseJson) {
+ const progress = progressResponseJson.progress;
+ const step = progressResponseJson.step;
+ progressCallback(progress, step);
+ }
+ }
+
+ // Once completed, process the document and add it to the vectorstore.
+ console.log('Document JSON:', result);
+ this.documents.push(result);
+ await this.indexDocument(result);
+ console.log(`Document added: ${result.file_name}`);
+
+ // Update document metadata such as summary, purpose, and vectorstore ID.
+ doc.summary = result.summary;
+ doc.ai_doc_id = result.doc_id;
+ this._doc_ids.push(result.doc_id);
+ doc.ai_purpose = result.purpose;
+
+ if (!doc.vectorstore_id) {
+ doc.vectorstore_id = JSON.stringify([this._id]);
+ } else {
+ doc.vectorstore_id = JSON.stringify(JSON.parse(StrCast(doc.vectorstore_id)).concat([this._id]));
+ }
+
+ if (!doc.chunk_simpl) {
+ doc.chunk_simpl = JSON.stringify({ chunks: [] });
+ }
+
+ // Process each chunk of the document and update the document's chunk_simpl field.
+ result.chunks.forEach((chunk: RAGChunk) => {
+ const chunkToAdd = {
+ chunkId: chunk.id,
+ startPage: chunk.metadata.start_page,
+ endPage: chunk.metadata.end_page,
+ location: chunk.metadata.location,
+ chunkType: chunk.metadata.type as CHUNK_TYPE,
+ text: chunk.metadata.text,
+ };
+ const new_chunk_simpl = JSON.parse(StrCast(doc.chunk_simpl));
+ new_chunk_simpl.chunks = new_chunk_simpl.chunks.concat(chunkToAdd);
+ doc.chunk_simpl = JSON.stringify(new_chunk_simpl);
+ });
+
+ // Mark the document status as completed.
+ doc.ai_document_status = 'COMPLETED';
+ }
+ }
+ }
+
+ /**
+ * Indexes the processed document by uploading the document's vector chunks to the Pinecone index.
+ * @param document The processed document containing its chunks and metadata.
+ */
+ private async indexDocument(document: any) {
+ console.log('Uploading vectors to content namespace...');
+
+ // Prepare Pinecone records for each chunk in the document.
+ const pineconeRecords: PineconeRecord[] = (document.chunks as RAGChunk[]).map(chunk => ({
+ id: chunk.id,
+ values: chunk.values,
+ metadata: { ...chunk.metadata } as RecordMetadata,
+ }));
+
+ // Upload the records to Pinecone.
+ await this.index.upsert(pineconeRecords);
+ }
+
+ /**
+ * Retrieves the top K document chunks relevant to the user's query.
+ * This involves embedding the query using Cohere, then querying Pinecone for matching vectors.
+ * @param query The search query string.
+ * @param topK The number of top results to return (default is 10).
+ * @returns A list of document chunks that match the query.
+ */
+ async retrieve(query: string, topK: number = 10): Promise<RAGChunk[]> {
+ console.log(`Retrieving chunks for query: ${query}`);
+ try {
+ // Generate an embedding for the query using Cohere.
+ const queryEmbeddingResponse: EmbedResponse = await this.cohere.embed({
+ texts: [query],
+ model: 'embed-english-v3.0',
+ inputType: 'search_query',
+ });
+
+ let queryEmbedding: number[];
+
+ // Extract the embedding from the response.
+ if (Array.isArray(queryEmbeddingResponse.embeddings)) {
+ queryEmbedding = queryEmbeddingResponse.embeddings[0];
+ } else if (queryEmbeddingResponse.embeddings && 'embeddings' in queryEmbeddingResponse.embeddings) {
+ queryEmbedding = (queryEmbeddingResponse.embeddings as { embeddings: number[][] }).embeddings[0];
+ } else {
+ throw new Error('Invalid embedding response format');
+ }
+
+ if (!Array.isArray(queryEmbedding)) {
+ throw new Error('Query embedding is not an array');
+ }
+
+ // Query the Pinecone index using the embedding and filter by document IDs.
+ const queryResponse: QueryResponse = await this.index.query({
+ vector: queryEmbedding,
+ filter: {
+ doc_id: { $in: this._doc_ids },
+ },
+ topK,
+ includeValues: true,
+ includeMetadata: true,
+ });
+
+ // Map the results into RAGChunks and return them.
+ return queryResponse.matches.map(
+ match =>
+ ({
+ id: match.id,
+ values: match.values as number[],
+ metadata: match.metadata as {
+ text: string;
+ type: string;
+ original_document: string;
+ file_path: string;
+ doc_id: string;
+ location: string;
+ start_page: number;
+ end_page: number;
+ },
+ }) as RAGChunk
+ );
+ } catch (error) {
+ console.error(`Error retrieving chunks: ${error}`);
+ return [];
+ }
+ }
+}