/**
 * @file AssistantManager.ts
 * @description This file defines the AssistantManager class, responsible for managing various
 * API routes related to the Assistant functionality. It provides features such as file handling,
 * web scraping, and integration with third-party APIs like OpenAI and Google Custom Search.
 * It also handles job tracking and progress reporting for tasks like document creation and web scraping.
 * Utility functions for path manipulation and file operations are included, along with
 * a mechanism for handling retry logic during API calls.
 */

import { Readability } from '@mozilla/readability';
import axios from 'axios';
import { spawn } from 'child_process';
import * as fs from 'fs';
import { writeFile } from 'fs';
import { google } from 'googleapis';
import { JSDOM } from 'jsdom';
import OpenAI from 'openai';
import * as path from 'path';
import * as puppeteer from 'puppeteer';
import { promisify } from 'util';
import * as uuid from 'uuid';
import { AI_Document } from '../../client/views/nodes/chatbot/types/types';
import { DashUploadUtils } from '../DashUploadUtils';
import { Method } from '../RouteManager';
import { filesDirectory, publicDirectory } from '../SocketData';
import ApiManager, { Registration } from './ApiManager';
import { env } from 'process';

// Enumeration of directories where different file types are stored
export enum Directory {
    parsed_files = 'parsed_files',
    images = 'images',
    videos = 'videos',
    pdfs = 'pdfs',
    text = 'text',
    pdf_thumbnails = 'pdf_thumbnails',
    audio = 'audio',
    csv = 'csv',
    chunk_images = 'chunk_images',
    scrape_images = 'scrape_images',
}

// In-memory job tracking
const jobResults: { [key: string]: unknown } = {};
const jobProgress: { [key: string]: unknown } = {};

/**
 * Constructs a normalized path to a file in the server's file system.
 * @param directory The directory where the file is stored.
 * @param filename The name of the file.
 * @returns The full normalized path to the file.
 */
export function serverPathToFile(directory: Directory, filename: string) {
    return path.normalize(`${filesDirectory}/${directory}/${filename}`);
}

/**
 * Constructs a normalized path to a directory in the server's file system.
 * @param directory The directory to access.
 * @returns The full normalized path to the directory.
 */
export function pathToDirectory(directory: Directory) {
    return path.normalize(`${filesDirectory}/${directory}`);
}

/**
 * Constructs the client-accessible URL for a file.
 * @param directory The directory where the file is stored.
 * @param filename The name of the file.
 * @returns The URL path to the file.
 */
export function clientPathToFile(directory: Directory, filename: string) {
    return `/files/${directory}/${filename}`;
}

// Promisified versions of filesystem functions
const writeFileAsync = promisify(writeFile);
const readFileAsync = promisify(fs.readFile);
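// Illustrative use of the path helpers above (values assume the default filesDirectory layout):
//   serverPathToFile(Directory.csv, 'report.csv')  -> '<filesDirectory>/csv/report.csv'
//   pathToDirectory(Directory.chunk_images)        -> '<filesDirectory>/chunk_images'
//   clientPathToFile(Directory.csv, 'report.csv')  -> '/files/csv/report.csv'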
/**
 * Class responsible for handling various API routes related to the Assistant functionality.
 * This class extends `ApiManager` and handles registration of routes and secure request handlers.
 */
export default class AssistantManager extends ApiManager {
    /**
     * Registers all API routes and initializes necessary services like OpenAI and Google Custom Search.
     * @param register The registration method to register routes and handlers.
     */
    protected initialize(register: Registration): void {
        // Initialize Google Custom Search API
        const customsearch = google.customsearch('v1');
        const openai = new OpenAI({ apiKey: env.OPENAI_API_KEY });

        // Register Wikipedia summary API route
        register({
            method: Method.POST,
            subscription: '/getWikipediaSummary',
            secureHandler: async ({ req, res }) => {
                const { title } = req.body;
                try {
                    // Fetch summary from Wikipedia using axios
                    const response = await axios.get('https://en.wikipedia.org/w/api.php', {
                        params: {
                            action: 'query',
                            list: 'search',
                            srsearch: title,
                            format: 'json',
                        },
                    });
                    const summary = response.data.query.search[0]?.snippet || 'No article found with that title.';
                    res.send({ text: summary });
                } catch (error) {
                    console.error('Error retrieving Wikipedia summary:', error);
                    res.status(500).send({
                        error: 'Error retrieving article summary from Wikipedia.',
                    });
                }
            },
        });

        // Register an API route to retrieve web search results using Google Custom Search
        // This route filters results by checking their x-frame-options headers for security purposes
        register({
            method: Method.POST,
            subscription: '/getWebSearchResults',
            secureHandler: async ({ req, res }) => {
                const { query, max_results } = req.body;
                const MIN_VALID_RESULTS_RATIO = 0.75; // 3/4 threshold
                let startIndex = 1; // Start at the first result initially

                const fetchSearchResults = async (start: number) => {
                    return customsearch.cse.list({
                        q: query,
                        cx: process.env._CLIENT_GOOGLE_SEARCH_ENGINE_ID,
                        key: process.env._CLIENT_GOOGLE_API_KEY,
                        safe: 'active',
                        num: max_results,
                        start, // This controls which result index the search starts from
                    });
                };

                const filterResultsByXFrameOptions = async (
                    results: {
                        url: string | null | undefined;
                        snippet: string | null | undefined;
                    }[]
                ) => {
                    const filteredResults = await Promise.all(
                        results
                            .filter(result => result.url)
                            .map(async result => {
                                try {
                                    const urlResponse = await axios.head(result.url!, { timeout: 5000 });
                                    const xFrameOptions = urlResponse.headers['x-frame-options'];
                                    if (xFrameOptions && xFrameOptions.toUpperCase() === 'SAMEORIGIN') {
                                        return result;
                                    }
                                } catch (error) {
                                    console.error(`Error checking x-frame-options for URL: ${result.url}`, error);
                                }
                                return null; // Exclude the result if it doesn't match
                            })
                    );
                    return filteredResults.filter(result => result !== null); // Remove null results
                };

                try {
                    // Fetch initial search results
                    let response = await fetchSearchResults(startIndex);
                    const initialResults =
                        response.data.items?.map(item => ({
                            url: item.link,
                            snippet: item.snippet,
                        })) || [];

                    // Filter the initial results
                    let validResults = await filterResultsByXFrameOptions(initialResults);

                    // If valid results are less than 3/4 of max_results, fetch more results
                    while (validResults.length < max_results * MIN_VALID_RESULTS_RATIO) {
                        // Increment the start index by the max_results to fetch the next set of results
                        startIndex += max_results;
                        response = await fetchSearchResults(startIndex);

                        const additionalResults =
                            response.data.items?.map(item => ({
                                url: item.link,
                                snippet: item.snippet,
                            })) || [];

                        const additionalValidResults = await filterResultsByXFrameOptions(additionalResults);
                        validResults = [...validResults, ...additionalValidResults]; // Combine valid results

                        // Break if no more results are available
                        if (additionalValidResults.length === 0 || response.data.items?.length === 0) {
                            break;
                        }
                    }

                    // Return the filtered valid results
                    res.send({ results: validResults.slice(0, max_results) }); // Limit the results to max_results
                } catch (error) {
                    console.error('Error performing web search:', error);
                    res.status(500).send({
                        error: 'Failed to perform web search',
                    });
                }
            },
        });
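        // Example exchange for /getWebSearchResults (illustrative shapes, not a fixed contract):
        //   POST { query: 'dash hypermedia', max_results: 4 }
        //   -> { results: [{ url: 'https://…', snippet: '…' }, …] }
        // Only results whose x-frame-options header passes the filter above are returned.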
        /**
         * Converts a video file to audio format using ffmpeg.
         * @param videoPath The path to the input video file.
         * @param outputAudioPath The path to the output audio file.
         * @returns A promise that resolves when the conversion is complete.
         */
        function convertVideoToAudio(videoPath: string, outputAudioPath: string): Promise<void> {
            return new Promise((resolve, reject) => {
                const ffmpegProcess = spawn('ffmpeg', [
                    '-i', videoPath, // Input file
                    '-vn', // No video
                    '-acodec', 'pcm_s16le', // Audio codec
                    '-ac', '1', // Number of audio channels
                    '-ar', '16000', // Audio sampling frequency
                    '-f', 'wav', // Output format
                    outputAudioPath, // Output file
                ]);

                ffmpegProcess.on('error', error => {
                    console.error('Error running ffmpeg:', error);
                    reject(error);
                });

                ffmpegProcess.on('close', code => {
                    if (code === 0) {
                        console.log('Audio extraction complete:', outputAudioPath);
                        resolve();
                    } else {
                        reject(new Error(`ffmpeg exited with code ${code}`));
                    }
                });
            });
        }

        // Register an API route to process a media file (audio or video)
        // Extracts audio from video files, transcribes the audio using OpenAI Whisper, and provides a summary
        register({
            method: Method.POST,
            subscription: '/processMediaFile',
            secureHandler: async ({ req, res }) => {
                const { fileName } = req.body;

                // Ensure the filename is provided
                if (!fileName) {
                    res.status(400).send({ error: 'Filename is required' });
                    return;
                }

                try {
                    // Determine the file type and location
                    const isAudio = fileName.toLowerCase().endsWith('.mp3');
                    const directory = isAudio ? Directory.audio : Directory.videos;
                    const filePath = serverPathToFile(directory, fileName);

                    // Check if the file exists
                    if (!fs.existsSync(filePath)) {
                        res.status(404).send({ error: 'File not found' });
                        return;
                    }

                    console.log(`Processing ${isAudio ? 'audio' : 'video'} file: ${fileName}`);

                    // Step 1: Extract audio if it's a video
                    let audioPath = filePath;
                    if (!isAudio) {
                        const audioFileName = `${path.basename(fileName, path.extname(fileName))}.wav`;
                        audioPath = path.join(pathToDirectory(Directory.audio), audioFileName);
                        console.log('Extracting audio from video...');
                        await convertVideoToAudio(filePath, audioPath);
                    }

                    // Step 2: Transcribe audio using OpenAI Whisper
                    console.log('Transcribing audio...');
                    const transcription = await openai.audio.transcriptions.create({
                        file: fs.createReadStream(audioPath),
                        model: 'whisper-1',
                        response_format: 'verbose_json',
                        timestamp_granularities: ['segment'],
                    });
                    console.log('Audio transcription complete.');

                    // Step 3: Extract concise JSON
                    console.log('Extracting concise JSON...');
                    const originalSegments = transcription.segments?.map((segment, index) => ({
                        index: index.toString(),
                        text: segment.text,
                        start: segment.start,
                        end: segment.end,
                    }));

                    interface ConciseSegment {
                        text: string;
                        indexes: string[];
                        start: number | null;
                        end: number | null;
                    }

                    const combinedSegments: ConciseSegment[] = [];
                    let currentGroup: ConciseSegment = { text: '', indexes: [], start: null, end: null };
                    let currentDuration = 0;
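                    // Illustrative example of the grouping below (segment times assumed to be in seconds):
                    // consecutive transcript segments are appended to currentGroup until the running
                    // duration would exceed the cap, e.g. [0–2.5], [2.5–5.1], … become one combined
                    // segment whose `indexes` records which original segments it covers.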
                    originalSegments?.forEach(segment => {
                        const segmentDuration = segment.end - segment.start;

                        if (currentDuration + segmentDuration <= 4000) {
                            // Add segment to the current group
                            currentGroup.text += (currentGroup.text ? ' ' : '') + segment.text;
                            currentGroup.indexes.push(segment.index);
                            if (currentGroup.start === null) {
                                currentGroup.start = segment.start;
                            }
                            currentGroup.end = segment.end;
                            currentDuration += segmentDuration;
                        } else {
                            // Push the current group and start a new one
                            combinedSegments.push({ ...currentGroup });
                            currentGroup = {
                                text: segment.text,
                                indexes: [segment.index],
                                start: segment.start,
                                end: segment.end,
                            };
                            currentDuration = segmentDuration;
                        }
                    });

                    // Push the final group if it has content
                    if (currentGroup.text) {
                        combinedSegments.push({ ...currentGroup });
                    }

                    const lastSegment = combinedSegments[combinedSegments.length - 1];

                    // Check if the last segment is too short and combine it with the second last
                    if (combinedSegments.length > 1 && lastSegment.end && lastSegment.start) {
                        const secondLastSegment = combinedSegments[combinedSegments.length - 2];
                        const lastDuration = lastSegment.end - lastSegment.start;

                        if (lastDuration < 30) {
                            // Combine the last segment with the second last
                            secondLastSegment.text += (secondLastSegment.text ? ' ' : '') + lastSegment.text;
                            secondLastSegment.indexes = secondLastSegment.indexes.concat(lastSegment.indexes);
                            secondLastSegment.end = lastSegment.end;

                            // Remove the last segment from the array
                            combinedSegments.pop();
                        }
                    }

                    console.log('Segments combined successfully.');

                    // Step 4: Generate a summary of the combined transcript
                    console.log('Generating summary using GPT-4...');
                    const combinedText = combinedSegments.map(segment => segment.text).join(' ');
                    let summary = '';
                    try {
                        const completion = await openai.chat.completions.create({
                            messages: [{ role: 'system', content: `Summarize the following text in a concise paragraph:\n\n${combinedText}` }],
                            model: 'gpt-4o',
                        });
                        console.log('Summary generation complete.');
                        summary = completion.choices[0].message.content ?? 'Summary could not be generated.';
                    } catch (summaryError) {
                        console.error('Error generating summary:', summaryError);
                        summary = 'Summary could not be generated.';
                    }

                    // Step 5: Return the JSON result
                    res.send({ full: originalSegments, condensed: combinedSegments, summary });
                } catch (error) {
                    console.error('Error processing media file:', error);
                    res.status(500).send({ error: 'Failed to process media file' });
                }
            },
        });

        // Axios instance with custom headers for scraping
        const axiosInstance = axios.create({
            headers: {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            },
        });

        /**
         * Utility function to introduce delay (used for retries).
         * @param ms Delay in milliseconds.
         */
        const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

        /**
         * Function to fetch a URL with retry logic, handling rate limits.
         * Retries a request if it fails due to rate limits (HTTP status 429).
         * @param url The URL to fetch.
         * @param retries The number of retry attempts.
         * @param backoff Initial backoff time in milliseconds.
         */
        const fetchWithRetry = async (url: string, retries = 3, backoff = 300): Promise<unknown> => {
            try {
                const response = await axiosInstance.get(url);
                return response.data;
            } catch (error) {
                if (retries > 0 && (error as { response: { status: number } }).response?.status === 429) { // bcz: don't know the error type
                    console.log(`Rate limited. Retrying in ${backoff}ms...`);
                    await delay(backoff);
                    return fetchWithRetry(url, retries - 1, backoff * 2);
                }
                // prettier-ignore
                throw error;
            }
        };
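        // Illustrative retry schedule with the defaults above: after a 429 response the request is
        // retried in roughly 300ms, then 600ms, then 1200ms; if all retries fail, the error is rethrown.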
        // Register an API route to generate an image using OpenAI's DALL-E model
        // Uploads the generated image to the server and provides a URL for access
        register({
            method: Method.POST,
            subscription: '/generateImage',
            secureHandler: async ({ req, res }) => {
                const { image_prompt } = req.body;
                if (!image_prompt) {
                    res.status(400).send({ error: 'No prompt provided' });
                    return;
                }
                try {
                    const image = await openai.images.generate({ model: 'dall-e-3', prompt: image_prompt, response_format: 'url' });
                    console.log(image);
                    const url = image.data?.[0].url;
                    const result = url ? await DashUploadUtils.UploadImage(url) : { error: 'Image generation failed' };
                    res.send({ result, url });
                } catch (error) {
                    console.error('Error fetching the URL:', error);
                    res.status(500).send({
                        error: 'Failed to fetch the URL',
                    });
                }
            },
        });

        // Register an API route to fetch data from a URL using a proxy with retry logic
        // Useful for bypassing rate limits or scraping inaccessible data
        register({
            method: Method.POST,
            subscription: '/proxyFetch',
            secureHandler: async ({ req, res }) => {
                const { url } = req.body;
                if (!url) {
                    res.status(400).send({ error: 'No URL provided' });
                    return;
                }
                try {
                    const data = await fetchWithRetry(url);
                    res.send({ data });
                } catch (error) {
                    console.error('Error fetching the URL:', error);
                    res.status(500).send({
                        error: 'Failed to fetch the URL',
                    });
                }
            },
        });

        // Register an API route to scrape website content using Puppeteer and JSDOM
        // Extracts and returns readable content from a given URL
        register({
            method: Method.POST,
            subscription: '/scrapeWebsite',
            secureHandler: async ({ req, res }) => {
                const { url } = req.body;
                try {
                    // Launch Puppeteer browser to navigate to the webpage
                    const browser = await puppeteer.launch({
                        args: ['--no-sandbox', '--disable-setuid-sandbox'],
                    });
                    const page = await browser.newPage();
                    await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36');
                    await page.goto(url, { waitUntil: 'networkidle2' });

                    // Extract HTML content
                    const htmlContent = await page.content();
                    await browser.close();

                    // Parse HTML content using JSDOM
                    const dom = new JSDOM(htmlContent, { url });

                    // Extract readable content using Mozilla's Readability API
                    const reader = new Readability(dom.window.document);
                    const article = reader.parse();

                    if (article) {
                        const plainText = article.textContent;
                        res.send({ website_plain_text: plainText });
                    } else {
                        res.status(500).send({ error: 'Failed to extract readable content' });
                    }
                } catch (error) {
                    console.error('Error scraping website:', error);
                    res.status(500).send({
                        error: 'Failed to scrape website',
                    });
                }
            },
        });
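        // Illustrative client-side flow for the three document-processing routes registered below
        // (hypothetical calls, not part of this file):
        //   1. POST /createDocument with { file_path } -> { jobId }
        //   2. poll GET /getProgress/<jobId> until progress reaches 100
        //   3. GET /getResult/<jobId> for the final AI_Document payload (202 while still pending)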
        // Register an API route to create a document and start a background job for processing
        // Uses Python scripts to process files and generate document chunks for further use
        register({
            method: Method.POST,
            subscription: '/createDocument',
            secureHandler: async ({ req, res }) => {
                const { file_path } = req.body;
                const public_path = path.join(publicDirectory, file_path); // Resolve the file path in the public directory
                const file_name = path.basename(file_path); // Extract the file name from the path
                try {
                    // Read the file data and encode it as base64
                    const file_data = fs.readFileSync(public_path, { encoding: 'base64' });

                    // Generate a unique job ID for tracking
                    const jobId = uuid.v4();

                    // Spawn the Python process and track its progress/output
                    // eslint-disable-next-line no-use-before-define
                    spawnPythonProcess(jobId, public_path);

                    // Send the job ID back to the client for tracking
                    res.send({ jobId });
                } catch (error) {
                    console.error('Error initiating document creation:', error);
                    res.status(500).send({
                        error: 'Failed to initiate document creation',
                    });
                }
            },
        });

        // Register an API route to check the progress of a document creation job
        // Returns the current step and progress percentage
        register({
            method: Method.GET,
            subscription: '/getProgress/:jobId',
            secureHandler: async ({ req, res }) => {
                const { jobId } = req.params; // Get the job ID from the URL parameters
                // Check if the job progress is available
                if (jobProgress[jobId]) {
                    res.json(jobProgress[jobId]);
                } else {
                    res.json({
                        step: 'Processing Document...',
                        progress: '0',
                    });
                }
            },
        });

        // Register an API route to retrieve the final result of a document creation job
        // Returns the processed data or an error status if the job is incomplete
        register({
            method: Method.GET,
            subscription: '/getResult/:jobId',
            secureHandler: async ({ req, res }) => {
                const { jobId } = req.params;
                if (jobResults[jobId]) {
                    const result = jobResults[jobId] as AI_Document & { status: string };
                    if (result.chunks && Array.isArray(result.chunks)) {
                        result.status = 'completed';
                    } else {
                        result.status = 'pending';
                    }
                    res.json(result);
                } else {
                    res.status(202).send({ status: 'pending' });
                }
            },
        });

        // Register an API route to format chunks of text or images for structured display
        // Converts raw chunk data into a structured format for frontend consumption
        register({
            method: Method.POST,
            subscription: '/formatChunks',
            secureHandler: async ({ req, res }) => {
                const { relevantChunks } = req.body; // Get the relevant chunks from the request body

                // Initialize an array to hold the formatted content
                const content: { type: string; text?: string; image_url?: { url: string } }[] = [{ type: 'text', text: '' }];

                await Promise.all(
                    relevantChunks.map(async (chunk: { id: string; metadata: { type: string; text: string; file_path: string } }) => {
                        // Format each chunk by adding its metadata and content
                        content.push({
                            type: 'text',
                            text: ``,
                        });

                        // If the chunk is an image or table, read the corresponding file and encode it as base64
                        if (chunk.metadata.type === 'image' || chunk.metadata.type === 'table') {
                            try {
                                const filePath = path.join(pathToDirectory(Directory.chunk_images), chunk.metadata.file_path); // Get the file path
                                console.log(filePath);
                                const imageBuffer = await readFileAsync(filePath);
                                const base64Image = imageBuffer.toString('base64'); // Convert the image to base64

                                // Add the base64-encoded image to the content array
                                if (base64Image) {
                                    content.push({
                                        type: 'image_url',
                                        image_url: {
                                            url: `data:image/jpeg;base64,${base64Image}`,
                                        },
                                    });
                                } else {
                                    console.log(`Failed to encode image for chunk ${chunk.id}`);
                                }
                            } catch (error) {
                                console.error(`Error reading image file for chunk ${chunk.id}:`, error);
                            }
                        }

                        // Add the chunk's text content to the formatted content
                        content.push({ type: 'text', text: `${chunk.metadata.text}\n\n` });
                    })
                );

                content.push({ type: 'text', text: '' });

                // Send the formatted content back to the client
                res.send({ formattedChunks: content });
            },
        });
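        // Example exchange for the /createCSV route registered below (illustrative values):
        //   POST { filename: 'table.csv', data: 'a,b\n1,2\n' }
        //   -> { fileUrl: '/files/csv/<uuid>-table.csv', id: '<uuid>' }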
        // Register an API route to create and save a CSV file on the server
        // Writes the CSV content to a unique file and provides a URL for download
        register({
            method: Method.POST,
            subscription: '/createCSV',
            secureHandler: async ({ req, res }) => {
                const { filename, data } = req.body;

                // Validate that both the filename and data are provided
                if (!filename || !data) {
                    res.status(400).send({ error: 'Filename and data fields are required.' });
                    return;
                }

                try {
                    // Generate a UUID for the file to ensure unique naming
                    const uuidv4 = uuid.v4();
                    const fullFilename = `${uuidv4}-${filename}`; // Prefix the file name with the UUID

                    // Get the full server path where the file will be saved
                    const serverFilePath = serverPathToFile(Directory.csv, fullFilename);

                    // Write the CSV data (which is a raw string) to the file
                    await writeFileAsync(serverFilePath, data, 'utf8');

                    // Construct the client-accessible URL for the file
                    const fileUrl = clientPathToFile(Directory.csv, fullFilename);

                    // Send the file URL and UUID back to the client
                    res.send({ fileUrl, id: uuidv4 });
                } catch (error) {
                    console.error('Error creating CSV file:', error);
                    res.status(500).send({
                        error: 'Failed to create CSV file.',
                    });
                }
            },
        });
    }
}

/**
 * Spawns a Python process to handle file processing tasks.
 * @param jobId The job ID for tracking progress.
 * @param file_path The filepath of the file to process.
 */
function spawnPythonProcess(jobId: string, file_path: string) {
    const venvPath = path.join(__dirname, '../chunker/venv');
    const requirementsPath = path.join(__dirname, '../chunker/requirements.txt');
    const pythonScriptPath = path.join(__dirname, '../chunker/pdf_chunker.py');
    const outputDirectory = pathToDirectory(Directory.chunk_images);

    function runPythonScript() {
        const pythonPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'python') : path.join(venvPath, 'bin', 'python3');
        const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_path, outputDirectory]);

        let pythonOutput = '';
        let stderrOutput = '';

        pythonProcess.stdout.on('data', data => {
            pythonOutput += data.toString();
        });

        pythonProcess.stderr.on('data', data => {
            stderrOutput += data.toString();
            const lines = stderrOutput.split('\n');
            stderrOutput = lines.pop() || ''; // Save the last partial line back to stderrOutput

            lines.forEach(line => {
                if (line.trim()) {
                    if (line.startsWith('PROGRESS:')) {
                        const jsonString = line.substring('PROGRESS:'.length);
                        try {
                            const parsedOutput = JSON.parse(jsonString);
                            if (parsedOutput.job_id && parsedOutput.progress !== undefined) {
                                jobProgress[parsedOutput.job_id] = {
                                    step: parsedOutput.step,
                                    progress: parsedOutput.progress,
                                };
                            } else if (parsedOutput.progress !== undefined) {
                                jobProgress[jobId] = {
                                    step: parsedOutput.step,
                                    progress: parsedOutput.progress,
                                };
                            }
                        } catch (err) {
                            console.error('Error parsing progress JSON:', jsonString, err);
                        }
                    } else {
                        // Log other stderr output
                        console.error('Python stderr:', line);
                    }
                }
            });
        });

        pythonProcess.on('close', code => {
            if (code === 0) {
                try {
                    const finalResult = JSON.parse(pythonOutput);
                    jobResults[jobId] = finalResult;
                    jobProgress[jobId] = { step: 'Complete', progress: 100 };
                } catch (err) {
                    console.error('Error parsing final JSON result:', err);
                    jobResults[jobId] = { error: 'Failed to parse final result' };
                }
            } else {
                console.error(`Python process exited with code ${code}`);
                // Check if there was an error message in stderr
                if (stderrOutput) {
                    // Try to parse the last line as JSON
                    const lines = stderrOutput.trim().split('\n');
                    const lastLine = lines[lines.length - 1];
                    try {
                        const errorOutput = JSON.parse(lastLine);
                        jobResults[jobId] = errorOutput;
                    } catch {
                        jobResults[jobId] = { error: 'Python process failed' };
                    }
                } else {
                    jobResults[jobId] = { error: 'Python process failed' };
                }
            }
        });
    }
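    // Illustrative stderr line emitted by pdf_chunker.py and parsed by runPythonScript above
    // (format assumed from the parser, not confirmed against the script itself):
    //   PROGRESS:{"job_id": "<jobId>", "step": "Chunking PDF", "progress": 42}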
    // Check if venv exists
    if (!fs.existsSync(venvPath)) {
        console.log('Virtual environment not found. Creating and setting up...');

        // Create venv
        // const createVenvProcess = spawn('python', ['-m', 'venv', venvPath]);
        const createVenvProcess = spawn('python3.10', ['-m', 'venv', venvPath]);

        createVenvProcess.on('close', code => {
            if (code !== 0) {
                console.error(`Failed to create virtual environment. Exit code: ${code}`);
                return;
            }
            console.log('Virtual environment created. Installing requirements...');

            // Determine the pip path based on the OS
            const pipPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'pip.exe') : path.join(venvPath, 'bin', 'pip3'); // Try 'pip3' for Unix-like systems

            if (!fs.existsSync(pipPath)) {
                console.error(`pip executable not found at ${pipPath}`);
                return;
            }

            // Install requirements
            const installRequirementsProcess = spawn(pipPath, ['install', '-r', requirementsPath]);

            installRequirementsProcess.stdout.on('data', data => {
                console.log(`pip stdout: ${data}`);
            });

            installRequirementsProcess.stderr.on('data', data => {
                console.error(`pip stderr: ${data}`);
            });

            installRequirementsProcess.on('error', error => {
                console.error(`Error starting pip process: ${error}`);
            });

            installRequirementsProcess.on('close', closecode => {
                if (closecode !== 0) {
                    console.error(`Failed to install requirements. Exit code: ${closecode}`);
                    return;
                }
                console.log('Requirements installed. Running Python script...');
                runPythonScript();
            });
        });
    } else {
        console.log('Virtual environment found. Running Python script...');
        runPythonScript();
    }
}
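// For reference, the bootstrap in spawnPythonProcess is roughly equivalent to running
// (on a Unix-like system; paths are relative to this file's compiled location):
//   python3.10 -m venv ../chunker/venv
//   ../chunker/venv/bin/pip3 install -r ../chunker/requirements.txt
//   ../chunker/venv/bin/python3 ../chunker/pdf_chunker.py <jobId> <file_path> <chunk_images directory>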