author     bobzel <zzzman@gmail.com>  2025-03-10 16:13:04 -0400
committer  bobzel <zzzman@gmail.com>  2025-03-10 16:13:04 -0400
commit     b7989dded8bb001876de6cbca59bf77935f0daf7 (patch)
tree       0dba0665674db7bb84770833df0a4100d0520701 /src/server/chunker/pdf_chunker.py
parent     4979415d4604d280e81a162bf9a9d39c731d3738 (diff)
parent     5bf944035c0ba94ad15245416f51ca0329a51bde (diff)
Merge branch 'master' into alyssa-starter
Diffstat (limited to 'src/server/chunker/pdf_chunker.py')
-rw-r--r--  src/server/chunker/pdf_chunker.py | 122
1 file changed, 61 insertions(+), 61 deletions(-)
diff --git a/src/server/chunker/pdf_chunker.py b/src/server/chunker/pdf_chunker.py
index 4fe3b9dbf..697550f2e 100644
--- a/src/server/chunker/pdf_chunker.py
+++ b/src/server/chunker/pdf_chunker.py
@@ -21,7 +21,7 @@ import json
 import os
 import uuid  # For generating unique IDs
 from enum import Enum  # Enums for types like document type and purpose
-import cohere  # Embedding client
+import openai
 import numpy as np
 from PyPDF2 import PdfReader  # PDF text extraction
 from openai import OpenAI  # OpenAI client for text completion
@@ -35,8 +35,8 @@ warnings.filterwarnings('ignore', message="torch.load")
 dotenv.load_dotenv()  # Load environment variables
 
 # Fix for newer versions of PIL
-if parse(PIL.__version__) >= parse('10.0.0'):
-    Image.LINEAR = Image.BILINEAR
+# if parse(PIL.__version__) >= parse('10.0.0'):
+#     Image.LINEAR = Image.BILINEAR
 
 # Global dictionary to track progress of document processing jobs
 current_progress = {}
@@ -54,8 +54,9 @@
         "step": step,
         "progress": progress_value
     }
-    print(json.dumps(progress_data), file=sys.stderr)  # Use stderr for progress logs
-    sys.stderr.flush()  # Ensure it's sent immediately
+    print(f"PROGRESS:{json.dumps(progress_data)}", file=sys.stderr)
+    sys.stderr.flush()
+
 
 
 class ElementExtractor:
@@ -63,13 +64,15 @@
     """
     A class that uses a YOLO model to extract tables and images from a PDF page.
     """
 
-    def __init__(self, output_folder: str):
+    def __init__(self, output_folder: str, doc_id: str):
         """
         Initializes the ElementExtractor with the output folder for saving images and the YOLO model.
 
         :param output_folder: Path to the folder where extracted elements will be saved.
         """
-        self.output_folder = output_folder
+        self.doc_id = doc_id
+        self.output_folder = os.path.join(output_folder, doc_id)
+        os.makedirs(self.output_folder, exist_ok=True)
         self.model = YOLO('keremberke/yolov8m-table-extraction')  # Load YOLO model for table extraction
         self.model.overrides['conf'] = 0.25  # Set confidence threshold for detection
         self.model.overrides['iou'] = 0.45  # Set Intersection over Union (IoU) threshold
@@ -116,17 +119,16 @@
             table_path = os.path.join(self.output_folder, table_filename)
             page_with_outline.save(table_path)
 
-            # Convert the full-page image with red outline to base64
-            base64_data = self.image_to_base64(page_with_outline)
+            file_path_for_client = f"{self.doc_id}/{table_filename}"
 
             tables.append({
                 'metadata': {
                     "type": "table",
                     "location": [x1 / img.width, y1 / img.height, x2 / img.width, y2 / img.height],
-                    "file_path": table_path,
+                    "file_path": file_path_for_client,
                     "start_page": page_num,
                     "end_page": page_num,
-                    "base64_data": base64_data,
+                    "base64_data": self.image_to_base64(page_with_outline)
                 }
             })
 
@@ -175,18 +177,17 @@
             image_path = os.path.join(self.output_folder, image_filename)
             page_with_outline.save(image_path)
 
-            # Convert the full-page image with red outline to base64
-            base64_data = self.image_to_base64(page_with_outline)
+            file_path_for_client = f"{self.doc_id}/{image_filename}"
 
             images.append({
                 'metadata': {
                     "type": "image",
                     "location": [x1 / page.rect.width, y1 / page.rect.height, x2 / page.rect.width, y2 / page.rect.height],
-                    "file_path": image_path,
+                    "file_path": file_path_for_client,
                     "start_page": page_num,
                     "end_page": page_num,
-                    "base64_data": base64_data,
+                    "base64_data": self.image_to_base64(image)
                 }
             })
 
 
@@ -268,7 +269,7 @@
     The main class responsible for chunking PDF files into text and visual elements (tables/images).
     """
 
-    def __init__(self, output_folder: str = "output", image_batch_size: int = 5) -> None:
+    def __init__(self, output_folder: str = "output", doc_id: str = '', image_batch_size: int = 5) -> None:
         """
         Initializes the PDFChunker with an output folder and an element extractor for visual elements.
 
@@ -278,7 +279,8 @@
         self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))  # Initialize the Anthropic API client
         self.output_folder = output_folder
         self.image_batch_size = image_batch_size  # Batch size for image processing
-        self.element_extractor = ElementExtractor(output_folder)  # Initialize the element extractor
+        self.doc_id = doc_id  # Document ID, used to scope output paths
+        self.element_extractor = ElementExtractor(output_folder, doc_id)
 
     async def chunk_pdf(self, file_data: bytes, file_name: str, doc_id: str, job_id: str) -> List[Dict[str, Any]]:
         """
@@ -363,6 +365,7 @@
                 for j, elem in enumerate(batch, start=1):
                     if j in summaries:
                         elem['metadata']['text'] = re.sub(r'^(Image|Table):\s*', '', summaries[j])
+                        elem['metadata']['base64_data'] = ''
                     processed_elements.append(elem)
 
                 progress = ((i // image_batch_size) + 1) / total_batches * 100  # Calculate progress
@@ -628,10 +631,11 @@
             return summaries
 
-        except Exception:
-            #print(f"Error in batch_summarize_images: {str(e)}")
-            #print("Returning placeholder summaries")
-            return {number: "Error: No summary available" for number in images}
+        except Exception as e:
+            # Print errors to stderr so they don't interfere with JSON output
+            print(json.dumps({"error": str(e)}), file=sys.stderr)
+            sys.stderr.flush()
+
 
 
 class DocumentType(Enum):
     """
@@ -664,7 +668,7 @@
     Represents a document being processed, such as a PDF, handling chunking, embedding, and summarization.
     """
 
-    def __init__(self, file_data: bytes, file_name: str, job_id: str):
+    def __init__(self, file_path: str, file_name: str, job_id: str, output_folder: str):
         """
         Initialize the Document with file data, file name, and job ID.
 
@@ -672,28 +676,38 @@
         :param file_name: The name of the file being processed.
         :param job_id: The job ID associated with this document processing task.
         """
-        self.file_data = file_data
+        self.output_folder = output_folder
         self.file_name = file_name
+        self.file_path = file_path
         self.job_id = job_id
         self.type = self._get_document_type(file_name)  # Determine the document type (PDF, CSV, etc.)
         self.doc_id = job_id  # Use the job ID as the document ID
         self.chunks = []  # List to hold text and visual chunks
         self.num_pages = 0  # Number of pages in the document (if applicable)
         self.summary = ""  # The generated summary for the document
-        self._process()  # Start processing the document
 
     def _process(self):
        """
         Process the document: extract chunks, embed them, and generate a summary.
         """
-        pdf_chunker = PDFChunker(output_folder="output")  # Initialize the PDF chunker
-        self.chunks = asyncio.run(pdf_chunker.chunk_pdf(self.file_data, self.file_name, self.doc_id, self.job_id))  # Extract chunks
-
-        self.num_pages = self._get_pdf_pages()  # Get the number of pages in the document
+        with open(self.file_path, 'rb') as file:
+            pdf_data = file.read()
+        pdf_chunker = PDFChunker(output_folder=self.output_folder, doc_id=self.doc_id)  # Initialize PDFChunker
+        self.chunks = asyncio.run(pdf_chunker.chunk_pdf(pdf_data, os.path.basename(self.file_path), self.doc_id, self.job_id))  # Extract chunks
+        self.num_pages = self._get_pdf_pages(pdf_data)  # Get the number of pages in the document
         self._embed_chunks()  # Embed the text chunks into embeddings
         self.summary = self._generate_summary()  # Generate a summary for the document
 
+    def _get_pdf_pages(self, pdf_data: bytes) -> int:
+        """
+        Get the total number of pages in the PDF document.
+        """
+        pdf_file = io.BytesIO(pdf_data)  # Convert the file data to an in-memory binary stream
+        pdf_reader = PdfReader(pdf_file)  # Initialize PDF reader
+        return len(pdf_reader.pages)  # Return the number of pages in the PDF
+
+
     def _get_document_type(self, file_name: str) -> DocumentType:
         """
         Determine the document type based on its file extension.
@@ -708,33 +722,24 @@
         except ValueError:
             raise FileTypeNotSupportedException(extension)  # Raise exception if file type is unsupported
 
-    def _get_pdf_pages(self) -> int:
-        """
-        Get the total number of pages in the PDF document.
-
-        :return: The number of pages in the PDF.
-        """
-        pdf_file = io.BytesIO(self.file_data)  # Convert the file data to an in-memory binary stream
-        pdf_reader = PdfReader(pdf_file)  # Initialize PDF reader
-        return len(pdf_reader.pages)  # Return the number of pages in the PDF
 
     def _embed_chunks(self) -> None:
         """
         Embed the text chunks using the Cohere API.
         """
-        co = cohere.Client(os.getenv("COHERE_API_KEY"))  # Initialize Cohere client with API key
+        openai = OpenAI()  # Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
         batch_size = 90  # Batch size for embedding
         chunks_len = len(self.chunks)  # Total number of chunks to embed
         for i in tqdm(range(0, chunks_len, batch_size), desc="Embedding Chunks"):
             batch = self.chunks[i: min(i + batch_size, chunks_len)]  # Get batch of chunks
             texts = [chunk['metadata']['text'] for chunk in batch]  # Extract text from each chunk
-            chunk_embs_batch = co.embed(
-                texts=texts,
-                model="embed-english-v3.0",  # Use Cohere's embedding model
-                input_type="search_document"  # Specify input type
+            chunk_embs_batch = openai.embeddings.create(
+                model="text-embedding-3-large",
+                input=texts,
+                encoding_format="float"
             )
-            for j, emb in enumerate(chunk_embs_batch.embeddings):
-                self.chunks[i + j]['values'] = emb  # Store the embeddings in the corresponding chunks
+            for j, data_val in enumerate(chunk_embs_batch.data):
+                self.chunks[i + j]['values'] = data_val.embedding  # Store the embeddings in the corresponding chunks
 
     def _generate_summary(self) -> str:
         """
@@ -796,38 +801,34 @@
             "doc_id": self.doc_id
         }, indent=2)  # Convert the document's attributes to JSON format
 
-
-def process_document(file_data, file_name, job_id):
+def process_document(file_path, job_id, output_folder):
     """
     Top-level function to process a document and return the JSON output.
 
-    :param file_data: The binary data of the file being processed.
-    :param file_name: The name of the file being processed.
+    :param file_path: The path to the file being processed.
     :param job_id: The job ID for this document processing task.
     :return: The processed document's data in JSON format.
     """
-    new_document = Document(file_data, file_name, job_id)  # Create a new Document object
-    return new_document.to_json()  # Return the document's JSON data
-
+    new_document = Document(file_path, file_path, job_id, output_folder)
+    return new_document.to_json()
 
 def main():
     """
     Main entry point for the script, called with arguments from Node.js.
     """
     if len(sys.argv) != 4:
-        print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr)  # Print error if incorrect number of arguments
+        print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr)
         return
 
-    job_id = sys.argv[1]  # Get the job ID from command-line arguments
-    file_name = sys.argv[2]  # Get the file name from command-line arguments
-    file_data = sys.argv[3]  # Get the base64-encoded file data from command-line arguments
+    job_id = sys.argv[1]
+    file_path = sys.argv[2]
+    output_folder = sys.argv[3]  # Get the output folder from arguments
 
     try:
-        # Decode the base64 file data
-        file_bytes = base64.b64decode(file_data)
-
+        os.makedirs(output_folder, exist_ok=True)
+
         # Process the document
-        document_result = process_document(file_bytes, file_name, job_id)
+        document_result = process_document(file_path, job_id, output_folder)  # Pass output_folder
 
         # Output the final result as JSON to stdout
         print(document_result)
@@ -838,6 +839,5 @@
         print(json.dumps({"error": str(e)}), file=sys.stderr)
         sys.stderr.flush()
 
-
 if __name__ == "__main__":
-    main()  # Execute the main function when the script is run
+    main()  # Execute the main function when the script is run
\ No newline at end of file
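
Editor's note: the new main() signature implies a caller-side contract — invoke the script with job_id, file_path, and output_folder; read PROGRESS:-prefixed JSON lines from stderr; read the final document JSON from stdout. Below is a minimal sketch of such a caller, not part of the commit (the real caller is a Node.js process, and run_chunker is our name); a robust caller would stream stderr during the run rather than read it after exit.

import json
import subprocess

def run_chunker(job_id: str, file_path: str, output_folder: str) -> dict:
    # Argument order matches main(): sys.argv[1]=job_id, [2]=file_path, [3]=output_folder
    proc = subprocess.run(
        ["python", "pdf_chunker.py", job_id, file_path, output_folder],
        capture_output=True, text=True, check=True,
    )
    # Progress updates and errors share stderr; only progress lines carry the prefix
    for line in proc.stderr.splitlines():
        if line.startswith("PROGRESS:"):
            update = json.loads(line[len("PROGRESS:"):])
            print(update.get("step"), update.get("progress"))
    return json.loads(proc.stdout)  # Document.to_json() is the only stdout output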
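
Editor's note: the _embed_chunks change swaps Cohere's co.embed(...) for the OpenAI embeddings endpoint. Isolated from the batching loop, the new call shape is as follows (a sketch assuming OPENAI_API_KEY is set in the environment; embed_texts is our name, not the commit's).

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def embed_texts(texts: list[str]) -> list[list[float]]:
    # response.data preserves the order of the input list
    response = client.embeddings.create(
        model="text-embedding-3-large",
        input=texts,
        encoding_format="float",
    )
    return [item.embedding for item in response.data]

One migration caveat: embed-english-v3.0 returns 1024-dimensional vectors while text-embedding-3-large returns 3072-dimensional ones, so any vector index populated before this commit would need to be re-embedded.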
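
Editor's note: the ElementExtractor changes split element paths in two — images are written under output_folder/doc_id/ on disk, while chunk metadata now records a doc-relative path for the client, and base64_data is blanked after the summarization pass to keep chunks small. A sketch of the path convention, with hypothetical values:

import os

# Hypothetical values for illustration; in this pipeline doc_id is the job ID
output_folder, doc_id, filename = "output", "job-1234", "table_1.png"

server_path = os.path.join(output_folder, doc_id, filename)  # where the PNG is saved
client_path = f"{doc_id}/{filename}"                         # what goes in metadata["file_path"]

os.makedirs(os.path.dirname(server_path), exist_ok=True)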