diff options
Diffstat (limited to 'src/server/ApiManagers/AssistantManager.ts')
-rw-r--r-- | src/server/ApiManagers/AssistantManager.ts | 401 |
1 files changed, 340 insertions, 61 deletions
diff --git a/src/server/ApiManagers/AssistantManager.ts b/src/server/ApiManagers/AssistantManager.ts index 8447a4934..af25722a4 100644 --- a/src/server/ApiManagers/AssistantManager.ts +++ b/src/server/ApiManagers/AssistantManager.ts @@ -15,14 +15,17 @@ import * as fs from 'fs'; import { writeFile } from 'fs'; import { google } from 'googleapis'; import { JSDOM } from 'jsdom'; +import OpenAI from 'openai'; import * as path from 'path'; import * as puppeteer from 'puppeteer'; import { promisify } from 'util'; import * as uuid from 'uuid'; import { AI_Document } from '../../client/views/nodes/chatbot/types/types'; +import { DashUploadUtils } from '../DashUploadUtils'; import { Method } from '../RouteManager'; import { filesDirectory, publicDirectory } from '../SocketData'; import ApiManager, { Registration } from './ApiManager'; +import { env } from 'process'; // Enumeration of directories where different file types are stored export enum Directory { @@ -87,6 +90,7 @@ export default class AssistantManager extends ApiManager { protected initialize(register: Registration): void { // Initialize Google Custom Search API const customsearch = google.customsearch('v1'); + const openai = new OpenAI({ apiKey: env.OPENAI_API_KEY }); // Register Wikipedia summary API route register({ @@ -115,29 +119,86 @@ export default class AssistantManager extends ApiManager { }, }); - // Register Google Web Search Results API route + // Register an API route to retrieve web search results using Google Custom Search + // This route filters results by checking their x-frame-options headers for security purposes register({ method: Method.POST, subscription: '/getWebSearchResults', secureHandler: async ({ req, res }) => { const { query, max_results } = req.body; - try { - // Fetch search results using Google Custom Search API - const response = await customsearch.cse.list({ + const MIN_VALID_RESULTS_RATIO = 0.75; // 3/4 threshold + let startIndex = 1; // Start at the first result initially + const fetchSearchResults = async (start: number) => { + return customsearch.cse.list({ q: query, cx: process.env._CLIENT_GOOGLE_SEARCH_ENGINE_ID, key: process.env._CLIENT_GOOGLE_API_KEY, safe: 'active', num: max_results, + start, // This controls which result index the search starts from }); + }; + + const filterResultsByXFrameOptions = async ( + results: { + url: string | null | undefined; + snippet: string | null | undefined; + }[] + ) => { + const filteredResults = await Promise.all( + results + .filter(result => result.url) + .map(async result => { + try { + const urlResponse = await axios.head(result.url!, { timeout: 5000 }); + const xFrameOptions = urlResponse.headers['x-frame-options']; + if (xFrameOptions && xFrameOptions.toUpperCase() === 'SAMEORIGIN') { + return result; + } + } catch (error) { + console.error(`Error checking x-frame-options for URL: ${result.url}`, error); + } + return null; // Exclude the result if it doesn't match + }) + ); + return filteredResults.filter(result => result !== null); // Remove null results + }; - const results = + try { + // Fetch initial search results + let response = await fetchSearchResults(startIndex); + const initialResults = response.data.items?.map(item => ({ url: item.link, snippet: item.snippet, })) || []; - res.send({ results }); + // Filter the initial results + let validResults = await filterResultsByXFrameOptions(initialResults); + + // If valid results are less than 3/4 of max_results, fetch more results + while (validResults.length < max_results * MIN_VALID_RESULTS_RATIO) { + // Increment the start index by the max_results to fetch the next set of results + startIndex += max_results; + response = await fetchSearchResults(startIndex); + + const additionalResults = + response.data.items?.map(item => ({ + url: item.link, + snippet: item.snippet, + })) || []; + + const additionalValidResults = await filterResultsByXFrameOptions(additionalResults); + validResults = [...validResults, ...additionalValidResults]; // Combine valid results + + // Break if no more results are available + if (additionalValidResults.length === 0 || response.data.items?.length === 0) { + break; + } + } + + // Return the filtered valid results + res.send({ results: validResults.slice(0, max_results) }); // Limit the results to max_results } catch (error) { console.error('Error performing web search:', error); res.status(500).send({ @@ -147,6 +208,187 @@ export default class AssistantManager extends ApiManager { }, }); + /** + * Converts a video file to audio format using ffmpeg. + * @param videoPath The path to the input video file. + * @param outputAudioPath The path to the output audio file. + * @returns A promise that resolves when the conversion is complete. + */ + function convertVideoToAudio(videoPath: string, outputAudioPath: string): Promise<void> { + return new Promise((resolve, reject) => { + const ffmpegProcess = spawn('ffmpeg', [ + '-i', + videoPath, // Input file + '-vn', // No video + '-acodec', + 'pcm_s16le', // Audio codec + '-ac', + '1', // Number of audio channels + '-ar', + '16000', // Audio sampling frequency + '-f', + 'wav', // Output format + outputAudioPath, // Output file + ]); + + ffmpegProcess.on('error', error => { + console.error('Error running ffmpeg:', error); + reject(error); + }); + + ffmpegProcess.on('close', code => { + if (code === 0) { + console.log('Audio extraction complete:', outputAudioPath); + resolve(); + } else { + reject(new Error(`ffmpeg exited with code ${code}`)); + } + }); + }); + } + + // Register an API route to process a media file (audio or video) + // Extracts audio from video files, transcribes the audio using OpenAI Whisper, and provides a summary + register({ + method: Method.POST, + subscription: '/processMediaFile', + secureHandler: async ({ req, res }) => { + const { fileName } = req.body; + + // Ensure the filename is provided + if (!fileName) { + res.status(400).send({ error: 'Filename is required' }); + return; + } + + try { + // Determine the file type and location + const isAudio = fileName.toLowerCase().endsWith('.mp3'); + const directory = isAudio ? Directory.audio : Directory.videos; + const filePath = serverPathToFile(directory, fileName); + + // Check if the file exists + if (!fs.existsSync(filePath)) { + res.status(404).send({ error: 'File not found' }); + return; + } + + console.log(`Processing ${isAudio ? 'audio' : 'video'} file: ${fileName}`); + + // Step 1: Extract audio if it's a video + let audioPath = filePath; + if (!isAudio) { + const audioFileName = `${path.basename(fileName, path.extname(fileName))}.wav`; + audioPath = path.join(pathToDirectory(Directory.audio), audioFileName); + + console.log('Extracting audio from video...'); + await convertVideoToAudio(filePath, audioPath); + } + + // Step 2: Transcribe audio using OpenAI Whisper + console.log('Transcribing audio...'); + const transcription = await openai.audio.transcriptions.create({ + file: fs.createReadStream(audioPath), + model: 'whisper-1', + response_format: 'verbose_json', + timestamp_granularities: ['segment'], + }); + + console.log('Audio transcription complete.'); + + // Step 3: Extract concise JSON + console.log('Extracting concise JSON...'); + const originalSegments = transcription.segments?.map((segment, index) => ({ + index: index.toString(), + text: segment.text, + start: segment.start, + end: segment.end, + })); + + interface ConciseSegment { + text: string; + indexes: string[]; + start: number | null; + end: number | null; + } + + const combinedSegments = []; + let currentGroup: ConciseSegment = { text: '', indexes: [], start: null, end: null }; + let currentDuration = 0; + + originalSegments?.forEach(segment => { + const segmentDuration = segment.end - segment.start; + + if (currentDuration + segmentDuration <= 4000) { + // Add segment to the current group + currentGroup.text += (currentGroup.text ? ' ' : '') + segment.text; + currentGroup.indexes.push(segment.index); + if (currentGroup.start === null) { + currentGroup.start = segment.start; + } + currentGroup.end = segment.end; + currentDuration += segmentDuration; + } else { + // Push the current group and start a new one + combinedSegments.push({ ...currentGroup }); + currentGroup = { + text: segment.text, + indexes: [segment.index], + start: segment.start, + end: segment.end, + }; + currentDuration = segmentDuration; + } + }); + + // Push the final group if it has content + if (currentGroup.text) { + combinedSegments.push({ ...currentGroup }); + } + const lastSegment = combinedSegments[combinedSegments.length - 1]; + + // Check if the last segment is too short and combine it with the second last + if (combinedSegments.length > 1 && lastSegment.end && lastSegment.start) { + const secondLastSegment = combinedSegments[combinedSegments.length - 2]; + const lastDuration = lastSegment.end - lastSegment.start; + + if (lastDuration < 30) { + // Combine the last segment with the second last + secondLastSegment.text += (secondLastSegment.text ? ' ' : '') + lastSegment.text; + secondLastSegment.indexes = secondLastSegment.indexes.concat(lastSegment.indexes); + secondLastSegment.end = lastSegment.end; + + // Remove the last segment from the array + combinedSegments.pop(); + } + } + + console.log('Segments combined successfully.'); + + console.log('Generating summary using GPT-4...'); + const combinedText = combinedSegments.map(segment => segment.text).join(' '); + + let summary = ''; + try { + const completion = await openai.chat.completions.create({ + messages: [{ role: 'system', content: `Summarize the following text in a concise paragraph:\n\n${combinedText}` }], + model: 'gpt-4o', + }); + console.log('Summary generation complete.'); + summary = completion.choices[0].message.content ?? 'Summary could not be generated.'; + } catch (summaryError) { + console.error('Error generating summary:', summaryError); + summary = 'Summary could not be generated.'; + } + // Step 5: Return the JSON result + res.send({ full: originalSegments, condensed: combinedSegments, summary }); + } catch (error) { + console.error('Error processing media file:', error); + res.status(500).send({ error: 'Failed to process media file' }); + } + }, + }); + // Axios instance with custom headers for scraping const axiosInstance = axios.create({ headers: { @@ -181,7 +423,38 @@ export default class AssistantManager extends ApiManager { } }; - // Register a proxy fetch API route + // Register an API route to generate an image using OpenAI's DALL-E model + // Uploads the generated image to the server and provides a URL for access + register({ + method: Method.POST, + subscription: '/generateImage', + secureHandler: async ({ req, res }) => { + const { image_prompt } = req.body; + + if (!image_prompt) { + res.status(400).send({ error: 'No prompt provided' }); + return; + } + + try { + const image = await openai.images.generate({ model: 'dall-e-3', prompt: image_prompt, response_format: 'url' }); + console.log(image); + const result = await DashUploadUtils.UploadImage(image.data[0].url!); + + const url = image.data[0].url; + + res.send({ result, url }); + } catch (error) { + console.error('Error fetching the URL:', error); + res.status(500).send({ + error: 'Failed to fetch the URL', + }); + } + }, + }); + + // Register an API route to fetch data from a URL using a proxy with retry logic + // Useful for bypassing rate limits or scraping inaccessible data register({ method: Method.POST, subscription: '/proxyFetch', @@ -206,6 +479,7 @@ export default class AssistantManager extends ApiManager { }); // Register an API route to scrape website content using Puppeteer and JSDOM + // Extracts and returns readable content from a given URL register({ method: Method.POST, subscription: '/scrapeWebsite', @@ -246,6 +520,8 @@ export default class AssistantManager extends ApiManager { }, }); + // Register an API route to create a document and start a background job for processing + // Uses Python scripts to process files and generate document chunks for further use register({ method: Method.POST, subscription: '/createDocument', @@ -263,7 +539,7 @@ export default class AssistantManager extends ApiManager { // Spawn the Python process and track its progress/output // eslint-disable-next-line no-use-before-define - spawnPythonProcess(jobId, file_name, file_data); + spawnPythonProcess(jobId, public_path); // Send the job ID back to the client for tracking res.send({ jobId }); @@ -277,6 +553,7 @@ export default class AssistantManager extends ApiManager { }); // Register an API route to check the progress of a document creation job + // Returns the current step and progress percentage register({ method: Method.GET, subscription: '/getProgress/:jobId', @@ -294,59 +571,30 @@ export default class AssistantManager extends ApiManager { }, }); - // Register an API route to get the final result of a document creation job + // Register an API route to retrieve the final result of a document creation job + // Returns the processed data or an error status if the job is incomplete register({ method: Method.GET, subscription: '/getResult/:jobId', secureHandler: async ({ req, res }) => { - const { jobId } = req.params; // Get the job ID from the URL parameters - // Check if the job result is available + const { jobId } = req.params; if (jobResults[jobId]) { const result = jobResults[jobId] as AI_Document & { status: string }; - // If the result contains image or table chunks, save the base64 data as image files if (result.chunks && Array.isArray(result.chunks)) { - await Promise.all( - result.chunks.map(chunk => { - if (chunk.metadata && (chunk.metadata.type === 'image' || chunk.metadata.type === 'table')) { - const files_directory = '/files/chunk_images/'; - const directory = path.join(publicDirectory, files_directory); - - // Ensure the directory exists or create it - if (!fs.existsSync(directory)) { - fs.mkdirSync(directory); - } - - const fileName = path.basename(chunk.metadata.file_path); // Get the file name from the path - const filePath = path.join(directory, fileName); // Create the full file path - - // Check if the chunk contains base64 encoded data - if (chunk.metadata.base64_data) { - // Decode the base64 data and write it to a file - const buffer = Buffer.from(chunk.metadata.base64_data, 'base64'); - fs.promises.writeFile(filePath, buffer).then(() => { - // Update the file path in the chunk's metadata - chunk.metadata.file_path = path.join(files_directory, fileName); - chunk.metadata.base64_data = undefined; // Remove the base64 data from the metadata - }); - } else { - console.warn(`No base64_data found for chunk: ${fileName}`); - } - } - }) - ); result.status = 'completed'; } else { result.status = 'pending'; } - res.json(result); // Send the result back to the client + res.json(result); } else { res.status(202).send({ status: 'pending' }); } }, }); - // Register an API route to format chunks (e.g., text or image chunks) for display + // Register an API route to format chunks of text or images for structured display + // Converts raw chunk data into a structured format for frontend consumption register({ method: Method.POST, subscription: '/formatChunks', @@ -367,7 +615,8 @@ export default class AssistantManager extends ApiManager { // If the chunk is an image or table, read the corresponding file and encode it as base64 if (chunk.metadata.type === 'image' || chunk.metadata.type === 'table') { try { - const filePath = serverPathToFile(Directory.chunk_images, chunk.metadata.file_path); // Get the file path + const filePath = path.join(pathToDirectory(Directory.chunk_images), chunk.metadata.file_path); // Get the file path + console.log(filePath); readFileAsync(filePath).then(imageBuffer => { const base64Image = imageBuffer.toString('base64'); // Convert the image to base64 @@ -401,6 +650,7 @@ export default class AssistantManager extends ApiManager { }); // Register an API route to create and save a CSV file on the server + // Writes the CSV content to a unique file and provides a URL for download register({ method: Method.POST, subscription: '/createCSV', @@ -440,15 +690,23 @@ export default class AssistantManager extends ApiManager { } } -function spawnPythonProcess(jobId: string, file_name: string, file_data: string) { +/** + * Spawns a Python process to handle file processing tasks. + * @param jobId The job ID for tracking progress. + * @param file_name The name of the file to process. + * @param file_path The filepath of the file to process. + */ +function spawnPythonProcess(jobId: string, file_path: string) { const venvPath = path.join(__dirname, '../chunker/venv'); const requirementsPath = path.join(__dirname, '../chunker/requirements.txt'); const pythonScriptPath = path.join(__dirname, '../chunker/pdf_chunker.py'); + const outputDirectory = pathToDirectory(Directory.chunk_images); + function runPythonScript() { const pythonPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'python') : path.join(venvPath, 'bin', 'python3'); - const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_name, file_data]); + const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_path, outputDirectory]); let pythonOutput = ''; let stderrOutput = ''; @@ -460,23 +718,30 @@ function spawnPythonProcess(jobId: string, file_name: string, file_data: string) pythonProcess.stderr.on('data', data => { stderrOutput += data.toString(); const lines = stderrOutput.split('\n'); + stderrOutput = lines.pop() || ''; // Save the last partial line back to stderrOutput lines.forEach(line => { if (line.trim()) { - try { - const parsedOutput = JSON.parse(line); - if (parsedOutput.job_id && parsedOutput.progress !== undefined) { - jobProgress[parsedOutput.job_id] = { - step: parsedOutput.step, - progress: parsedOutput.progress, - }; - } else if (parsedOutput.progress !== undefined) { - jobProgress[jobId] = { - step: parsedOutput.step, - progress: parsedOutput.progress, - }; + if (line.startsWith('PROGRESS:')) { + const jsonString = line.substring('PROGRESS:'.length); + try { + const parsedOutput = JSON.parse(jsonString); + if (parsedOutput.job_id && parsedOutput.progress !== undefined) { + jobProgress[parsedOutput.job_id] = { + step: parsedOutput.step, + progress: parsedOutput.progress, + }; + } else if (parsedOutput.progress !== undefined) { + jobProgress[jobId] = { + step: parsedOutput.step, + progress: parsedOutput.progress, + }; + } + } catch (err) { + console.error('Error parsing progress JSON:', jsonString, err); } - } catch (err) { - console.error('Progress log from Python:', line, err); + } else { + // Log other stderr output + console.error('Python stderr:', line); } } }); @@ -490,10 +755,24 @@ function spawnPythonProcess(jobId: string, file_name: string, file_data: string) jobProgress[jobId] = { step: 'Complete', progress: 100 }; } catch (err) { console.error('Error parsing final JSON result:', err); + jobResults[jobId] = { error: 'Failed to parse final result' }; } } else { console.error(`Python process exited with code ${code}`); - jobResults[jobId] = { error: 'Python process failed' }; + // Check if there was an error message in stderr + if (stderrOutput) { + // Try to parse the last line as JSON + const lines = stderrOutput.trim().split('\n'); + const lastLine = lines[lines.length - 1]; + try { + const errorOutput = JSON.parse(lastLine); + jobResults[jobId] = errorOutput; + } catch { + jobResults[jobId] = { error: 'Python process failed' }; + } + } else { + jobResults[jobId] = { error: 'Python process failed' }; + } } }); } |