import asyncio import concurrent import sys from tqdm.asyncio import tqdm_asyncio # Progress bar for async tasks import PIL from anthropic import Anthropic # For language model API from packaging.version import parse # Version checking import pytesseract # OCR library for text extraction from images import re import dotenv # For environment variable loading from lxml import etree # XML parsing from tqdm import tqdm # Progress bar for non-async tasks import fitz # PyMuPDF, PDF processing library from PIL import Image, ImageDraw # Image processing from typing import List, Dict, Any, TypedDict # Typing for function annotations from ultralyticsplus import YOLO # Object detection model (YOLO) import base64 import io import json import os import uuid # For generating unique IDs from enum import Enum # Enums for types like document type and purpose import openai import numpy as np from PyPDF2 import PdfReader # PDF text extraction from openai import OpenAI # OpenAI client for text completion from sklearn.cluster import KMeans # Clustering for summarization import warnings # Silence specific warnings warnings.filterwarnings('ignore', message="Valid config keys have changed") warnings.filterwarnings('ignore', message="torch.load") dotenv.load_dotenv() # Load environment variables # Fix for newer versions of PIL # if parse(PIL.__version__) >= parse('10.0.0'): # Image.LINEAR = Image.BILINEAR # Global dictionary to track progress of document processing jobs current_progress = {} def update_progress(job_id, step, progress_value): """ Output the progress in JSON format to stdout for the Node.js process to capture. :param job_id: The unique identifier for the processing job. :param step: The current step of the job. :param progress_value: The percentage of completion for the current step. """ progress_data = { "job_id": job_id, "step": step, "progress": progress_value } print(f"PROGRESS:{json.dumps(progress_data)}", file=sys.stderr) sys.stderr.flush() class ElementExtractor: """ A class that uses a YOLO model to extract tables and images from a PDF page. """ def __init__(self, output_folder: str, doc_id: str): """ Initializes the ElementExtractor with the output folder for saving images and the YOLO model. :param output_folder: Path to the folder where extracted elements will be saved. """ self.doc_id = doc_id self.output_folder = os.path.join(output_folder, doc_id) os.makedirs(self.output_folder, exist_ok=True) self.model = YOLO('keremberke/yolov8m-table-extraction') # Load YOLO model for table extraction self.model.overrides['conf'] = 0.25 # Set confidence threshold for detection self.model.overrides['iou'] = 0.45 # Set Intersection over Union (IoU) threshold self.padding = 5 # Padding around detected elements async def extract_elements(self, page, padding: int = 20) -> List[Dict[str, Any]]: """ Asynchronously extract tables and images from a PDF page. :param page: A Page object representing a PDF page. :param padding: Padding around the extracted elements. :return: A list of dictionaries containing the extracted elements. """ tasks = [ asyncio.create_task(self.extract_tables(page.image, page.page_num)), # Extract tables from the page asyncio.create_task(self.extract_images(page.page, page.image, page.page_num)) # Extract images from the page ] results = await asyncio.gather(*tasks) # Wait for both tasks to complete return [item for sublist in results for item in sublist] # Flatten and return results async def extract_tables(self, img: Image.Image, page_num: int) -> List[Dict[str, Any]]: """ Asynchronously extract tables from a given page image using the YOLO model. :param img: The image of the PDF page. :param page_num: The current page number. :return: A list of dictionaries with metadata about the detected tables. """ results = self.model.predict(img, verbose=False) # Predict table locations using YOLO tables = [] for idx, box in enumerate(results[0].boxes): x1, y1, x2, y2 = map(int, box.xyxy[0]) # Extract bounding box coordinates # Draw a red rectangle on the full page image around the table page_with_outline = img.copy() draw = ImageDraw.Draw(page_with_outline) draw.rectangle( [max(0, x1 + self.padding), max(0, y1 + self.padding), min(page_with_outline.width, x2 + self.padding), min(page_with_outline.height, y2 + self.padding)], outline="red", width=2) # Draw red outline # Save the full page with the red outline table_filename = f"table_page{page_num + 1}_{idx + 1}.png" table_path = os.path.join(self.output_folder, table_filename) page_with_outline.save(table_path) file_path_for_client = f"{self.doc_id}/{table_filename}" tables.append({ 'metadata': { "type": "table", "location": [x1 / img.width, y1 / img.height, x2 / img.width, y2 / img.height], "file_path": file_path_for_client, "start_page": page_num, "end_page": page_num, "base64_data": self.image_to_base64(page_with_outline) } }) return tables async def extract_images(self, page: fitz.Page, img: Image.Image, page_num: int) -> List[Dict[str, Any]]: """ Asynchronously extract embedded images from a PDF page. :param page: A fitz.Page object representing the PDF page. :param img: The image of the PDF page. :param page_num: The current page number. :return: A list of dictionaries with metadata about the detected images. """ images = [] image_list = page.get_images(full=True) # Get a list of images on the page if not image_list: return images for img_index, img_info in enumerate(image_list): xref = img_info[0] # XREF of the image in the PDF base_image = page.parent.extract_image(xref) # Extract the image by its XREF image_bytes = base_image["image"] image = Image.open(io.BytesIO(image_bytes)).convert("RGB") # Ensure it's RGB before saving as PNG width_ratio = img.width / page.rect.width # Scale factor for width height_ratio = img.height / page.rect.height # Scale factor for height # Get image coordinates or default to page rectangle rect_list = page.get_image_rects(xref) if rect_list: rect = rect_list[0] x1, y1, x2, y2 = rect else: rect = page.rect x1, y1, x2, y2 = rect # Draw a red rectangle on the full page image around the embedded image page_with_outline = img.copy() draw = ImageDraw.Draw(page_with_outline) draw.rectangle([x1 * width_ratio, y1 * height_ratio, x2 * width_ratio, y2 * height_ratio], outline="red", width=2) # Draw red outline # Save the full page with the red outline image_filename = f"image_page{page_num + 1}_{img_index + 1}.png" image_path = os.path.join(self.output_folder, image_filename) page_with_outline.save(image_path) file_path_for_client = f"{self.doc_id}/{image_filename}" images.append({ 'metadata': { "type": "image", "location": [x1 / page.rect.width, y1 / page.rect.height, x2 / page.rect.width, y2 / page.rect.height], "file_path": file_path_for_client, "start_page": page_num, "end_page": page_num, "base64_data": self.image_to_base64(image) } }) return images @staticmethod def image_to_base64(image: Image.Image) -> str: """ Convert a PIL image to a base64-encoded string. :param image: The PIL image to be converted. :return: The base64-encoded string of the image. """ buffered = io.BytesIO() image.save(buffered, format="PNG") # Save image as PNG to an in-memory buffer return base64.b64encode(buffered.getvalue()).decode('utf-8') # Convert to base64 and return class ChunkMetaData(TypedDict): """ A TypedDict that defines the metadata structure for chunks of text and visual elements. """ text: str type: str original_document: str file_path: str doc_id: str location: str start_page: int end_page: int base64_data: str class Chunk(TypedDict): """ A TypedDict that defines the structure for a document chunk, including metadata and embeddings. """ id: str values: List[float] metadata: ChunkMetaData class Page: """ A class that represents a single PDF page, handling its image representation and element masking. """ def __init__(self, page: fitz.Page, page_num: int): """ Initializes the Page with its page number and the image representation of the page. :param page: A fitz.Page object representing the PDF page. :param page_num: The number of the page in the PDF. """ self.page = page self.page_num = page_num # Get high-resolution image of the page (for table/image extraction) self.pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) self.image = Image.frombytes("RGB", [self.pix.width, self.pix.height], self.pix.samples) self.masked_image = self.image.copy() # Image with masked elements (tables/images) self.draw = ImageDraw.Draw(self.masked_image) self.elements = [] # List to store extracted elements def add_element(self, element): """ Adds a detected element (table/image) to the page and masks its location on the page image. :param element: A dictionary containing metadata about the detected element. """ self.elements.append(element) # Mask the element on the page image by drawing a white rectangle over its location x1, y1, x2, y2 = [coord * self.image.width if i % 2 == 0 else coord * self.image.height for i, coord in enumerate(element['metadata']['location'])] self.draw.rectangle([x1, y1, x2, y2], fill="white") # Draw a white rectangle to mask the element class PDFChunker: """ The main class responsible for chunking PDF files into text and visual elements (tables/images). """ def __init__(self, output_folder: str = "output", doc_id: str = '', image_batch_size: int = 5) -> None: """ Initializes the PDFChunker with an output folder and an element extractor for visual elements. :param output_folder: Folder to store the output files (extracted tables/images). :param image_batch_size: The batch size for processing visual elements. """ self.client = OpenAI() # ← replaces Anthropic() self.output_folder = output_folder self.image_batch_size = image_batch_size # Batch size for image processing self.doc_id = doc_id # Add doc_id self.element_extractor = ElementExtractor(output_folder, doc_id) async def chunk_pdf(self, file_data: bytes, file_name: str, doc_id: str, job_id: str) -> List[Dict[str, Any]]: """ Processes a PDF file, extracting text and visual elements, and returning structured chunks. :param file_data: The binary data of the PDF file. :param file_name: The name of the PDF file. :param doc_id: The unique document ID for this job. :param job_id: The unique job ID for the processing task. :return: A list of structured chunks containing text and visual elements. """ with fitz.open(stream=file_data, filetype="pdf") as pdf_document: num_pages = len(pdf_document) # Get the total number of pages in the PDF pages = [Page(pdf_document[i], i) for i in tqdm(range(num_pages), desc="Initializing Pages")] # Initialize each page update_progress(job_id, "Extracting tables and images...", 0) await self.extract_and_mask_elements(pages, job_id) # Extract and mask elements (tables/images) update_progress(job_id, "Processing tables and images...", 0) await self.process_visual_elements(pages, self.image_batch_size, job_id) # Process visual elements update_progress(job_id, "Extracting text...", 0) page_texts = await self.extract_text_from_masked_pages(pages, job_id) # Extract text from masked pages update_progress(job_id, "Processing text...", 0) text_chunks = self.chunk_text_with_metadata(page_texts, max_words=2000, job_id=job_id) # Chunk text into smaller parts # Combine text and visual elements into a unified structure (chunks) chunks = self.combine_chunks(text_chunks, [elem for page in pages for elem in page.elements], file_name, doc_id) return chunks async def extract_and_mask_elements(self, pages: List[Page], job_id: str): """ Extract visual elements (tables and images) from each page and mask them on the page. :param pages: A list of Page objects representing the PDF pages. :param job_id: The unique job ID for the processing task. """ total_pages = len(pages) tasks = [] for i, page in enumerate(pages): tasks.append(asyncio.create_task(self.element_extractor.extract_elements(page))) # Extract elements asynchronously progress = ((i + 1) / total_pages) * 100 # Calculate progress update_progress(job_id, "Extracting tables and images...", progress) # Gather all extraction results results = await asyncio.gather(*tasks) # Mask the detected elements on the page images for page, elements in zip(pages, results): for element in elements: page.add_element(element) # Mask each extracted element on the page async def process_visual_elements(self, pages: List[Page], image_batch_size: int, job_id: str) -> List[Dict[str, Any]]: """ Process extracted visual elements in batches, generating summaries or descriptions. :param pages: A list of Page objects representing the PDF pages. :param image_batch_size: The batch size for processing visual elements. :param job_id: The unique job ID for the processing task. :return: A list of processed elements with metadata and generated summaries. """ pre_elements = [element for page in pages for element in page.elements] # Flatten list of elements processed_elements = [] total_batches = (len(pre_elements) // image_batch_size) + 1 # Calculate total number of batches loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor() as executor: # Process elements in batches for i in tqdm(range(0, len(pre_elements), image_batch_size), desc="Processing Visual Elements"): batch = pre_elements[i:i + image_batch_size] # Run image summarization in a separate thread summaries = await loop.run_in_executor( executor, self.batch_summarize_images, {j + 1: element.get('metadata').get('base64_data') for j, element in enumerate(batch)} ) # Append generated summaries to the elements for j, elem in enumerate(batch, start=1): if j in summaries: elem['metadata']['text'] = re.sub(r'^(Image|Table):\s*', '', summaries[j]) elem['metadata']['base64_data'] = '' processed_elements.append(elem) progress = ((i // image_batch_size) + 1) / total_batches * 100 # Calculate progress update_progress(job_id, "Processing tables and images...", progress) return processed_elements async def extract_text_from_masked_pages(self, pages: List[Page], job_id: str) -> Dict[int, str]: """ Extract text from masked page images (where tables and images have been masked out). :param pages: A list of Page objects representing the PDF pages. :param job_id: The unique job ID for the processing task. :return: A dictionary mapping page numbers to extracted text. """ total_pages = len(pages) tasks = [] for i, page in enumerate(pages): tasks.append(asyncio.create_task(self.extract_text(page.masked_image, page.page_num))) # Perform OCR on each page progress = ((i + 1) / total_pages) * 100 # Calculate progress update_progress(job_id, "Extracting text...", progress) # Return extracted text from each page return dict(await asyncio.gather(*tasks)) @staticmethod async def extract_text(image: Image.Image, page_num: int) -> (int, str): """ Perform OCR on the provided image to extract text. :param image: The PIL image of the page. :param page_num: The current page number. :return: A tuple containing the page number and the extracted text. """ result = pytesseract.image_to_string(image) # Extract text using Tesseract OCR return page_num + 1, result.strip() # Return the page number and extracted text def chunk_text_with_metadata(self, page_texts: Dict[int, str], max_words: int, job_id: str) -> List[Dict[str, Any]]: """ Break the extracted text into smaller chunks with metadata (e.g., page numbers). :param page_texts: A dictionary mapping page numbers to extracted text. :param max_words: The maximum number of words allowed in a chunk. :param job_id: The unique job ID for the processing task. :return: A list of dictionaries containing text chunks with metadata. """ chunks = [] current_chunk = "" current_start_page = 0 total_words = 0 def add_chunk(chunk_text, start_page, end_page): # Add a chunk of text with metadata chunks.append({ "text": chunk_text.strip(), "start_page": start_page, "end_page": end_page }) total_pages = len(page_texts) for i, (page_num, text) in enumerate(tqdm(page_texts.items(), desc="Chunking Text")): sentences = self.split_into_sentences(text) for sentence in sentences: word_count = len(sentence.split()) # If adding this sentence exceeds max_words, create a new chunk if total_words + word_count > max_words: add_chunk(current_chunk, current_start_page, page_num) current_chunk = sentence + " " current_start_page = page_num total_words = word_count else: current_chunk += sentence + " " total_words += word_count current_chunk += "\n\n" progress = ((i + 1) / total_pages) * 100 # Calculate progress update_progress(job_id, "Processing text...", progress) # Add the last chunk if there is leftover text if current_chunk.strip(): add_chunk(current_chunk, current_start_page, page_num) return chunks @staticmethod def split_into_sentences(text): """ Split the text into sentences using regular expressions. :param text: The raw text to be split into sentences. :return: A list of sentences. """ return re.split(r'(?<=[.!?])\s+', text) @staticmethod def combine_chunks(text_chunks: List[Dict[str, Any]], visual_elements: List[Dict[str, Any]], pdf_path: str, doc_id: str) -> List[Chunk]: """ Combine text and visual chunks into a unified list. :param text_chunks: A list of dictionaries containing text chunks with metadata. :param visual_elements: A list of dictionaries containing visual elements (tables/images) with metadata. :param pdf_path: The path to the original PDF file. :param doc_id: The unique document ID for this job. :return: A list of Chunk objects representing the combined data. """ combined_chunks = [] # Add text chunks for text_chunk in text_chunks: chunk_metadata: ChunkMetaData = { "text": text_chunk["text"], "type": "text", "original_document": pdf_path, "file_path": "", "location": "", "start_page": text_chunk["start_page"], "end_page": text_chunk["end_page"], "base64_data": "", "doc_id": doc_id, } chunk_dict: Chunk = { "id": str(uuid.uuid4()), # Generate a unique ID for the chunk "values": [], "metadata": chunk_metadata, } combined_chunks.append(chunk_dict) # Add visual chunks (tables/images) for elem in visual_elements: visual_chunk_metadata: ChunkMetaData = { "type": elem['metadata']['type'], "file_path": elem['metadata']['file_path'], "text": elem['metadata'].get('text', ''), "start_page": elem['metadata']['start_page'], "end_page": elem['metadata']['end_page'], "location": str(elem['metadata']['location']), "base64_data": elem['metadata']['base64_data'], "doc_id": doc_id, "original_document": pdf_path, } visual_chunk_dict: Chunk = { "id": str(uuid.uuid4()), # Generate a unique ID for the visual chunk "values": [], "metadata": visual_chunk_metadata, } combined_chunks.append(visual_chunk_dict) return combined_chunks def batch_summarize_images(self, images: Dict[int, str]) -> Dict[int, str]: """ Summarise a batch of images/tables with GPT‑4o using Structured Outputs. :param images: {image_number: base64_png} :return: {image_number: summary_text} """ # -------- 1. Build the prompt ----------- content: list[dict] = [] for n, b64 in images.items(): content.append({"type": "text", "text": f"\nImage {n} (outlined in red on the page):"}) content.append({"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}}) messages = [ { "role": "system", "content": ( "You are generating retrieval‑ready summaries for each highlighted " "image or table. Start by identifying whether the element is an " "image or a table, then write one informative sentence that a vector " "search would find useful. Provide detail but limit to a couple of paragraphs per image." ), }, {"role": "user", "content": content}, ] schema = { "type": "object", "properties": { "summaries": { "type": "array", "items": { "type": "object", "properties": { "number": {"type": "integer"}, "type": {"type": "string", "enum": ["image", "table"]}, "summary": {"type": "string"} }, "required": ["number", "type", "summary"], "additionalProperties": False } } }, "required": ["summaries"], "additionalProperties": False } # ---------- OpenAI call ----------------------------------------------------- try: resp = self.client.chat.completions.create( model="gpt-4o", messages=messages, max_tokens=400 * len(images), temperature=0, response_format={ "type": "json_schema", "json_schema": { "name": "image_batch_summaries", # ← REQUIRED "schema": schema, # ← REQUIRED "strict": True # ← strongly recommended }, }, ) parsed = json.loads(resp.choices[0].message.content) # schema‑safe return {item["number"]: item["summary"] for item in parsed["summaries"]} except Exception as e: # Log and fall back gracefully print(json.dumps({"error": str(e)}), file=sys.stderr, flush=True) return {} class DocumentType(Enum): """ Enum representing different types of documents that can be processed. """ PDF = "pdf" # PDF file type CSV = "csv" # CSV file type TXT = "txt" # Plain text file type HTML = "html" # HTML file type class FileTypeNotSupportedException(Exception): """ Exception raised when a file type is unsupported during document processing. """ def __init__(self, file_extension: str): """ Initialize the exception with the unsupported file extension. :param file_extension: The file extension that triggered the exception. """ self.file_extension = file_extension self.message = f"File type '{file_extension}' is not supported." super().__init__(self.message) # Call the parent class constructor with the message class Document: """ Represents a document being processed, such as a PDF, handling chunking, embedding, and summarization. """ def __init__(self, file_path: str, file_name: str, job_id: str, output_folder: str, doc_id: str): """ Initialize the Document with file data, file name, and job ID. :param file_data: The binary data of the file being processed. :param file_name: The name of the file being processed. :param job_id: The job ID associated with this document processing task. """ self.output_folder = output_folder self.file_name = file_name self.file_path = file_path self.job_id = job_id self.type = self._get_document_type(file_name) # Determine the document type (PDF, CSV, etc.) self.doc_id = doc_id # Use the job ID as the document ID self.chunks = [] # List to hold text and visual chunks self.num_pages = 0 # Number of pages in the document (if applicable) self.summary = "" # The generated summary for the document self._process() # Start processing the document def _process(self): """ Process the document: extract chunks, embed them, and generate a summary. """ with open(self.file_path, 'rb') as file: pdf_data = file.read() pdf_chunker = PDFChunker(output_folder=self.output_folder, doc_id=self.doc_id) # Initialize PDFChunker self.chunks = asyncio.run(pdf_chunker.chunk_pdf(pdf_data, os.path.basename(self.file_path), self.doc_id, self.job_id)) # Extract chunks self.num_pages = self._get_pdf_pages(pdf_data) # Get the number of pages in the document self._embed_chunks() # Embed the text chunks into embeddings self.summary = self._generate_summary() # Generate a summary for the document def _get_pdf_pages(self, pdf_data: bytes) -> int: """ Get the total number of pages in the PDF document. """ pdf_file = io.BytesIO(pdf_data) # Convert the file data to an in-memory binary stream pdf_reader = PdfReader(pdf_file) # Initialize PDF reader return len(pdf_reader.pages) # Return the number of pages in the PDF def _get_document_type(self, file_name: str) -> DocumentType: """ Determine the document type based on its file extension. :param file_name: The name of the file being processed. :return: The DocumentType enum value corresponding to the file extension. """ _, extension = os.path.splitext(file_name) # Split the file name to get the extension extension = extension.lower().lstrip('.') # Convert to lowercase and remove leading period try: return DocumentType(extension) # Try to match the extension to a DocumentType except ValueError: raise FileTypeNotSupportedException(extension) # Raise exception if file type is unsupported def _embed_chunks(self) -> None: """ Embed the text chunks using the Cohere API. """ openai = OpenAI() # Initialize Cohere client with API key batch_size = 90 # Batch size for embedding chunks_len = len(self.chunks) # Total number of chunks to embed for i in tqdm(range(0, chunks_len, batch_size), desc="Embedding Chunks"): batch = self.chunks[i: min(i + batch_size, chunks_len)] # Get batch of chunks texts = [chunk['metadata']['text'] for chunk in batch] # Extract text from each chunk chunk_embs_batch = openai.embeddings.create( model="text-embedding-3-large", input=texts, encoding_format="float" ) for j, data_val in enumerate(chunk_embs_batch.data): self.chunks[i + j]['values'] = data_val.embedding # Store the embeddings in the corresponding chunks def _generate_summary(self) -> str: """ Generate a summary of the document using KMeans clustering and a language model. :return: The generated summary of the document. """ # num_clusters = min(10, len(self.chunks)) # Set number of clusters for KMeans, capped at 10 # kmeans = KMeans(n_clusters=num_clusters, random_state=42) # Initialize KMeans with 10 clusters # doc_chunks = [chunk['values'] for chunk in self.chunks if 'values' in chunk] # Extract embeddings # cluster_labels = kmeans.fit_predict(doc_chunks) # Assign each chunk to a cluster doc_chunks = [chunk['values'] for chunk in self.chunks if 'values' in chunk] if not doc_chunks: raise ValueError("No valid embedded chunks to summarize.") # Remove duplicates (e.g., from OCR-ed blank pages or repeated captions) unique_chunks = np.unique(np.array(doc_chunks), axis=0) # Dynamically scale number of clusters to available signal num_clusters = min(10, len(unique_chunks)) kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(unique_chunks) # Predict cluster labels for original chunks (not just unique ones) cluster_labels = kmeans.predict(np.array(doc_chunks)) # Select representative chunks from each cluster selected_chunks = [] for i in range(num_clusters): # cluster_chunks = [chunk for chunk, label in zip(self.chunks, cluster_labels) if label == i] # Get all chunks in this cluster # cluster_embs = [emb for emb, label in zip(doc_chunks, cluster_labels) if label == i] # Get embeddings for this cluster cluster_idxs = np.where(cluster_labels == i)[0] if len(cluster_idxs) == 0: continue # skip empty clusters (shouldn't happen after downsizing) centroid = kmeans.cluster_centers_[i] # Get the centroid of the cluster distances = [np.linalg.norm(doc_chunks[idx] - centroid) for idx in cluster_idxs] closest_idx = cluster_idxs[int(np.argmin(distances))] selected_chunks.append(self.chunks[closest_idx]) # distances = [np.linalg.norm(np.array(emb) - centroid) for emb in cluster_embs] # Compute distance to centroid # closest_chunk = cluster_chunks[np.argmin(distances)] # Select chunk closest to the centroid # selected_chunks.append(closest_chunk) # Combine selected chunks into a summary combined_text = "\n\n".join([chunk['metadata']['text'] for chunk in selected_chunks]) # Concatenate chunk texts client = OpenAI() # Initialize OpenAI client for text generation completion = client.chat.completions.create( model="gpt-4o", # Specify the language model messages=[ {"role": "system", "content": "You are an AI assistant tasked with summarizing a document. You are provided with important chunks from the document and provide a summary, as best you can, of what the document will contain overall. Be concise and brief with your response."}, {"role": "user", "content": f"""Please provide a comprehensive summary of what you think the document from which these chunks were sampled would be. Ensure the summary captures the main ideas and key points from all provided chunks. Be concise and brief and only provide the summary in paragraph form. Sample text chunks: ``` {combined_text} ``` ********** Summary: """} ], max_tokens=300 # Set max tokens for the summary ) return completion.choices[0].message.content.strip() # Return the generated summary def to_json(self) -> str: """ Return the document's data in JSON format. :return: JSON string representing the document's metadata, chunks, and summary. """ return json.dumps({ "file_name": self.file_name, "num_pages": self.num_pages, "summary": self.summary, "chunks": self.chunks, "type": self.type.value, "doc_id": self.doc_id }, indent=2) # Convert the document's attributes to JSON format def process_document(file_path, job_id, output_folder, doc_id): """ Top-level function to process a document and return the JSON output. :param file_path: The path to the file being processed. :param job_id: The job ID for this document processing task. :return: The processed document's data in JSON format. """ new_document = Document(file_path, file_path, job_id, output_folder, doc_id) return new_document.to_json() def main(): """ Main entry point for the script, called with arguments from Node.js. """ if len(sys.argv) != 5: print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr) return job_id = sys.argv[1] file_path = sys.argv[2] output_folder = sys.argv[3] # Get the output folder from arguments doc_id = sys.argv[4] try: os.makedirs(output_folder, exist_ok=True) # Process the document document_result = process_document(file_path, job_id, output_folder,doc_id) # Pass output_folder # Output the final result as JSON to stdout print(document_result) sys.stdout.flush() except Exception as e: # Print errors to stderr so they don't interfere with JSON output print(json.dumps({"error": str(e)}), file=sys.stderr) sys.stderr.flush() if __name__ == "__main__": main() # Execute the main function when the script is run