Diffstat (limited to 'src/server/chunker/pdf_chunker.py')
-rw-r--r--  src/server/chunker/pdf_chunker.py  122
1 file changed, 61 insertions(+), 61 deletions(-)
diff --git a/src/server/chunker/pdf_chunker.py b/src/server/chunker/pdf_chunker.py
index 4fe3b9dbf..697550f2e 100644
--- a/src/server/chunker/pdf_chunker.py
+++ b/src/server/chunker/pdf_chunker.py
@@ -21,7 +21,7 @@ import json
import os
import uuid # For generating unique IDs
from enum import Enum # Enums for types like document type and purpose
-import cohere # Embedding client
+import openai # OpenAI SDK (replaces Cohere for embeddings)
import numpy as np
from PyPDF2 import PdfReader # PDF text extraction
from openai import OpenAI # OpenAI client for text completion
@@ -35,8 +35,8 @@ warnings.filterwarnings('ignore', message="torch.load")
dotenv.load_dotenv() # Load environment variables
# Fix for newer versions of PIL
-if parse(PIL.__version__) >= parse('10.0.0'):
- Image.LINEAR = Image.BILINEAR
+# if parse(PIL.__version__) >= parse('10.0.0'):
+# Image.LINEAR = Image.BILINEAR
# Global dictionary to track progress of document processing jobs
current_progress = {}
@@ -54,8 +54,9 @@ def update_progress(job_id, step, progress_value):
"step": step,
"progress": progress_value
}
- print(json.dumps(progress_data), file=sys.stderr) # Use stderr for progress logs
- sys.stderr.flush() # Ensure it's sent immediately
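+ # Prefix with "PROGRESS:" so the consuming process can tell progress updates apart from error JSON on stderr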
+ print(f"PROGRESS:{json.dumps(progress_data)}", file=sys.stderr)
+ sys.stderr.flush()
+
class ElementExtractor:
@@ -63,13 +64,15 @@ class ElementExtractor:
A class that uses a YOLO model to extract tables and images from a PDF page.
"""
- def __init__(self, output_folder: str):
+ def __init__(self, output_folder: str, doc_id: str):
"""
Initializes the ElementExtractor with the output folder for saving images and the YOLO model.
:param output_folder: Path to the folder where extracted elements will be saved.
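+ :param doc_id: Unique ID for this document; extracted elements are saved under a subfolder with this name.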
"""
- self.output_folder = output_folder
+ self.doc_id = doc_id
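+ # Namespace extracted files in a per-document subfolder to avoid collisions between jobs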
+ self.output_folder = os.path.join(output_folder, doc_id)
+ os.makedirs(self.output_folder, exist_ok=True)
self.model = YOLO('keremberke/yolov8m-table-extraction') # Load YOLO model for table extraction
self.model.overrides['conf'] = 0.25 # Set confidence threshold for detection
self.model.overrides['iou'] = 0.45 # Set Intersection over Union (IoU) threshold
@@ -116,17 +119,16 @@ class ElementExtractor:
table_path = os.path.join(self.output_folder, table_filename)
page_with_outline.save(table_path)
- # Convert the full-page image with red outline to base64
- base64_data = self.image_to_base64(page_with_outline)
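+ # Expose a doc-scoped relative path instead of the server's filesystem path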
+ file_path_for_client = f"{self.doc_id}/{table_filename}"
tables.append({
'metadata': {
"type": "table",
"location": [x1 / img.width, y1 / img.height, x2 / img.width, y2 / img.height],
- "file_path": table_path,
+ "file_path": file_path_for_client,
"start_page": page_num,
"end_page": page_num,
- "base64_data": base64_data,
+ "base64_data": self.image_to_base64(page_with_outline)
}
})
@@ -175,18 +177,17 @@ class ElementExtractor:
image_path = os.path.join(self.output_folder, image_filename)
page_with_outline.save(image_path)
- # Convert the full-page image with red outline to base64
- base64_data = self.image_to_base64(page_with_outline)
+ file_path_for_client = f"{self.doc_id}/{image_filename}"
images.append({
'metadata': {
"type": "image",
"location": [x1 / page.rect.width, y1 / page.rect.height, x2 / page.rect.width,
y2 / page.rect.height],
- "file_path": image_path,
+ "file_path": file_path_for_client,
"start_page": page_num,
"end_page": page_num,
- "base64_data": base64_data,
+ "base64_data": self.image_to_base64(image)
}
})
@@ -268,7 +269,7 @@ class PDFChunker:
The main class responsible for chunking PDF files into text and visual elements (tables/images).
"""
- def __init__(self, output_folder: str = "output", image_batch_size: int = 5) -> None:
+ def __init__(self, output_folder: str = "output", doc_id: str = '', image_batch_size: int = 5) -> None:
"""
Initializes the PDFChunker with an output folder and an element extractor for visual elements.
@@ -278,7 +279,8 @@ class PDFChunker:
self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) # Initialize the Anthropic API client
self.output_folder = output_folder
self.image_batch_size = image_batch_size # Batch size for image processing
- self.element_extractor = ElementExtractor(output_folder) # Initialize the element extractor
+ self.doc_id = doc_id # Document ID used to namespace extracted element files
+ self.element_extractor = ElementExtractor(output_folder, doc_id)
async def chunk_pdf(self, file_data: bytes, file_name: str, doc_id: str, job_id: str) -> List[Dict[str, Any]]:
"""
@@ -363,6 +365,7 @@ class PDFChunker:
for j, elem in enumerate(batch, start=1):
if j in summaries:
elem['metadata']['text'] = re.sub(r'^(Image|Table):\s*', '', summaries[j])
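+ # Drop the inline base64 image once it has a text summary, keeping the output JSON small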
+ elem['metadata']['base64_data'] = ''
processed_elements.append(elem)
progress = ((i // image_batch_size) + 1) / total_batches * 100 # Calculate progress
@@ -628,10 +631,11 @@ class PDFChunker:
return summaries
- except Exception:
- #print(f"Error in batch_summarize_images: {str(e)}")
- #print("Returning placeholder summaries")
- return {number: "Error: No summary available" for number in images}
+ except Exception as e:
+ # Log the error to stderr so it doesn't corrupt the JSON result on stdout
+ print(json.dumps({"error": str(e)}), file=sys.stderr)
+ sys.stderr.flush()
+ # Fall back to placeholder summaries so the caller's lookup by image number still works
+ return {number: "Error: No summary available" for number in images}
+
class DocumentType(Enum):
"""
@@ -664,7 +668,7 @@ class Document:
Represents a document being processed, such as a PDF, handling chunking, embedding, and summarization.
"""
- def __init__(self, file_data: bytes, file_name: str, job_id: str):
+ def __init__(self, file_path: str, file_name: str, job_id: str, output_folder: str):
"""
- Initialize the Document with file data, file name, and job ID.
+ Initialize the Document with a file path, file name, job ID, and output folder.
@@ -672,28 +676,38 @@ class Document:
:param file_name: The name of the file being processed.
:param job_id: The job ID associated with this document processing task.
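+ :param file_path: Path to the document file on disk.
+ :param output_folder: Folder where extracted element images are written.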
"""
- self.file_data = file_data
+ self.output_folder = output_folder
self.file_name = file_name
+ self.file_path = file_path
self.job_id = job_id
self.type = self._get_document_type(file_name) # Determine the document type (PDF, CSV, etc.)
self.doc_id = job_id # Use the job ID as the document ID
self.chunks = [] # List to hold text and visual chunks
self.num_pages = 0 # Number of pages in the document (if applicable)
self.summary = "" # The generated summary for the document
-
self._process() # Start processing the document
def _process(self):
"""
Process the document: extract chunks, embed them, and generate a summary.
"""
- pdf_chunker = PDFChunker(output_folder="output") # Initialize the PDF chunker
- self.chunks = asyncio.run(pdf_chunker.chunk_pdf(self.file_data, self.file_name, self.doc_id, self.job_id)) # Extract chunks
-
- self.num_pages = self._get_pdf_pages() # Get the number of pages in the document
+ with open(self.file_path, 'rb') as file:
+ pdf_data = file.read()
+ pdf_chunker = PDFChunker(output_folder=self.output_folder, doc_id=self.doc_id) # Initialize PDFChunker
+ self.chunks = asyncio.run(pdf_chunker.chunk_pdf(pdf_data, os.path.basename(self.file_path), self.doc_id, self.job_id)) # Extract chunks
+ self.num_pages = self._get_pdf_pages(pdf_data) # Get the number of pages in the document
self._embed_chunks() # Embed the text chunks into embeddings
self.summary = self._generate_summary() # Generate a summary for the document
+ def _get_pdf_pages(self, pdf_data: bytes) -> int:
+ """
+ Get the total number of pages in the PDF document.
+ """
+ pdf_file = io.BytesIO(pdf_data) # Convert the file data to an in-memory binary stream
+ pdf_reader = PdfReader(pdf_file) # Initialize PDF reader
+ return len(pdf_reader.pages) # Return the number of pages in the PDF
+
+
def _get_document_type(self, file_name: str) -> DocumentType:
"""
Determine the document type based on its file extension.
@@ -708,33 +722,24 @@ class Document:
except ValueError:
raise FileTypeNotSupportedException(extension) # Raise exception if file type is unsupported
- def _get_pdf_pages(self) -> int:
- """
- Get the total number of pages in the PDF document.
-
- :return: The number of pages in the PDF.
- """
- pdf_file = io.BytesIO(self.file_data) # Convert the file data to an in-memory binary stream
- pdf_reader = PdfReader(pdf_file) # Initialize PDF reader
- return len(pdf_reader.pages) # Return the number of pages in the PDF
def _embed_chunks(self) -> None:
"""
- Embed the text chunks using the Cohere API.
+ Embed the text chunks using the OpenAI embeddings API.
"""
- co = cohere.Client(os.getenv("COHERE_API_KEY")) # Initialize Cohere client with API key
+ client = OpenAI() # Initialize OpenAI client; reads OPENAI_API_KEY from the environment
batch_size = 90 # Batch size for embedding
chunks_len = len(self.chunks) # Total number of chunks to embed
for i in tqdm(range(0, chunks_len, batch_size), desc="Embedding Chunks"):
batch = self.chunks[i: min(i + batch_size, chunks_len)] # Get batch of chunks
texts = [chunk['metadata']['text'] for chunk in batch] # Extract text from each chunk
- chunk_embs_batch = co.embed(
- texts=texts,
- model="embed-english-v3.0", # Use Cohere's embedding model
- input_type="search_document" # Specify input type
+ chunk_embs_batch = client.embeddings.create(
+ model="text-embedding-3-large",
+ input=texts,
+ encoding_format="float"
)
- for j, emb in enumerate(chunk_embs_batch.embeddings):
- self.chunks[i + j]['values'] = emb # Store the embeddings in the corresponding chunks
+ for j, data_val in enumerate(chunk_embs_batch.data):
+ self.chunks[i + j]['values'] = data_val.embedding # Store the embeddings in the corresponding chunks
def _generate_summary(self) -> str:
"""
@@ -796,38 +801,34 @@ class Document:
"doc_id": self.doc_id
}, indent=2) # Convert the document's attributes to JSON format
-
-def process_document(file_data, file_name, job_id):
+def process_document(file_path, job_id, output_folder):
"""
Top-level function to process a document and return the JSON output.
- :param file_data: The binary data of the file being processed.
- :param file_name: The name of the file being processed.
+ :param file_path: The path to the file being processed.
:param job_id: The job ID for this document processing task.
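+ :param output_folder: Destination folder for extracted element images.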
:return: The processed document's data in JSON format.
"""
- new_document = Document(file_data, file_name, job_id) # Create a new Document object
- return new_document.to_json() # Return the document's JSON data
-
+ new_document = Document(file_path, os.path.basename(file_path), job_id, output_folder)
+ return new_document.to_json()
def main():
"""
Main entry point for the script, called with arguments from Node.js.
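+
+ Usage: pdf_chunker.py <job_id> <file_path> <output_folder>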
"""
if len(sys.argv) != 4:
- print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr) # Print error if incorrect number of arguments
+ print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr)
return
- job_id = sys.argv[1] # Get the job ID from command-line arguments
- file_name = sys.argv[2] # Get the file name from command-line arguments
- file_data = sys.argv[3] # Get the base64-encoded file data from command-line arguments
+ job_id = sys.argv[1]
+ file_path = sys.argv[2]
+ output_folder = sys.argv[3] # Get the output folder from arguments
try:
- # Decode the base64 file data
- file_bytes = base64.b64decode(file_data)
-
+ os.makedirs(output_folder, exist_ok=True)
+
# Process the document
- document_result = process_document(file_bytes, file_name, job_id)
+ document_result = process_document(file_path, job_id, output_folder)
# Output the final result as JSON to stdout
print(document_result)
@@ -838,6 +839,5 @@ def main():
print(json.dumps({"error": str(e)}), file=sys.stderr)
sys.stderr.flush()
-
if __name__ == "__main__":
- main() # Execute the main function when the script is run
+ main() # Execute the main function when the script is run
\ No newline at end of file