diff options
Diffstat (limited to 'src/server')
| -rw-r--r-- | src/server/ApiManagers/AssistantManager.ts | 158 | ||||
| -rw-r--r-- | src/server/chunker/pdf_chunker.py | 54 |
2 files changed, 179 insertions, 33 deletions
diff --git a/src/server/ApiManagers/AssistantManager.ts b/src/server/ApiManagers/AssistantManager.ts index 4d2068014..1fd88cbd6 100644 --- a/src/server/ApiManagers/AssistantManager.ts +++ b/src/server/ApiManagers/AssistantManager.ts @@ -24,6 +24,11 @@ import { Method } from '../RouteManager'; import { filesDirectory, publicDirectory } from '../SocketData'; import ApiManager, { Registration } from './ApiManager'; import { getServerPath } from '../../client/util/reportManager/reportManagerUtils'; +import { file } from 'jszip'; +import ffmpegInstaller from '@ffmpeg-installer/ffmpeg'; +import ffmpeg from 'fluent-ffmpeg'; +import OpenAI from 'openai'; +import * as xmlbuilder from 'xmlbuilder'; // Enumeration of directories where different file types are stored export enum Directory { @@ -88,6 +93,7 @@ export default class AssistantManager extends ApiManager { protected initialize(register: Registration): void { // Initialize Google Custom Search API const customsearch = google.customsearch('v1'); + const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY }); // Register Wikipedia summary API route register({ @@ -197,6 +203,148 @@ export default class AssistantManager extends ApiManager { } }, }); + function convertVideoToAudio(videoPath: string, outputAudioPath: string): Promise<void> { + return new Promise((resolve, reject) => { + const ffmpegProcess = spawn('ffmpeg', [ + '-i', + videoPath, // Input file + '-vn', // No video + '-acodec', + 'pcm_s16le', // Audio codec + '-ac', + '1', // Number of audio channels + '-ar', + '16000', // Audio sampling frequency + '-f', + 'wav', // Output format + outputAudioPath, // Output file + ]); + + ffmpegProcess.on('error', error => { + console.error('Error running ffmpeg:', error); + reject(error); + }); + + ffmpegProcess.on('close', code => { + if (code === 0) { + console.log('Audio extraction complete:', outputAudioPath); + resolve(); + } else { + reject(new Error(`ffmpeg exited with code ${code}`)); + } + }); + }); + } + + register({ + method: Method.POST, + subscription: '/processMediaFile', + secureHandler: async ({ req, res }) => { + const { fileName } = req.body; + + // Ensure the filename is provided + if (!fileName) { + res.status(400).send({ error: 'Filename is required' }); + return; + } + + try { + // Determine the file type and location + const isAudio = fileName.toLowerCase().endsWith('.mp3'); + const directory = isAudio ? Directory.audio : Directory.videos; + const filePath = serverPathToFile(directory, fileName); + + // Check if the file exists + if (!fs.existsSync(filePath)) { + res.status(404).send({ error: 'File not found' }); + return; + } + + console.log(`Processing ${isAudio ? 'audio' : 'video'} file: ${fileName}`); + + // Step 1: Extract audio if it's a video + let audioPath = filePath; + if (!isAudio) { + const audioFileName = `${path.basename(fileName, path.extname(fileName))}.wav`; + audioPath = path.join(pathToDirectory(Directory.audio), audioFileName); + + console.log('Extracting audio from video...'); + await convertVideoToAudio(filePath, audioPath); + } + + // Step 2: Transcribe audio using OpenAI Whisper + console.log('Transcribing audio...'); + const transcription = await openai.audio.transcriptions.create({ + file: fs.createReadStream(audioPath) as any, + model: 'whisper-1', + response_format: 'verbose_json', + timestamp_granularities: ['segment'], + }); + + console.log('Audio transcription complete.'); + + // Step 3: Extract concise JSON + console.log('Extracting concise JSON...'); + const conciseJSON = transcription.segments?.map((segment: any) => ({ + text: segment.text, + start: segment.start, + end: segment.end, + })); + + // Step 4: Combine segments with GPT-4 + console.log('Combining segments with GPT-4...'); + const schema = { + name: 'combine_segments_schema', + schema: { + type: 'object', + properties: { + combined_segments: { + type: 'array', + items: { + type: 'object', + properties: { + text: { type: 'string' }, + start: { type: 'number' }, + end: { type: 'number' }, + }, + required: ['text', 'start', 'end'], + }, + }, + }, + required: ['combined_segments'], + }, + }; + + const completion = await openai.chat.completions.create({ + model: 'gpt-4o-2024-08-06', + messages: [ + { + role: 'system', + content: 'Combine text segments into coherent sections, each between 5 and 10 seconds, based on their content. Return the result as JSON that follows the schema.', + }, + { + role: 'user', + content: JSON.stringify(conciseJSON), + }, + ], + response_format: { + type: 'json_schema', + json_schema: schema, + }, + }); + + const combinedSegments = JSON.parse(completion.choices[0].message?.content ?? '{"combined_segments": []}').combined_segments; + + console.log('Segments combined successfully.'); + + // Step 5: Return the JSON result + res.send(combinedSegments); + } catch (error) { + console.error('Error processing media file:', error); + res.status(500).send({ error: 'Failed to process media file' }); + } + }, + }); // Axios instance with custom headers for scraping const axiosInstance = axios.create({ @@ -314,7 +462,7 @@ export default class AssistantManager extends ApiManager { // Spawn the Python process and track its progress/output // eslint-disable-next-line no-use-before-define - spawnPythonProcess(jobId, file_name, file_data); + spawnPythonProcess(jobId, file_name, public_path); // Send the job ID back to the client for tracking res.send({ jobId }); @@ -388,6 +536,7 @@ export default class AssistantManager extends ApiManager { if (chunk.metadata.type === 'image' || chunk.metadata.type === 'table') { try { const filePath = path.join(pathToDirectory(Directory.chunk_images), chunk.metadata.file_path); // Get the file path + console.log(filePath); readFileAsync(filePath).then(imageBuffer => { const base64Image = imageBuffer.toString('base64'); // Convert the image to base64 @@ -460,7 +609,7 @@ export default class AssistantManager extends ApiManager { } } -function spawnPythonProcess(jobId: string, file_name: string, file_data: string) { +function spawnPythonProcess(jobId: string, file_name: string, file_path: string) { const venvPath = path.join(__dirname, '../chunker/venv'); const requirementsPath = path.join(__dirname, '../chunker/requirements.txt'); const pythonScriptPath = path.join(__dirname, '../chunker/pdf_chunker.py'); @@ -470,7 +619,7 @@ function spawnPythonProcess(jobId: string, file_name: string, file_data: string) function runPythonScript() { const pythonPath = process.platform === 'win32' ? path.join(venvPath, 'Scripts', 'python') : path.join(venvPath, 'bin', 'python3'); - const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_name, file_data, outputDirectory]); + const pythonProcess = spawn(pythonPath, [pythonScriptPath, jobId, file_path, outputDirectory]); let pythonOutput = ''; let stderrOutput = ''; @@ -593,3 +742,6 @@ function spawnPythonProcess(jobId: string, file_name: string, file_data: string) runPythonScript(); } } +function customFfmpeg(filePath: string) { + throw new Error('Function not implemented.'); +} diff --git a/src/server/chunker/pdf_chunker.py b/src/server/chunker/pdf_chunker.py index 48b2dbf97..a9dbcbb0c 100644 --- a/src/server/chunker/pdf_chunker.py +++ b/src/server/chunker/pdf_chunker.py @@ -668,7 +668,7 @@ class Document: Represents a document being processed, such as a PDF, handling chunking, embedding, and summarization. """ - def __init__(self, file_data: bytes, file_name: str, job_id: str, output_folder: str): + def __init__(self, file_path: str, file_name: str, job_id: str, output_folder: str): """ Initialize the Document with file data, file name, and job ID. @@ -677,8 +677,8 @@ class Document: :param job_id: The job ID associated with this document processing task. """ self.output_folder = output_folder - self.file_data = file_data self.file_name = file_name + self.file_path = file_path self.job_id = job_id self.type = self._get_document_type(file_name) # Determine the document type (PDF, CSV, etc.) self.doc_id = job_id # Use the job ID as the document ID @@ -691,13 +691,23 @@ class Document: """ Process the document: extract chunks, embed them, and generate a summary. """ + with open(self.file_path, 'rb') as file: + pdf_data = file.read() pdf_chunker = PDFChunker(output_folder=self.output_folder, doc_id=self.doc_id) # Initialize PDFChunker - self.chunks = asyncio.run(pdf_chunker.chunk_pdf(self.file_data, self.file_name, self.doc_id, self.job_id)) # Extract chunks - - self.num_pages = self._get_pdf_pages() # Get the number of pages in the document + self.chunks = asyncio.run(pdf_chunker.chunk_pdf(pdf_data, os.path.basename(self.file_path), self.doc_id, self.job_id)) # Extract chunks + self.num_pages = self._get_pdf_pages(pdf_data) # Get the number of pages in the document self._embed_chunks() # Embed the text chunks into embeddings self.summary = self._generate_summary() # Generate a summary for the document + def _get_pdf_pages(self, pdf_data: bytes) -> int: + """ + Get the total number of pages in the PDF document. + """ + pdf_file = io.BytesIO(pdf_data) # Convert the file data to an in-memory binary stream + pdf_reader = PdfReader(pdf_file) # Initialize PDF reader + return len(pdf_reader.pages) # Return the number of pages in the PDF + + def _get_document_type(self, file_name: str) -> DocumentType: """ Determine the document type based on its file extension. @@ -712,15 +722,6 @@ class Document: except ValueError: raise FileTypeNotSupportedException(extension) # Raise exception if file type is unsupported - def _get_pdf_pages(self) -> int: - """ - Get the total number of pages in the PDF document. - - :return: The number of pages in the PDF. - """ - pdf_file = io.BytesIO(self.file_data) # Convert the file data to an in-memory binary stream - pdf_reader = PdfReader(pdf_file) # Initialize PDF reader - return len(pdf_reader.pages) # Return the number of pages in the PDF def _embed_chunks(self) -> None: """ @@ -800,39 +801,34 @@ class Document: "doc_id": self.doc_id }, indent=2) # Convert the document's attributes to JSON format -def process_document(file_data, file_name, job_id, output_folder): +def process_document(file_path, job_id, output_folder): """ Top-level function to process a document and return the JSON output. - :param file_data: The binary data of the file being processed. - :param file_name: The name of the file being processed. + :param file_path: The path to the file being processed. :param job_id: The job ID for this document processing task. :return: The processed document's data in JSON format. """ - new_document = Document(file_data, file_name, job_id, output_folder) + new_document = Document(file_path, file_path, job_id, output_folder) return new_document.to_json() def main(): """ Main entry point for the script, called with arguments from Node.js. """ - if len(sys.argv) != 5: + if len(sys.argv) != 4: print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr) return job_id = sys.argv[1] - file_name = sys.argv[2] - file_data = sys.argv[3] - output_folder = sys.argv[4] # Get the output folder from arguments + file_path = sys.argv[2] + output_folder = sys.argv[3] # Get the output folder from arguments try: os.makedirs(output_folder, exist_ok=True) - - # Decode the base64 file data - file_bytes = base64.b64decode(file_data) - + # Process the document - document_result = process_document(file_bytes, file_name, job_id, output_folder) # Pass output_folder + document_result = process_document(file_path, job_id, output_folder) # Pass output_folder # Output the final result as JSON to stdout print(document_result) @@ -843,7 +839,5 @@ def main(): print(json.dumps({"error": str(e)}), file=sys.stderr) sys.stderr.flush() - - if __name__ == "__main__": - main() # Execute the main function when the script is run + main() # Execute the main function when the script is run
\ No newline at end of file |
