/** * @file AssistantManager.ts * @description This file defines the AssistantManager class, responsible for managing various * API routes related to the Assistant functionality. It provides features such as file handling, * web scraping, and integration with third-party APIs like OpenAI and Google Custom Search. * It also handles job tracking and progress reporting for tasks like document creation and web scraping. * Utility functions for path manipulation and file operations are included, along with * a mechanism for handling retry logic during API calls. */ import { Readability } from '@mozilla/readability'; import axios from 'axios'; import { spawn } from 'child_process'; import * as fs from 'fs'; import { writeFile } from 'fs'; import { google } from 'googleapis'; import { JSDOM } from 'jsdom'; import * as path from 'path'; import * as puppeteer from 'puppeteer'; import { promisify } from 'util'; import * as uuid from 'uuid'; import { AI_Document } from '../../client/views/nodes/chatbot/types/types'; import { Method } from '../RouteManager'; import { filesDirectory, publicDirectory } from '../SocketData'; import ApiManager, { Registration } from './ApiManager'; // Enumeration of directories where different file types are stored export enum Directory { parsed_files = 'parsed_files', images = 'images', videos = 'videos', pdfs = 'pdfs', text = 'text', pdf_thumbnails = 'pdf_thumbnails', audio = 'audio', csv = 'csv', chunk_images = 'chunk_images', scrape_images = 'scrape_images', } // In-memory job tracking const jobResults: { [key: string]: unknown } = {}; const jobProgress: { [key: string]: unknown } = {}; /** * Constructs a normalized path to a file in the server's file system. * @param directory The directory where the file is stored. * @param filename The name of the file. * @returns The full normalized path to the file. */ export function serverPathToFile(directory: Directory, filename: string) { return path.normalize(`${filesDirectory}/${directory}/${filename}`); } /** * Constructs a normalized path to a directory in the server's file system. * @param directory The directory to access. * @returns The full normalized path to the directory. */ export function pathToDirectory(directory: Directory) { return path.normalize(`${filesDirectory}/${directory}`); } /** * Constructs the client-accessible URL for a file. * @param directory The directory where the file is stored. * @param filename The name of the file. * @returns The URL path to the file. */ export function clientPathToFile(directory: Directory, filename: string) { return `/files/${directory}/${filename}`; } // Promisified versions of filesystem functions const writeFileAsync = promisify(writeFile); const readFileAsync = promisify(fs.readFile); /** * Class responsible for handling various API routes related to the Assistant functionality. * This class extends `ApiManager` and handles registration of routes and secure request handlers. */ export default class AssistantManager extends ApiManager { /** * Registers all API routes and initializes necessary services like OpenAI and Google Custom Search. * @param register The registration method to register routes and handlers. */ protected initialize(register: Registration): void { // Initialize Google Custom Search API const customsearch = google.customsearch('v1'); // Register Wikipedia summary API route register({ method: Method.POST, subscription: '/getWikipediaSummary', secureHandler: async ({ req, res }) => { const { title } = req.body; try { // Fetch summary from Wikipedia using axios const response = await axios.get('https://en.wikipedia.org/w/api.php', { params: { action: 'query', list: 'search', srsearch: title, format: 'json', }, }); const summary = response.data.query.search[0]?.snippet || 'No article found with that title.'; res.send({ text: summary }); } catch (error) { console.error('Error retrieving Wikipedia summary:', error); res.status(500).send({ error: 'Error retrieving article summary from Wikipedia.', }); } }, }); // Register Google Web Search Results API route register({ method: Method.POST, subscription: '/getWebSearchResults', secureHandler: async ({ req, res }) => { const { query, max_results } = req.body; try { // Fetch search results using Google Custom Search API const response = await customsearch.cse.list({ q: query, cx: process.env._CLIENT_GOOGLE_SEARCH_ENGINE_ID, key: process.env._CLIENT_GOOGLE_API_KEY, safe: 'active', num: max_results, }); const results = response.data.items?.map(item => ({ url: item.link, snippet: item.snippet, })) || []; res.send({ results }); } catch (error) { console.error('Error performing web search:', error); res.status(500).send({ error: 'Failed to perform web search', }); } }, }); // Axios instance with custom headers for scraping const axiosInstance = axios.create({ headers: { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', }, }); /** * Utility function to introduce delay (used for retries). * @param ms Delay in milliseconds. */ const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); /** * Function to fetch a URL with retry logic, handling rate limits. * Retries a request if it fails due to rate limits (HTTP status 429). * @param url The URL to fetch. * @param retries The number of retry attempts. * @param backoff Initial backoff time in milliseconds. */ const fetchWithRetry = async (url: string, retries = 3, backoff = 300): Promise => { try { const response = await axiosInstance.get(url); return response.data; } catch (error) { if (retries > 0 && (error as { response: { status: number } }).response?.status === 429) { // bcz: don't know the error type console.log(`Rate limited. Retrying in ${backoff}ms...`); await delay(backoff); return fetchWithRetry(url, retries - 1, backoff * 2); } // prettier-ignore throw error; } }; // Register a proxy fetch API route register({ method: Method.POST, subscription: '/proxyFetch', secureHandler: async ({ req, res }) => { const { url } = req.body; if (!url) { res.status(400).send({ error: 'No URL provided' }); return; } try { const data = await fetchWithRetry(url); res.send({ data }); } catch (error) { console.error('Error fetching the URL:', error); res.status(500).send({ error: 'Failed to fetch the URL', }); } }, }); // Register an API route to scrape website content using Puppeteer and JSDOM register({ method: Method.POST, subscription: '/scrapeWebsite', secureHandler: async ({ req, res }) => { const { url } = req.body; try { // Launch Puppeteer browser to navigate to the webpage const browser = await puppeteer.launch({ args: ['--no-sandbox', '--disable-setuid-sandbox'], }); const page = await browser.newPage(); await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'); await page.goto(url, { waitUntil: 'networkidle2' }); // Extract HTML content const htmlContent = await page.content(); await browser.close(); // Parse HTML content using JSDOM const dom = new JSDOM(htmlContent, { url }); // Extract readable content using Mozilla's Readability API const reader = new Readability(dom.window.document); const article = reader.parse(); if (article) { const plainText = article.textContent; res.send({ website_plain_text: plainText }); } else { res.status(500).send({ error: 'Failed to extract readable content' }); } } catch (error) { console.error('Error scraping website:', error); res.status(500).send({ error: 'Failed to scrape website', }); } }, }); register({ method: Method.POST, subscription: '/createDocument', secureHandler: async ({ req, res }) => { const { file_path } = req.body; const public_path = path.join(publicDirectory, file_path); // Resolve the file path in the public directory const file_name = path.basename(file_path); // Extract the file name from the path try { // Read the file data and encode it as base64 const file_data: string = fs.readFileSync(public_path, { encoding: 'base64' }); // Generate a unique job ID for tracking const jobId = uuid.v4(); // Spawn the Python process and track its progress/output // eslint-disable-next-line no-use-before-define spawnPythonProcess(jobId, file_name, file_data); // Send the job ID back to the client for tracking res.send({ jobId }); } catch (error) { console.error('Error initiating document creation:', error); res.status(500).send({ error: 'Failed to initiate document creation', }); } }, }); // Register an API route to check the progress of a document creation job register({ method: Method.GET, subscription: '/getProgress/:jobId', secureHandler: async ({ req, res }) => { const { jobId } = req.params; // Get the job ID from the URL parameters // Check if the job progress is available if (jobProgress[jobId]) { res.json(jobProgress[jobId]); } else { res.json({ step: 'Processing Document...', progress: '0', }); } }, }); // Register an API route to get the final result of a document creation job register({ method: Method.GET, subscription: '/getResult/:jobId', secureHandler: async ({ req, res }) => { const { jobId } = req.params; // Get the job ID from the URL parameters // Check if the job result is available if (jobResults[jobId]) { const result = jobResults[jobId] as AI_Document & { status: string }; // If the result contains image or table chunks, save the base64 data as image files if (result.chunks && Array.isArray(result.chunks)) { await Promise.all( result.chunks.map(chunk => { if (chunk.metadata && (chunk.metadata.type === 'image' || chunk.metadata.type === 'table')) { const files_directory = '/files/chunk_images/'; const directory = path.join(publicDirectory, files_directory); // Ensure the directory exists or create it if (!fs.existsSync(directory)) { fs.mkdirSync(directory); } const fileName = path.basename(chunk.metadata.file_path); // Get the file name from the path const filePath = path.join(directory, fileName); // Create the full file path // Check if the chunk contains base64 encoded data if (chunk.metadata.base64_data) { // Decode the base64 data and write it to a file const buffer = Buffer.from(chunk.metadata.base64_data, 'base64'); fs.promises.writeFile(filePath, buffer).then(() => { // Update the file path in the chunk's metadata chunk.metadata.file_path = path.join(files_directory, fileName); chunk.metadata.base64_data = undefined; // Remove the base64 data from the metadata }); } else { console.warn(`No base64_data found for chunk: ${fileName}`); } } }) ); result.status = 'completed'; } else { result.status = 'pending'; } res.json(result); // Send the result back to the client } else { res.status(202).send({ status: 'pending' }); } }, }); // Register an API route to format chunks (e.g., text or image chunks) for display register({ method: Method.POST, subscription: '/formatChunks', secureHandler: async ({ req, res }) => { const { relevantChunks } = req.body; // Get the relevant chunks from the request body // Initialize an array to hold the formatted content const content: { type: string; text?: string; image_url?: { url: string } }[] = [{ type: 'text', text: '' }]; await Promise.all( relevantChunks.map((chunk: { id: string; metadata: { type: string; text: TimeRanges; file_path: string } }) => { // Format each chunk by adding its metadata and content content.push({ type: 'text', text: ``, }); // If the chunk is an image or table, read the corresponding file and encode it as base64 if (chunk.metadata.type === 'image' || chunk.metadata.type === 'table') { try { const filePath = serverPathToFile(Directory.chunk_images, chunk.metadata.file_path); // Get the file path readFileAsync(filePath).then(imageBuffer => { const base64Image = imageBuffer.toString('base64'); // Convert the image to base64 // Add the base64-encoded image to the content array if (base64Image) { content.push({ type: 'image_url', image_url: { url: `data:image/jpeg;base64,${base64Image}`, }, }); } else { console.log(`Failed to encode image for chunk ${chunk.id}`); } }); } catch (error) { console.error(`Error reading image file for chunk ${chunk.id}:`, error); } } // Add the chunk's text content to the formatted content content.push({ type: 'text', text: `${chunk.metadata.text}\n\n` }); }) ); content.push({ type: 'text', text: '' }); // Send the formatted content back to the client res.send({ formattedChunks: content }); }, }); // Register an API route to create and save a CSV file on the server register({ method: Method.POST, subscription: '/createCSV', secureHandler: async ({ req, res }) => { const { filename, data } = req.body; // Validate that both the filename and data are provided if (!filename || !data) { res.status(400).send({ error: 'Filename and data fields are required.' }); return; } try { // Generate a UUID for the file to ensure unique naming const uuidv4 = uuid.v4(); const fullFilename = `${uuidv4}-${filename}`; // Prefix the file name with the UUID // Get the full server path where the file will be saved const serverFilePath = serverPathToFile(Directory.csv, fullFilename); // Write the CSV data (which is a raw string) to the file await writeFileAsync(serverFilePath, data, 'utf8'); // Construct the client-accessible URL for the file const fileUrl = clientPathToFile(Directory.csv, fullFilename); // Send the file URL and UUID back to the client res.send({ fileUrl, id: uuidv4 }); } catch (error) { console.error('Error creating CSV file:', error); res.status(500).send({ error: 'Failed to create CSV file.', }); } }, }); } } function spawnPythonProcess(jobId: string, file_name: string, file_data: string) { const venvPath = path.join(__dirname, '../chunker/venv'); const requirementsPath = path.join(__dirname, '../chunker/requirements.txt'); const pythonScriptPath = path.join(__dirname, '../chunker/pdf_chunker.py'); function runPythonScript() { const pythonPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'python') : path.join(venvPath, 'bin', 'python3'); const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_name, file_data]); let pythonOutput = ''; let stderrOutput = ''; pythonProcess.stdout.on('data', data => { pythonOutput += data.toString(); }); pythonProcess.stderr.on('data', data => { stderrOutput += data.toString(); const lines = stderrOutput.split('\n'); lines.forEach(line => { if (line.trim()) { try { const parsedOutput = JSON.parse(line); if (parsedOutput.job_id && parsedOutput.progress !== undefined) { jobProgress[parsedOutput.job_id] = { step: parsedOutput.step, progress: parsedOutput.progress, }; } else if (parsedOutput.progress !== undefined) { jobProgress[jobId] = { step: parsedOutput.step, progress: parsedOutput.progress, }; } } catch (err) { console.error('Progress log from Python:', line, err); } } }); }); pythonProcess.on('close', code => { if (code === 0) { try { const finalResult = JSON.parse(pythonOutput); jobResults[jobId] = finalResult; jobProgress[jobId] = { step: 'Complete', progress: 100 }; } catch (err) { console.error('Error parsing final JSON result:', err); } } else { console.error(`Python process exited with code ${code}`); jobResults[jobId] = { error: 'Python process failed' }; } }); } // Check if venv exists if (!fs.existsSync(venvPath)) { console.log('Virtual environment not found. Creating and setting up...'); // Create venv const createVenvProcess = spawn('python', ['-m', 'venv', venvPath]); createVenvProcess.on('close', code => { if (code !== 0) { console.error(`Failed to create virtual environment. Exit code: ${code}`); return; } console.log('Virtual environment created. Installing requirements...'); // Determine the pip path based on the OS const pipPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'pip.exe') : path.join(venvPath, 'bin', 'pip3'); // Try 'pip3' for Unix-like systems if (!fs.existsSync(pipPath)) { console.error(`pip executable not found at ${pipPath}`); return; } // Install requirements const installRequirementsProcess = spawn(pipPath, ['install', '-r', requirementsPath]); installRequirementsProcess.stdout.on('data', data => { console.log(`pip stdout: ${data}`); }); installRequirementsProcess.stderr.on('data', data => { console.error(`pip stderr: ${data}`); }); installRequirementsProcess.on('error', error => { console.error(`Error starting pip process: ${error}`); }); installRequirementsProcess.on('close', closecode => { if (closecode !== 0) { console.error(`Failed to install requirements. Exit code: ${closecode}`); return; } console.log('Requirements installed. Running Python script...'); runPythonScript(); }); }); } else { console.log('Virtual environment found. Running Python script...'); runPythonScript(); } }