aboutsummaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/ApiManagers/AssistantManager.ts243
-rw-r--r--src/server/chunker/pdf_chunker.py54
2 files changed, 261 insertions, 36 deletions
diff --git a/src/server/ApiManagers/AssistantManager.ts b/src/server/ApiManagers/AssistantManager.ts
index 86af756a4..c41f697db 100644
--- a/src/server/ApiManagers/AssistantManager.ts
+++ b/src/server/ApiManagers/AssistantManager.ts
@@ -15,11 +15,13 @@ import * as fs from 'fs';
import { writeFile } from 'fs';
import { google } from 'googleapis';
import { JSDOM } from 'jsdom';
+import OpenAI from 'openai';
import * as path from 'path';
import * as puppeteer from 'puppeteer';
import { promisify } from 'util';
import * as uuid from 'uuid';
import { AI_Document } from '../../client/views/nodes/chatbot/types/types';
+import { DashUploadUtils } from '../DashUploadUtils';
import { Method } from '../RouteManager';
import { filesDirectory, publicDirectory } from '../SocketData';
import ApiManager, { Registration } from './ApiManager';
@@ -87,6 +89,7 @@ export default class AssistantManager extends ApiManager {
protected initialize(register: Registration): void {
// Initialize Google Custom Search API
const customsearch = google.customsearch('v1');
+ const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// Register Wikipedia summary API route
register({
@@ -115,6 +118,8 @@ export default class AssistantManager extends ApiManager {
},
});
+ // Register an API route to retrieve web search results using Google Custom Search
+ // This route filters results by checking their x-frame-options headers for security purposes
register({
method: Method.POST,
subscription: '/getWebSearchResults',
@@ -202,6 +207,187 @@ export default class AssistantManager extends ApiManager {
},
});
+ /**
+ * Converts a video file to audio format using ffmpeg.
+ * @param videoPath The path to the input video file.
+ * @param outputAudioPath The path to the output audio file.
+ * @returns A promise that resolves when the conversion is complete.
+ */
+ function convertVideoToAudio(videoPath: string, outputAudioPath: string): Promise<void> {
+ return new Promise((resolve, reject) => {
+ const ffmpegProcess = spawn('ffmpeg', [
+ '-i',
+ videoPath, // Input file
+ '-vn', // No video
+ '-acodec',
+ 'pcm_s16le', // Audio codec
+ '-ac',
+ '1', // Number of audio channels
+ '-ar',
+ '16000', // Audio sampling frequency
+ '-f',
+ 'wav', // Output format
+ outputAudioPath, // Output file
+ ]);
+
+ ffmpegProcess.on('error', error => {
+ console.error('Error running ffmpeg:', error);
+ reject(error);
+ });
+
+ ffmpegProcess.on('close', code => {
+ if (code === 0) {
+ console.log('Audio extraction complete:', outputAudioPath);
+ resolve();
+ } else {
+ reject(new Error(`ffmpeg exited with code ${code}`));
+ }
+ });
+ });
+ }
+
+ // Register an API route to process a media file (audio or video)
+ // Extracts audio from video files, transcribes the audio using OpenAI Whisper, and provides a summary
+ register({
+ method: Method.POST,
+ subscription: '/processMediaFile',
+ secureHandler: async ({ req, res }) => {
+ const { fileName } = req.body;
+
+ // Ensure the filename is provided
+ if (!fileName) {
+ res.status(400).send({ error: 'Filename is required' });
+ return;
+ }
+
+ try {
+ // Determine the file type and location
+ const isAudio = fileName.toLowerCase().endsWith('.mp3');
+ const directory = isAudio ? Directory.audio : Directory.videos;
+ const filePath = serverPathToFile(directory, fileName);
+
+ // Check if the file exists
+ if (!fs.existsSync(filePath)) {
+ res.status(404).send({ error: 'File not found' });
+ return;
+ }
+
+ console.log(`Processing ${isAudio ? 'audio' : 'video'} file: ${fileName}`);
+
+ // Step 1: Extract audio if it's a video
+ let audioPath = filePath;
+ if (!isAudio) {
+ const audioFileName = `${path.basename(fileName, path.extname(fileName))}.wav`;
+ audioPath = path.join(pathToDirectory(Directory.audio), audioFileName);
+
+ console.log('Extracting audio from video...');
+ await convertVideoToAudio(filePath, audioPath);
+ }
+
+ // Step 2: Transcribe audio using OpenAI Whisper
+ console.log('Transcribing audio...');
+ const transcription = await openai.audio.transcriptions.create({
+ file: fs.createReadStream(audioPath),
+ model: 'whisper-1',
+ response_format: 'verbose_json',
+ timestamp_granularities: ['segment'],
+ });
+
+ console.log('Audio transcription complete.');
+
+ // Step 3: Extract concise JSON
+ console.log('Extracting concise JSON...');
+ const originalSegments = transcription.segments?.map((segment, index) => ({
+ index: index.toString(),
+ text: segment.text,
+ start: segment.start,
+ end: segment.end,
+ }));
+
+ interface ConciseSegment {
+ text: string;
+ indexes: string[];
+ start: number | null;
+ end: number | null;
+ }
+
+ const combinedSegments = [];
+ let currentGroup: ConciseSegment = { text: '', indexes: [], start: null, end: null };
+ let currentDuration = 0;
+
+ originalSegments?.forEach(segment => {
+ const segmentDuration = segment.end - segment.start;
+
+ if (currentDuration + segmentDuration <= 4000) {
+ // Add segment to the current group
+ currentGroup.text += (currentGroup.text ? ' ' : '') + segment.text;
+ currentGroup.indexes.push(segment.index);
+ if (currentGroup.start === null) {
+ currentGroup.start = segment.start;
+ }
+ currentGroup.end = segment.end;
+ currentDuration += segmentDuration;
+ } else {
+ // Push the current group and start a new one
+ combinedSegments.push({ ...currentGroup });
+ currentGroup = {
+ text: segment.text,
+ indexes: [segment.index],
+ start: segment.start,
+ end: segment.end,
+ };
+ currentDuration = segmentDuration;
+ }
+ });
+
+ // Push the final group if it has content
+ if (currentGroup.text) {
+ combinedSegments.push({ ...currentGroup });
+ }
+ const lastSegment = combinedSegments[combinedSegments.length - 1];
+
+ // Check if the last segment is too short and combine it with the second last
+ if (combinedSegments.length > 1 && lastSegment.end && lastSegment.start) {
+ const secondLastSegment = combinedSegments[combinedSegments.length - 2];
+ const lastDuration = lastSegment.end - lastSegment.start;
+
+ if (lastDuration < 30) {
+ // Combine the last segment with the second last
+ secondLastSegment.text += (secondLastSegment.text ? ' ' : '') + lastSegment.text;
+ secondLastSegment.indexes = secondLastSegment.indexes.concat(lastSegment.indexes);
+ secondLastSegment.end = lastSegment.end;
+
+ // Remove the last segment from the array
+ combinedSegments.pop();
+ }
+ }
+
+ console.log('Segments combined successfully.');
+
+ console.log('Generating summary using GPT-4...');
+ const combinedText = combinedSegments.map(segment => segment.text).join(' ');
+
+ let summary = '';
+ try {
+ const completion = await openai.chat.completions.create({
+ messages: [{ role: 'system', content: `Summarize the following text in a concise paragraph:\n\n${combinedText}` }],
+ model: 'gpt-4o',
+ });
+ console.log('Summary generation complete.');
+ summary = completion.choices[0].message.content ?? 'Summary could not be generated.';
+ } catch (summaryError) {
+ console.error('Error generating summary:', summaryError);
+ summary = 'Summary could not be generated.';
+ }
+ // Step 5: Return the JSON result
+ res.send({ full: originalSegments, condensed: combinedSegments, summary });
+ } catch (error) {
+ console.error('Error processing media file:', error);
+ res.status(500).send({ error: 'Failed to process media file' });
+ }
+ },
+ });
+
// Axios instance with custom headers for scraping
const axiosInstance = axios.create({
headers: {
@@ -236,7 +422,38 @@ export default class AssistantManager extends ApiManager {
}
};
- // Register a proxy fetch API route
+ // Register an API route to generate an image using OpenAI's DALL-E model
+ // Uploads the generated image to the server and provides a URL for access
+ register({
+ method: Method.POST,
+ subscription: '/generateImage',
+ secureHandler: async ({ req, res }) => {
+ const { image_prompt } = req.body;
+
+ if (!image_prompt) {
+ res.status(400).send({ error: 'No prompt provided' });
+ return;
+ }
+
+ try {
+ const image = await openai.images.generate({ model: 'dall-e-3', prompt: image_prompt, response_format: 'url' });
+ console.log(image);
+ const result = await DashUploadUtils.UploadImage(image.data[0].url!);
+
+ const url = image.data[0].url;
+
+ res.send({ result, url });
+ } catch (error) {
+ console.error('Error fetching the URL:', error);
+ res.status(500).send({
+ error: 'Failed to fetch the URL',
+ });
+ }
+ },
+ });
+
+ // Register an API route to fetch data from a URL using a proxy with retry logic
+ // Useful for bypassing rate limits or scraping inaccessible data
register({
method: Method.POST,
subscription: '/proxyFetch',
@@ -261,6 +478,7 @@ export default class AssistantManager extends ApiManager {
});
// Register an API route to scrape website content using Puppeteer and JSDOM
+ // Extracts and returns readable content from a given URL
register({
method: Method.POST,
subscription: '/scrapeWebsite',
@@ -301,6 +519,8 @@ export default class AssistantManager extends ApiManager {
},
});
+ // Register an API route to create a document and start a background job for processing
+ // Uses Python scripts to process files and generate document chunks for further use
register({
method: Method.POST,
subscription: '/createDocument',
@@ -318,7 +538,7 @@ export default class AssistantManager extends ApiManager {
// Spawn the Python process and track its progress/output
// eslint-disable-next-line no-use-before-define
- spawnPythonProcess(jobId, file_name, file_data);
+ spawnPythonProcess(jobId, file_name, public_path);
// Send the job ID back to the client for tracking
res.send({ jobId });
@@ -332,6 +552,7 @@ export default class AssistantManager extends ApiManager {
});
// Register an API route to check the progress of a document creation job
+ // Returns the current step and progress percentage
register({
method: Method.GET,
subscription: '/getProgress/:jobId',
@@ -349,7 +570,8 @@ export default class AssistantManager extends ApiManager {
},
});
- // Register an API route to get the final result of a document creation job
+ // Register an API route to retrieve the final result of a document creation job
+ // Returns the processed data or an error status if the job is incomplete
register({
method: Method.GET,
subscription: '/getResult/:jobId',
@@ -370,7 +592,8 @@ export default class AssistantManager extends ApiManager {
},
});
- // Register an API route to format chunks (e.g., text or image chunks) for display
+ // Register an API route to format chunks of text or images for structured display
+ // Converts raw chunk data into a structured format for frontend consumption
register({
method: Method.POST,
subscription: '/formatChunks',
@@ -392,6 +615,7 @@ export default class AssistantManager extends ApiManager {
if (chunk.metadata.type === 'image' || chunk.metadata.type === 'table') {
try {
const filePath = path.join(pathToDirectory(Directory.chunk_images), chunk.metadata.file_path); // Get the file path
+ console.log(filePath);
readFileAsync(filePath).then(imageBuffer => {
const base64Image = imageBuffer.toString('base64'); // Convert the image to base64
@@ -425,6 +649,7 @@ export default class AssistantManager extends ApiManager {
});
// Register an API route to create and save a CSV file on the server
+ // Writes the CSV content to a unique file and provides a URL for download
register({
method: Method.POST,
subscription: '/createCSV',
@@ -464,7 +689,13 @@ export default class AssistantManager extends ApiManager {
}
}
-function spawnPythonProcess(jobId: string, file_name: string, file_data: string) {
+/**
+ * Spawns a Python process to handle file processing tasks.
+ * @param jobId The job ID for tracking progress.
+ * @param file_name The name of the file to process.
+ * @param file_path The filepath of the file to process.
+ */
+function spawnPythonProcess(jobId: string, file_name: string, file_path: string) {
const venvPath = path.join(__dirname, '../chunker/venv');
const requirementsPath = path.join(__dirname, '../chunker/requirements.txt');
const pythonScriptPath = path.join(__dirname, '../chunker/pdf_chunker.py');
@@ -474,7 +705,7 @@ function spawnPythonProcess(jobId: string, file_name: string, file_data: string)
function runPythonScript() {
const pythonPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'python') : path.join(venvPath, 'bin', 'python3');
- const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_name, file_data, outputDirectory]);
+ const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_path, outputDirectory]);
let pythonOutput = '';
let stderrOutput = '';
diff --git a/src/server/chunker/pdf_chunker.py b/src/server/chunker/pdf_chunker.py
index 48b2dbf97..a9dbcbb0c 100644
--- a/src/server/chunker/pdf_chunker.py
+++ b/src/server/chunker/pdf_chunker.py
@@ -668,7 +668,7 @@ class Document:
Represents a document being processed, such as a PDF, handling chunking, embedding, and summarization.
"""
- def __init__(self, file_data: bytes, file_name: str, job_id: str, output_folder: str):
+ def __init__(self, file_path: str, file_name: str, job_id: str, output_folder: str):
"""
Initialize the Document with file data, file name, and job ID.
@@ -677,8 +677,8 @@ class Document:
:param job_id: The job ID associated with this document processing task.
"""
self.output_folder = output_folder
- self.file_data = file_data
self.file_name = file_name
+ self.file_path = file_path
self.job_id = job_id
self.type = self._get_document_type(file_name) # Determine the document type (PDF, CSV, etc.)
self.doc_id = job_id # Use the job ID as the document ID
@@ -691,13 +691,23 @@ class Document:
"""
Process the document: extract chunks, embed them, and generate a summary.
"""
+ with open(self.file_path, 'rb') as file:
+ pdf_data = file.read()
pdf_chunker = PDFChunker(output_folder=self.output_folder, doc_id=self.doc_id) # Initialize PDFChunker
- self.chunks = asyncio.run(pdf_chunker.chunk_pdf(self.file_data, self.file_name, self.doc_id, self.job_id)) # Extract chunks
-
- self.num_pages = self._get_pdf_pages() # Get the number of pages in the document
+ self.chunks = asyncio.run(pdf_chunker.chunk_pdf(pdf_data, os.path.basename(self.file_path), self.doc_id, self.job_id)) # Extract chunks
+ self.num_pages = self._get_pdf_pages(pdf_data) # Get the number of pages in the document
self._embed_chunks() # Embed the text chunks into embeddings
self.summary = self._generate_summary() # Generate a summary for the document
+ def _get_pdf_pages(self, pdf_data: bytes) -> int:
+ """
+ Get the total number of pages in the PDF document.
+ """
+ pdf_file = io.BytesIO(pdf_data) # Convert the file data to an in-memory binary stream
+ pdf_reader = PdfReader(pdf_file) # Initialize PDF reader
+ return len(pdf_reader.pages) # Return the number of pages in the PDF
+
+
def _get_document_type(self, file_name: str) -> DocumentType:
"""
Determine the document type based on its file extension.
@@ -712,15 +722,6 @@ class Document:
except ValueError:
raise FileTypeNotSupportedException(extension) # Raise exception if file type is unsupported
- def _get_pdf_pages(self) -> int:
- """
- Get the total number of pages in the PDF document.
-
- :return: The number of pages in the PDF.
- """
- pdf_file = io.BytesIO(self.file_data) # Convert the file data to an in-memory binary stream
- pdf_reader = PdfReader(pdf_file) # Initialize PDF reader
- return len(pdf_reader.pages) # Return the number of pages in the PDF
def _embed_chunks(self) -> None:
"""
@@ -800,39 +801,34 @@ class Document:
"doc_id": self.doc_id
}, indent=2) # Convert the document's attributes to JSON format
-def process_document(file_data, file_name, job_id, output_folder):
+def process_document(file_path, job_id, output_folder):
"""
Top-level function to process a document and return the JSON output.
- :param file_data: The binary data of the file being processed.
- :param file_name: The name of the file being processed.
+ :param file_path: The path to the file being processed.
:param job_id: The job ID for this document processing task.
:return: The processed document's data in JSON format.
"""
- new_document = Document(file_data, file_name, job_id, output_folder)
+ new_document = Document(file_path, file_path, job_id, output_folder)
return new_document.to_json()
def main():
"""
Main entry point for the script, called with arguments from Node.js.
"""
- if len(sys.argv) != 5:
+ if len(sys.argv) != 4:
print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr)
return
job_id = sys.argv[1]
- file_name = sys.argv[2]
- file_data = sys.argv[3]
- output_folder = sys.argv[4] # Get the output folder from arguments
+ file_path = sys.argv[2]
+ output_folder = sys.argv[3] # Get the output folder from arguments
try:
os.makedirs(output_folder, exist_ok=True)
-
- # Decode the base64 file data
- file_bytes = base64.b64decode(file_data)
-
+
# Process the document
- document_result = process_document(file_bytes, file_name, job_id, output_folder) # Pass output_folder
+ document_result = process_document(file_path, job_id, output_folder) # Pass output_folder
# Output the final result as JSON to stdout
print(document_result)
@@ -843,7 +839,5 @@ def main():
print(json.dumps({"error": str(e)}), file=sys.stderr)
sys.stderr.flush()
-
-
if __name__ == "__main__":
- main() # Execute the main function when the script is run
+ main() # Execute the main function when the script is run \ No newline at end of file