import asyncio
import base64
import concurrent.futures  # Thread pool for running blocking summarization calls
import io
import json
import os
import re
import sys
import uuid  # For generating unique IDs
import warnings
from enum import Enum  # Enums for document types

import dotenv  # For environment variable loading
import fitz  # PyMuPDF, PDF processing library
import numpy as np
import PIL
import pytesseract  # OCR library for text extraction from images
from packaging.version import parse  # Version checking (for the commented-out PIL shim below)
from PIL import Image, ImageDraw  # Image processing
from PyPDF2 import PdfReader  # PDF page counting
from openai import OpenAI  # OpenAI client for chat completions and embeddings
from sklearn.cluster import KMeans  # Clustering for summarization
from tqdm import tqdm  # Progress bars
from typing import Any, Dict, List, Tuple, TypedDict  # Type annotations
from ultralyticsplus import YOLO  # Object detection model (YOLO)
# Silence specific warnings
warnings.filterwarnings('ignore', message="Valid config keys have changed")
warnings.filterwarnings('ignore', message="torch.load")
dotenv.load_dotenv() # Load environment variables
# Fix for newer versions of PIL
# if parse(PIL.__version__) >= parse('10.0.0'):
# Image.LINEAR = Image.BILINEAR
# Global dictionary to track progress of document processing jobs
current_progress = {}
def update_progress(job_id, step, progress_value):
"""
    Output progress in JSON format on stderr for the supervising Node.js process to capture.
:param job_id: The unique identifier for the processing job.
:param step: The current step of the job.
:param progress_value: The percentage of completion for the current step.
"""
progress_data = {
"job_id": job_id,
"step": step,
"progress": progress_value
}
print(f"PROGRESS:{json.dumps(progress_data)}", file=sys.stderr)
sys.stderr.flush()
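
# Example of a progress line as emitted by update_progress (illustrative values):
#   PROGRESS:{"job_id": "job-123", "step": "Extracting text...", "progress": 42.0}
# A supervising Node.js process can match on the "PROGRESS:" prefix on stderr and
# JSON-parse the remainder; the prefix and field names mirror the function above.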
class ElementExtractor:
"""
A class that uses a YOLO model to extract tables and images from a PDF page.
"""
def __init__(self, output_folder: str, doc_id: str):
"""
Initializes the ElementExtractor with the output folder for saving images and the YOLO model.
:param output_folder: Path to the folder where extracted elements will be saved.
"""
self.doc_id = doc_id
self.output_folder = os.path.join(output_folder, doc_id)
os.makedirs(self.output_folder, exist_ok=True)
self.model = YOLO('keremberke/yolov8m-table-extraction') # Load YOLO model for table extraction
self.model.overrides['conf'] = 0.25 # Set confidence threshold for detection
self.model.overrides['iou'] = 0.45 # Set Intersection over Union (IoU) threshold
self.padding = 5 # Padding around detected elements
    async def extract_elements(self, page) -> List[Dict[str, Any]]:
        """
        Asynchronously extract tables and images from a PDF page.
        :param page: A Page object representing a PDF page.
        :return: A list of dictionaries containing the extracted elements.
        """
tasks = [
asyncio.create_task(self.extract_tables(page.image, page.page_num)), # Extract tables from the page
asyncio.create_task(self.extract_images(page.page, page.image, page.page_num)) # Extract images from the page
]
results = await asyncio.gather(*tasks) # Wait for both tasks to complete
return [item for sublist in results for item in sublist] # Flatten and return results
async def extract_tables(self, img: Image.Image, page_num: int) -> List[Dict[str, Any]]:
"""
Asynchronously extract tables from a given page image using the YOLO model.
:param img: The image of the PDF page.
:param page_num: The current page number.
:return: A list of dictionaries with metadata about the detected tables.
"""
results = self.model.predict(img, verbose=False) # Predict table locations using YOLO
tables = []
for idx, box in enumerate(results[0].boxes):
x1, y1, x2, y2 = map(int, box.xyxy[0]) # Extract bounding box coordinates
# Draw a red rectangle on the full page image around the table
page_with_outline = img.copy()
draw = ImageDraw.Draw(page_with_outline)
            draw.rectangle(
                [max(0, x1 - self.padding), max(0, y1 - self.padding),
                 min(page_with_outline.width, x2 + self.padding),
                 min(page_with_outline.height, y2 + self.padding)],
                outline="red", width=2)  # Expand the box outward by the padding, clamped to the page bounds
# Save the full page with the red outline
table_filename = f"table_page{page_num + 1}_{idx + 1}.png"
table_path = os.path.join(self.output_folder, table_filename)
page_with_outline.save(table_path)
file_path_for_client = f"{self.doc_id}/{table_filename}"
tables.append({
'metadata': {
"type": "table",
"location": [x1 / img.width, y1 / img.height, x2 / img.width, y2 / img.height],
"file_path": file_path_for_client,
"start_page": page_num,
"end_page": page_num,
"base64_data": self.image_to_base64(page_with_outline)
}
})
return tables
async def extract_images(self, page: fitz.Page, img: Image.Image, page_num: int) -> List[Dict[str, Any]]:
"""
Asynchronously extract embedded images from a PDF page.
:param page: A fitz.Page object representing the PDF page.
:param img: The image of the PDF page.
:param page_num: The current page number.
:return: A list of dictionaries with metadata about the detected images.
"""
images = []
image_list = page.get_images(full=True) # Get a list of images on the page
if not image_list:
return images
for img_index, img_info in enumerate(image_list):
xref = img_info[0] # XREF of the image in the PDF
base_image = page.parent.extract_image(xref) # Extract the image by its XREF
image_bytes = base_image["image"]
image = Image.open(io.BytesIO(image_bytes)).convert("RGB") # Ensure it's RGB before saving as PNG
width_ratio = img.width / page.rect.width # Scale factor for width
height_ratio = img.height / page.rect.height # Scale factor for height
            # Use the image's placement rectangle if available, else fall back to the full page
            rect_list = page.get_image_rects(xref)
            rect = rect_list[0] if rect_list else page.rect
            x1, y1, x2, y2 = rect
# Draw a red rectangle on the full page image around the embedded image
page_with_outline = img.copy()
draw = ImageDraw.Draw(page_with_outline)
draw.rectangle([x1 * width_ratio, y1 * height_ratio, x2 * width_ratio, y2 * height_ratio],
outline="red", width=2) # Draw red outline
# Save the full page with the red outline
image_filename = f"image_page{page_num + 1}_{img_index + 1}.png"
image_path = os.path.join(self.output_folder, image_filename)
page_with_outline.save(image_path)
file_path_for_client = f"{self.doc_id}/{image_filename}"
images.append({
'metadata': {
"type": "image",
"location": [x1 / page.rect.width, y1 / page.rect.height, x2 / page.rect.width,
y2 / page.rect.height],
"file_path": file_path_for_client,
"start_page": page_num,
"end_page": page_num,
"base64_data": self.image_to_base64(image)
}
})
return images
@staticmethod
def image_to_base64(image: Image.Image) -> str:
"""
Convert a PIL image to a base64-encoded string.
:param image: The PIL image to be converted.
:return: The base64-encoded string of the image.
"""
buffered = io.BytesIO()
image.save(buffered, format="PNG") # Save image as PNG to an in-memory buffer
return base64.b64encode(buffered.getvalue()).decode('utf-8') # Convert to base64 and return
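
    # Round-trip sketch (assumes a PIL image `img` is in scope):
    #   b64 = ElementExtractor.image_to_base64(img)
    #   restored = Image.open(io.BytesIO(base64.b64decode(b64)))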
class ChunkMetaData(TypedDict):
"""
A TypedDict that defines the metadata structure for chunks of text and visual elements.
"""
text: str
type: str
original_document: str
file_path: str
doc_id: str
location: str
start_page: int
end_page: int
base64_data: str
class Chunk(TypedDict):
"""
A TypedDict that defines the structure for a document chunk, including metadata and embeddings.
"""
id: str
values: List[float]
metadata: ChunkMetaData
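
# Illustrative chunk (hypothetical values), matching the TypedDicts above:
# {
#     "id": "3f2b...-uuid",
#     "values": [0.012, -0.044, ...],  # embedding vector, filled in later by _embed_chunks
#     "metadata": {
#         "text": "First sentences of the chunk...",
#         "type": "text",
#         "original_document": "report.pdf",
#         "file_path": "", "doc_id": "doc-1", "location": "",
#         "start_page": 1, "end_page": 2, "base64_data": ""
#     }
# }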
class Page:
"""
A class that represents a single PDF page, handling its image representation and element masking.
"""
def __init__(self, page: fitz.Page, page_num: int):
"""
Initializes the Page with its page number and the image representation of the page.
:param page: A fitz.Page object representing the PDF page.
:param page_num: The number of the page in the PDF.
"""
self.page = page
self.page_num = page_num
# Get high-resolution image of the page (for table/image extraction)
self.pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
self.image = Image.frombytes("RGB", [self.pix.width, self.pix.height], self.pix.samples)
self.masked_image = self.image.copy() # Image with masked elements (tables/images)
self.draw = ImageDraw.Draw(self.masked_image)
self.elements = [] # List to store extracted elements
def add_element(self, element):
"""
Adds a detected element (table/image) to the page and masks its location on the page image.
:param element: A dictionary containing metadata about the detected element.
"""
self.elements.append(element)
# Mask the element on the page image by drawing a white rectangle over its location
x1, y1, x2, y2 = [coord * self.image.width if i % 2 == 0 else coord * self.image.height
for i, coord in enumerate(element['metadata']['location'])]
self.draw.rectangle([x1, y1, x2, y2], fill="white") # Draw a white rectangle to mask the element
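
    # Worked example (hypothetical numbers): an element whose normalized location is
    # [0.1, 0.2, 0.5, 0.6] on a 1200x1600-pixel page image is masked with a white
    # rectangle from (120, 320) to (600, 960); even indices scale by width, odd by height.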
class PDFChunker:
"""
The main class responsible for chunking PDF files into text and visual elements (tables/images).
"""
def __init__(self, output_folder: str = "output", doc_id: str = '', image_batch_size: int = 5) -> None:
"""
Initializes the PDFChunker with an output folder and an element extractor for visual elements.
:param output_folder: Folder to store the output files (extracted tables/images).
        :param doc_id: The unique document ID used to namespace saved element images.
        :param image_batch_size: The batch size for processing visual elements.
        """
        self.client = OpenAI()  # OpenAI client used for image/table summarization
self.output_folder = output_folder
self.image_batch_size = image_batch_size # Batch size for image processing
        self.doc_id = doc_id
self.element_extractor = ElementExtractor(output_folder, doc_id)
async def chunk_pdf(self, file_data: bytes, file_name: str, doc_id: str, job_id: str) -> List[Dict[str, Any]]:
"""
Processes a PDF file, extracting text and visual elements, and returning structured chunks.
:param file_data: The binary data of the PDF file.
:param file_name: The name of the PDF file.
:param doc_id: The unique document ID for this job.
:param job_id: The unique job ID for the processing task.
:return: A list of structured chunks containing text and visual elements.
"""
with fitz.open(stream=file_data, filetype="pdf") as pdf_document:
num_pages = len(pdf_document) # Get the total number of pages in the PDF
pages = [Page(pdf_document[i], i) for i in tqdm(range(num_pages), desc="Initializing Pages")] # Initialize each page
update_progress(job_id, "Extracting tables and images...", 0)
await self.extract_and_mask_elements(pages, job_id) # Extract and mask elements (tables/images)
update_progress(job_id, "Processing tables and images...", 0)
await self.process_visual_elements(pages, self.image_batch_size, job_id) # Process visual elements
update_progress(job_id, "Extracting text...", 0)
page_texts = await self.extract_text_from_masked_pages(pages, job_id) # Extract text from masked pages
update_progress(job_id, "Processing text...", 0)
text_chunks = self.chunk_text_with_metadata(page_texts, max_words=1000, job_id=job_id) # Chunk text into smaller parts
# Combine text and visual elements into a unified structure (chunks)
chunks = self.combine_chunks(text_chunks, [elem for page in pages for elem in page.elements], file_name,
doc_id)
return chunks
async def extract_and_mask_elements(self, pages: List[Page], job_id: str):
"""
Extract visual elements (tables and images) from each page and mask them on the page.
:param pages: A list of Page objects representing the PDF pages.
:param job_id: The unique job ID for the processing task.
"""
total_pages = len(pages)
tasks = []
for i, page in enumerate(pages):
tasks.append(asyncio.create_task(self.element_extractor.extract_elements(page))) # Extract elements asynchronously
progress = ((i + 1) / total_pages) * 100 # Calculate progress
update_progress(job_id, "Extracting tables and images...", progress)
# Gather all extraction results
results = await asyncio.gather(*tasks)
# Mask the detected elements on the page images
for page, elements in zip(pages, results):
for element in elements:
page.add_element(element) # Mask each extracted element on the page
async def process_visual_elements(self, pages: List[Page], image_batch_size: int, job_id: str) -> List[Dict[str, Any]]:
"""
Process extracted visual elements in batches, generating summaries or descriptions.
:param pages: A list of Page objects representing the PDF pages.
:param image_batch_size: The batch size for processing visual elements.
:param job_id: The unique job ID for the processing task.
:return: A list of processed elements with metadata and generated summaries.
"""
pre_elements = [element for page in pages for element in page.elements] # Flatten list of elements
processed_elements = []
        total_batches = max(1, (len(pre_elements) + image_batch_size - 1) // image_batch_size)  # Ceiling division
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
# Process elements in batches
for i in tqdm(range(0, len(pre_elements), image_batch_size), desc="Processing Visual Elements"):
batch = pre_elements[i:i + image_batch_size]
# Run image summarization in a separate thread
summaries = await loop.run_in_executor(
executor, self.batch_summarize_images,
{j + 1: element.get('metadata').get('base64_data') for j, element in enumerate(batch)}
)
# Append generated summaries to the elements
for j, elem in enumerate(batch, start=1):
if j in summaries:
elem['metadata']['text'] = re.sub(r'^(Image|Table):\s*', '', summaries[j])
elem['metadata']['base64_data'] = ''
processed_elements.append(elem)
progress = ((i // image_batch_size) + 1) / total_batches * 100 # Calculate progress
update_progress(job_id, "Processing tables and images...", progress)
return processed_elements
async def extract_text_from_masked_pages(self, pages: List[Page], job_id: str) -> Dict[int, str]:
"""
Extract text from masked page images (where tables and images have been masked out).
:param pages: A list of Page objects representing the PDF pages.
:param job_id: The unique job ID for the processing task.
:return: A dictionary mapping page numbers to extracted text.
"""
total_pages = len(pages)
tasks = []
for i, page in enumerate(pages):
tasks.append(asyncio.create_task(self.extract_text(page.masked_image, page.page_num))) # Perform OCR on each page
progress = ((i + 1) / total_pages) * 100 # Calculate progress
update_progress(job_id, "Extracting text...", progress)
# Return extracted text from each page
return dict(await asyncio.gather(*tasks))
@staticmethod
    async def extract_text(image: Image.Image, page_num: int) -> Tuple[int, str]:
"""
Perform OCR on the provided image to extract text.
:param image: The PIL image of the page.
:param page_num: The current page number.
:return: A tuple containing the page number and the extracted text.
"""
result = pytesseract.image_to_string(image) # Extract text using Tesseract OCR
return page_num + 1, result.strip() # Return the page number and extracted text
def chunk_text_with_metadata(self, page_texts: Dict[int, str], max_words: int, job_id: str) -> List[Dict[str, Any]]:
"""
Break the extracted text into smaller chunks with metadata (e.g., page numbers).
:param page_texts: A dictionary mapping page numbers to extracted text.
:param max_words: The maximum number of words allowed in a chunk.
:param job_id: The unique job ID for the processing task.
:return: A list of dictionaries containing text chunks with metadata.
"""
        chunks = []
        current_chunk = ""
        current_start_page = next(iter(page_texts), 1)  # First (1-based) page number, if any
        total_words = 0
def add_chunk(chunk_text, start_page, end_page):
# Add a chunk of text with metadata
chunks.append({
"text": chunk_text.strip(),
"start_page": start_page,
"end_page": end_page
})
total_pages = len(page_texts)
for i, (page_num, text) in enumerate(tqdm(page_texts.items(), desc="Chunking Text")):
sentences = self.split_into_sentences(text)
for sentence in sentences:
word_count = len(sentence.split())
# If adding this sentence exceeds max_words, create a new chunk
if total_words + word_count > max_words:
add_chunk(current_chunk, current_start_page, page_num)
current_chunk = sentence + " "
current_start_page = page_num
total_words = word_count
else:
current_chunk += sentence + " "
total_words += word_count
current_chunk += "\n\n"
progress = ((i + 1) / total_pages) * 100 # Calculate progress
update_progress(job_id, "Processing text...", progress)
# Add the last chunk if there is leftover text
if current_chunk.strip():
add_chunk(current_chunk, current_start_page, page_num)
return chunks
@staticmethod
def split_into_sentences(text):
"""
Split the text into sentences using regular expressions.
:param text: The raw text to be split into sentences.
:return: A list of sentences.
"""
return re.split(r'(?<=[.!?])\s+', text)
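
    # Example: split_into_sentences("Dr. Smith arrived. He left!") returns
    # ["Dr.", "Smith arrived.", "He left!"] -- the lookbehind split is naive
    # about abbreviations, an accepted trade-off for OCR-extracted text.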
@staticmethod
def combine_chunks(text_chunks: List[Dict[str, Any]], visual_elements: List[Dict[str, Any]], pdf_path: str,
doc_id: str) -> List[Chunk]:
"""
Combine text and visual chunks into a unified list.
:param text_chunks: A list of dictionaries containing text chunks with metadata.
:param visual_elements: A list of dictionaries containing visual elements (tables/images) with metadata.
:param pdf_path: The path to the original PDF file.
:param doc_id: The unique document ID for this job.
:return: A list of Chunk objects representing the combined data.
"""
combined_chunks = []
# Add text chunks
for text_chunk in text_chunks:
chunk_metadata: ChunkMetaData = {
"text": text_chunk["text"],
"type": "text",
"original_document": pdf_path,
"file_path": "",
"location": "",
"start_page": text_chunk["start_page"],
"end_page": text_chunk["end_page"],
"base64_data": "",
"doc_id": doc_id,
}
chunk_dict: Chunk = {
"id": str(uuid.uuid4()), # Generate a unique ID for the chunk
"values": [],
"metadata": chunk_metadata,
}
combined_chunks.append(chunk_dict)
# Add visual chunks (tables/images)
for elem in visual_elements:
visual_chunk_metadata: ChunkMetaData = {
"type": elem['metadata']['type'],
"file_path": elem['metadata']['file_path'],
"text": elem['metadata'].get('text', ''),
"start_page": elem['metadata']['start_page'],
"end_page": elem['metadata']['end_page'],
"location": str(elem['metadata']['location']),
"base64_data": elem['metadata']['base64_data'],
"doc_id": doc_id,
"original_document": pdf_path,
}
visual_chunk_dict: Chunk = {
"id": str(uuid.uuid4()), # Generate a unique ID for the visual chunk
"values": [],
"metadata": visual_chunk_metadata,
}
combined_chunks.append(visual_chunk_dict)
return combined_chunks
def batch_summarize_images(self, images: Dict[int, str]) -> Dict[int, str]:
"""
        Summarize a batch of images/tables with GPT-4o using Structured Outputs.
:param images: {image_number: base64_png}
:return: {image_number: summary_text}
"""
# -------- 1. Build the prompt -----------
content: list[dict] = []
for n, b64 in images.items():
content.append({"type": "text",
"text": f"\nImage {n} (outlined in red on the page):"})
content.append({"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{b64}"}})
messages = [
{
"role": "system",
"content": (
"You are generating retrieval‑ready summaries for each highlighted "
"image or table. Start by identifying whether the element is an "
"image or a table, then write one informative sentence that a vector "
"search would find useful. Provide detail but limit to a couple of paragraphs per image."
),
},
{"role": "user", "content": content},
]
schema = {
"type": "object",
"properties": {
"summaries": {
"type": "array",
"items": {
"type": "object",
"properties": {
"number": {"type": "integer"},
"type": {"type": "string", "enum": ["image", "table"]},
"summary": {"type": "string"}
},
"required": ["number", "type", "summary"],
"additionalProperties": False
}
}
},
"required": ["summaries"],
"additionalProperties": False
}
# ---------- OpenAI call -----------------------------------------------------
try:
resp = self.client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=400 * len(images),
temperature=0,
response_format={
"type": "json_schema",
"json_schema": {
"name": "image_batch_summaries", # ← REQUIRED
"schema": schema, # ← REQUIRED
"strict": True # ← strongly recommended
},
},
)
            parsed = json.loads(resp.choices[0].message.content)  # schema-conformant JSON
return {item["number"]: item["summary"]
for item in parsed["summaries"]}
except Exception as e:
# Log and fall back gracefully
print(json.dumps({"error": str(e)}), file=sys.stderr, flush=True)
return {}
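
    # Illustrative structured response (hypothetical content) conforming to the schema above:
    #   {"summaries": [{"number": 1, "type": "table", "summary": "Quarterly revenue by region..."},
    #                  {"number": 2, "type": "image", "summary": "Flowchart of the intake process..."}]}
    # which batch_summarize_images reduces to {1: "Quarterly revenue...", 2: "Flowchart..."}.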
class DocumentType(Enum):
"""
Enum representing different types of documents that can be processed.
"""
PDF = "pdf" # PDF file type
CSV = "csv" # CSV file type
TXT = "txt" # Plain text file type
HTML = "html" # HTML file type
class FileTypeNotSupportedException(Exception):
"""
Exception raised when a file type is unsupported during document processing.
"""
def __init__(self, file_extension: str):
"""
Initialize the exception with the unsupported file extension.
:param file_extension: The file extension that triggered the exception.
"""
self.file_extension = file_extension
self.message = f"File type '{file_extension}' is not supported."
super().__init__(self.message) # Call the parent class constructor with the message
class Document:
"""
Represents a document being processed, such as a PDF, handling chunking, embedding, and summarization.
"""
def __init__(self, file_path: str, file_name: str, job_id: str, output_folder: str, doc_id: str):
"""
        Initialize the Document with its file path, file name, and job/document IDs.
        :param file_path: The path to the file being processed.
        :param file_name: The name of the file being processed.
        :param job_id: The job ID associated with this document processing task.
        :param output_folder: Folder where extracted element images are written.
        :param doc_id: The unique document ID for this job.
        """
self.output_folder = output_folder
self.file_name = file_name
self.file_path = file_path
self.job_id = job_id
self.type = self._get_document_type(file_name) # Determine the document type (PDF, CSV, etc.)
        self.doc_id = doc_id  # Unique document ID for this job
self.chunks = [] # List to hold text and visual chunks
self.num_pages = 0 # Number of pages in the document (if applicable)
self.summary = "" # The generated summary for the document
self._process() # Start processing the document
def _process(self):
"""
Process the document: extract chunks, embed them, and generate a summary.
"""
with open(self.file_path, 'rb') as file:
pdf_data = file.read()
pdf_chunker = PDFChunker(output_folder=self.output_folder, doc_id=self.doc_id) # Initialize PDFChunker
self.chunks = asyncio.run(pdf_chunker.chunk_pdf(pdf_data, os.path.basename(self.file_path), self.doc_id, self.job_id)) # Extract chunks
self.num_pages = self._get_pdf_pages(pdf_data) # Get the number of pages in the document
self._embed_chunks() # Embed the text chunks into embeddings
self.summary = self._generate_summary() # Generate a summary for the document
def _get_pdf_pages(self, pdf_data: bytes) -> int:
"""
Get the total number of pages in the PDF document.
"""
pdf_file = io.BytesIO(pdf_data) # Convert the file data to an in-memory binary stream
pdf_reader = PdfReader(pdf_file) # Initialize PDF reader
return len(pdf_reader.pages) # Return the number of pages in the PDF
def _get_document_type(self, file_name: str) -> DocumentType:
"""
Determine the document type based on its file extension.
:param file_name: The name of the file being processed.
:return: The DocumentType enum value corresponding to the file extension.
"""
_, extension = os.path.splitext(file_name) # Split the file name to get the extension
extension = extension.lower().lstrip('.') # Convert to lowercase and remove leading period
try:
return DocumentType(extension) # Try to match the extension to a DocumentType
except ValueError:
raise FileTypeNotSupportedException(extension) # Raise exception if file type is unsupported
def _embed_chunks(self) -> None:
"""
        Embed the text chunks using the OpenAI embeddings API.
        """
        client = OpenAI()  # Initialize the OpenAI client (API key read from the environment)
        batch_size = 90  # Number of chunks per embedding request
        chunks_len = len(self.chunks)  # Total number of chunks to embed
        for i in tqdm(range(0, chunks_len, batch_size), desc="Embedding Chunks"):
            batch = self.chunks[i: min(i + batch_size, chunks_len)]  # Get batch of chunks
            texts = [chunk['metadata']['text'] for chunk in batch]  # Extract text from each chunk
            chunk_embs_batch = client.embeddings.create(
                model="text-embedding-3-large",
                input=texts,
                encoding_format="float"
            )
            for j, data_val in enumerate(chunk_embs_batch.data):
                self.chunks[i + j]['values'] = data_val.embedding  # Store embeddings in the corresponding chunks
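
    # After embedding, each chunk's "values" holds a float vector (3072 dimensions for
    # text-embedding-3-large at its default size), ready for upsert into a vector index.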
def _generate_summary(self) -> str:
"""
Generate a summary of the document using KMeans clustering and a language model.
:return: The generated summary of the document.
"""
doc_chunks = [chunk['values'] for chunk in self.chunks if 'values' in chunk]
if not doc_chunks:
raise ValueError("No valid embedded chunks to summarize.")
# Remove duplicates (e.g., from OCR-ed blank pages or repeated captions)
unique_chunks = np.unique(np.array(doc_chunks), axis=0)
# Dynamically scale number of clusters to available signal
num_clusters = min(10, len(unique_chunks))
kmeans = KMeans(n_clusters=num_clusters, random_state=42).fit(unique_chunks)
# Predict cluster labels for original chunks (not just unique ones)
cluster_labels = kmeans.predict(np.array(doc_chunks))
# Select representative chunks from each cluster
selected_chunks = []
for i in range(num_clusters):
cluster_idxs = np.where(cluster_labels == i)[0]
if len(cluster_idxs) == 0:
continue # skip empty clusters (shouldn't happen after downsizing)
centroid = kmeans.cluster_centers_[i] # Get the centroid of the cluster
distances = [np.linalg.norm(doc_chunks[idx] - centroid) for idx in cluster_idxs]
closest_idx = cluster_idxs[int(np.argmin(distances))]
selected_chunks.append(self.chunks[closest_idx])
# Combine selected chunks into a summary
combined_text = "\n\n".join([chunk['metadata']['text'] for chunk in selected_chunks]) # Concatenate chunk texts
client = OpenAI() # Initialize OpenAI client for text generation
completion = client.chat.completions.create(
model="gpt-4o", # Specify the language model
messages=[
{"role": "system",
"content": "You are an AI assistant tasked with summarizing a document. You are provided with important chunks from the document and provide a summary, as best you can, of what the document will contain overall. Be concise and brief with your response."},
{"role": "user", "content": f"""Please provide a comprehensive summary of what you think the document from which these chunks were sampled would be.
Ensure the summary captures the main ideas and key points from all provided chunks. Be concise and brief and only provide the summary in paragraph form.
Sample text chunks:
```
{combined_text}
```
**********
Summary:
"""}
],
max_tokens=300 # Set max tokens for the summary
)
return completion.choices[0].message.content.strip() # Return the generated summary
def to_json(self) -> str:
"""
Return the document's data in JSON format.
:return: JSON string representing the document's metadata, chunks, and summary.
"""
return json.dumps({
"file_name": self.file_name,
"num_pages": self.num_pages,
"summary": self.summary,
"chunks": self.chunks,
"type": self.type.value,
"doc_id": self.doc_id
}, indent=2) # Convert the document's attributes to JSON format
def process_document(file_path, job_id, output_folder, doc_id):
"""
Top-level function to process a document and return the JSON output.
:param file_path: The path to the file being processed.
:param job_id: The job ID for this document processing task.
:return: The processed document's data in JSON format.
"""
new_document = Document(file_path, file_path, job_id, output_folder, doc_id)
return new_document.to_json()
def main():
"""
Main entry point for the script, called with arguments from Node.js.
"""
if len(sys.argv) != 5:
print(json.dumps({"error": "Invalid arguments"}), file=sys.stderr)
return
job_id = sys.argv[1]
file_path = sys.argv[2]
output_folder = sys.argv[3] # Get the output folder from arguments
doc_id = sys.argv[4]
try:
os.makedirs(output_folder, exist_ok=True)
# Process the document
        document_result = process_document(file_path, job_id, output_folder, doc_id)
# Output the final result as JSON to stdout
print(document_result)
sys.stdout.flush()
except Exception as e:
# Print errors to stderr so they don't interfere with JSON output
print(json.dumps({"error": str(e)}), file=sys.stderr)
sys.stderr.flush()
if __name__ == "__main__":
main() # Execute the main function when the script is run
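
# Example invocation from a shell or a Node.js child_process (hypothetical script name and paths):
#   python process_pdf.py job-123 /uploads/report.pdf /srv/output doc-456
# Progress lines ("PROGRESS:{...}") arrive on stderr; the final document JSON on stdout.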