@@ -254,8 +254,15 @@ def __init__(
254254 "OCR Service initialized with 'none' backend - image-only processing"
255255 )
256256
257- # Initialize S3 client (used by all backends for image storage)
258- self .s3_client = boto3 .client ("s3" )
257+ # Initialize S3 client with connection pool matching max_workers
258+ s3_config = Config (
259+ retries = {"max_attempts" : 10 , "mode" : "adaptive" },
260+ max_pool_connections = max (self .max_workers , 10 ),
261+ )
262+ self .s3_client = boto3 .client ("s3" , config = s3_config )
263+ logger .info (
264+ f"S3 client initialized with { max (self .max_workers , 10 )} connection pool size"
265+ )
259266
260267 # Initialize document converter for non-PDF formats
261268 self .document_converter = DocumentConverter (dpi = self .dpi or 150 )
@@ -358,37 +365,51 @@ def process_document(self, document: Document) -> Document:
358365 for i in range (num_pages )
359366 }
360367
361- for future in concurrent .futures .as_completed (future_to_page ):
362- page_index = future_to_page [future ]
363- page_id = str (page_index + 1 )
364- try :
365- ocr_result , page_metering = future .result ()
366-
367- # Create Page object and add to document
368- document .pages [page_id ] = Page (
369- page_id = page_id ,
370- image_uri = ocr_result ["image_uri" ],
371- raw_text_uri = ocr_result ["raw_text_uri" ],
372- parsed_text_uri = ocr_result ["parsed_text_uri" ],
373- text_confidence_uri = ocr_result ["text_confidence_uri" ],
374- )
368+ # Start memory monitoring in background thread
369+ memory_monitor_shutdown = self ._start_memory_monitoring ()
370+ completed_pages = 0
375371
376- # Merge metering data
377- document .metering = utils .merge_metering_data (
378- document .metering , page_metering
379- )
380-
381- except Exception as e :
382- import traceback
383-
384- error_msg = (
385- f"Error processing page { page_index + 1 } : { str (e )} "
386- )
387- stack_trace = traceback .format_exc ()
388- logger .error (f"{ error_msg } \n Stack trace:\n { stack_trace } " )
389- document .errors .append (
390- f"{ error_msg } (see logs for full trace)"
391- )
372+ try :
373+ for future in concurrent .futures .as_completed (future_to_page ):
374+ page_index = future_to_page [future ]
375+ page_id = str (page_index + 1 )
376+ try :
377+ ocr_result , page_metering = future .result ()
378+
379+ # Create Page object and add to document
380+ document .pages [page_id ] = Page (
381+ page_id = page_id ,
382+ image_uri = ocr_result ["image_uri" ],
383+ raw_text_uri = ocr_result ["raw_text_uri" ],
384+ parsed_text_uri = ocr_result ["parsed_text_uri" ],
385+ text_confidence_uri = ocr_result [
386+ "text_confidence_uri"
387+ ],
388+ )
389+
390+ # Merge metering data
391+ document .metering = utils .merge_metering_data (
392+ document .metering , page_metering
393+ )
394+
395+ completed_pages += 1
396+
397+ except Exception as e :
398+ import traceback
399+
400+ error_msg = (
401+ f"Error processing page { page_index + 1 } : { str (e )} "
402+ )
403+ stack_trace = traceback .format_exc ()
404+ logger .error (
405+ f"{ error_msg } \n Stack trace:\n { stack_trace } "
406+ )
407+ document .errors .append (
408+ f"{ error_msg } (see logs for full trace)"
409+ )
410+ finally :
411+ # Stop memory monitoring
412+ memory_monitor_shutdown .set ()
392413
393414 pdf_document .close ()
394415
@@ -915,6 +936,51 @@ def _process_image_file_direct(
915936
916937 return result , metering
917938
939+ def _start_memory_monitoring (self ):
940+ """
941+ Start background memory monitoring that logs usage every 5 seconds.
942+
943+ Returns:
944+ Event object that can be set to stop monitoring
945+ """
946+ import threading
947+
948+ shutdown_event = threading .Event ()
949+
950+ def monitor_memory ():
951+ while not shutdown_event .is_set ():
952+ try :
953+ import os
954+
955+ import psutil
956+
957+ process = psutil .Process (os .getpid ())
958+ memory_info = process .memory_info ()
959+ memory_mb = memory_info .rss / (1024 * 1024 ) # Convert to MB
960+
961+ logger .info (f"Memory usage: { memory_mb :.1f} MB" )
962+
963+ # Warning if memory usage is getting high
964+ if memory_mb > 3500 :
965+ logger .warning (
966+ f"HIGH memory usage detected: { memory_mb :.1f} MB"
967+ )
968+
969+ except ImportError :
970+ logger .debug ("psutil not available, skipping memory monitoring" )
971+ break
972+ except Exception as e :
973+ logger .debug (f"Error monitoring memory: { str (e )} " )
974+
975+ # Wait 5 seconds or until shutdown
976+ shutdown_event .wait (5.0 )
977+
978+ # Start monitoring thread
979+ monitor_thread = threading .Thread (target = monitor_memory , daemon = True )
980+ monitor_thread .start ()
981+
982+ return shutdown_event
983+
918984 def _process_single_page_textract (
919985 self ,
920986 page_index : int ,
@@ -937,31 +1003,21 @@ def _process_single_page_textract(
9371003 t0 = time .time ()
9381004 page_id = page_index + 1
9391005
940- # Extract page image - use DPI only for PDF files to prevent upscaling of images
1006+ # Extract page image - now returns image at optimal size directly
9411007 page = pdf_document .load_page (page_index )
9421008 img_bytes = self ._extract_page_image (page , pdf_document .is_pdf , page_id )
9431009
944- # Upload original image to S3
1010+ # Upload processed image to S3 (already at target size if resize config exists)
9451011 image_key = f"{ prefix } /pages/{ page_id } /image.jpg"
9461012 s3 .write_content (img_bytes , output_bucket , image_key , content_type = "image/jpeg" )
9471013
9481014 t1 = time .time ()
9491015 logger .debug (
950- f"Time for image conversion (page { page_id } ): { t1 - t0 :.6f} seconds"
1016+ f"Time for image processing (page { page_id } ): { t1 - t0 :.6f} seconds"
9511017 )
9521018
953- # Resize image for OCR processing if configured
954- ocr_img_bytes = img_bytes # Default to original image
955- if self .resize_config :
956- from idp_common import image
957-
958- target_width = self .resize_config .get ("target_width" )
959- target_height = self .resize_config .get ("target_height" )
960-
961- ocr_img_bytes = image .resize_image (img_bytes , target_width , target_height )
962- logger .debug (
963- f"Resized image for OCR processing (page { page_id } ) to { target_width } x{ target_height } "
964- )
1019+ # Use the extracted image directly for OCR (no additional resize needed)
1020+ ocr_img_bytes = img_bytes
9651021
9661022 # Apply preprocessing if enabled (only for OCR processing, not saved image)
9671023 if self .preprocessing_config and self .preprocessing_config .get ("enabled" ):
@@ -980,6 +1036,15 @@ def _process_single_page_textract(
9801036 Document = {"Bytes" : ocr_img_bytes }
9811037 )
9821038
1039+ # Aggressive memory cleanup - clear large image variables immediately after OCR
1040+ img_bytes = None
1041+ ocr_img_bytes = None
1042+
1043+ # Force garbage collection after processing large images
1044+ import gc
1045+
1046+ gc .collect ()
1047+
9831048 # Extract metering data
9841049 feature_combo = self ._feature_combo ()
9851050 metering = {
@@ -1032,26 +1097,105 @@ def _process_single_page_textract(
10321097
10331098 def _extract_page_image (self , page : fitz .Page , is_pdf : bool , page_id : int ) -> bytes :
10341099 """
1035- Extract image bytes from a page, using DPI only for PDF files.
1100+ Extract image bytes from a page at optimal size to prevent memory issues.
1101+
1102+ If resize config is provided, images are extracted directly at target dimensions
1103+ to avoid creating oversized images that cause OutOfMemory errors.
10361104
10371105 Args:
10381106 page: PyMuPDF page object
10391107 is_pdf: Whether the document is a PDF file
10401108 page_id: Page number for logging
10411109
10421110 Returns:
1043- Image bytes in JPEG format
1111+ Image bytes in JPEG format (at target size if resize config exists)
10441112 """
1045- if is_pdf :
1046- # For PDF files, use specified DPI for quality rendering
1047- pix = page .get_pixmap (dpi = self .dpi )
1048- logger .debug (f"Processing PDF page { page_id } at { self .dpi } DPI" )
1049- else :
1050- # For image files (JPEG, PNG, etc.), preserve original dimensions
1051- pix = page .get_pixmap ()
1052- logger .debug (f"Processing image page { page_id } at original dimensions" )
1113+ pix = None
1114+ try :
1115+ # Check if we should extract at target size to avoid memory issues
1116+ if self .resize_config :
1117+ target_width = self .resize_config .get ("target_width" )
1118+ target_height = self .resize_config .get ("target_height" )
1119+
1120+ if target_width and target_height :
1121+ # Get page dimensions to calculate scaling
1122+ page_rect = page .rect
1123+
1124+ if is_pdf :
1125+ # For PDF files, calculate dimensions at specified DPI (default to 150 if None)
1126+ dpi = self .dpi or 150
1127+ original_width = int (page_rect .width * (dpi / 72 ))
1128+ original_height = int (page_rect .height * (dpi / 72 ))
1129+ else :
1130+ # For image files, use actual dimensions
1131+ original_width = int (page_rect .width )
1132+ original_height = int (page_rect .height )
1133+
1134+ # Apply same logic as image.resize_image - preserve aspect ratio, never upscale
1135+ width_ratio = target_width / original_width
1136+ height_ratio = target_height / original_height
1137+ scale_factor = min (
1138+ width_ratio , height_ratio
1139+ ) # Preserve aspect ratio
1140+
1141+ # Only resize if scale_factor < 1.0 (never upscale)
1142+ if scale_factor < 1.0 :
1143+ # Extract at reduced size using matrix transformation
1144+ if is_pdf :
1145+ # For PDF, combine DPI scaling with size reduction
1146+ dpi = self .dpi or 150
1147+ base_scale = dpi / 72 # Convert PDF points to pixels
1148+ final_scale = base_scale * scale_factor
1149+ matrix = fitz .Matrix (final_scale , final_scale )
1150+ else :
1151+ # For images, just apply the scale factor
1152+ matrix = fitz .Matrix (scale_factor , scale_factor )
1153+
1154+ pix = page .get_pixmap (matrix = matrix )
1155+
1156+ actual_width , actual_height = pix .width , pix .height
1157+ logger .info (
1158+ f"Extracted page { page_id } at target size: { actual_width } x{ actual_height } (scale: { scale_factor :.3f} )"
1159+ )
1160+
1161+ else :
1162+ # No resize needed - image is already smaller than targets
1163+ if is_pdf :
1164+ dpi = self .dpi or 150
1165+ pix = page .get_pixmap (dpi = dpi )
1166+ logger .info (
1167+ f"Page { page_id } already fits target size, using DPI { dpi } "
1168+ )
1169+ else :
1170+ pix = page .get_pixmap ()
1171+ logger .info (
1172+ f"Page { page_id } already fits target size, using original dimensions"
1173+ )
1174+ else :
1175+ # No valid target dimensions - use original extraction
1176+ if is_pdf :
1177+ dpi = self .dpi or 150
1178+ pix = page .get_pixmap (dpi = dpi )
1179+ else :
1180+ pix = page .get_pixmap ()
1181+ else :
1182+ # No resize config - extract at original size
1183+ if is_pdf :
1184+ dpi = self .dpi or 150
1185+ pix = page .get_pixmap (dpi = dpi )
1186+ logger .debug (f"Processing PDF page { page_id } at { dpi } DPI" )
1187+ else :
1188+ pix = page .get_pixmap ()
1189+ logger .debug (
1190+ f"Processing image page { page_id } at original dimensions"
1191+ )
10531192
1054- return pix .tobytes ("jpeg" )
1193+ image_bytes = pix .tobytes ("jpeg" )
1194+ return image_bytes
1195+ finally :
1196+ # Aggressive cleanup of PyMuPDF pixmap to prevent memory leaks
1197+ if pix is not None :
1198+ pix = None
10551199
10561200 def _process_single_page_bedrock (
10571201 self ,
@@ -1075,29 +1219,21 @@ def _process_single_page_bedrock(
10751219 t0 = time .time ()
10761220 page_id = page_index + 1
10771221
1078- # Extract page image at specified DPI (consistent with Textract)
1222+ # Extract page image - now returns image at optimal size directly
10791223 page = pdf_document .load_page (page_index )
10801224 img_bytes = self ._extract_page_image (page , pdf_document .is_pdf , page_id )
10811225
1082- # Upload image to S3
1226+ # Upload processed image to S3 (already at target size if resize config exists)
10831227 image_key = f"{ prefix } /pages/{ page_id } /image.jpg"
10841228 s3 .write_content (img_bytes , output_bucket , image_key , content_type = "image/jpeg" )
10851229
10861230 t1 = time .time ()
10871231 logger .debug (
1088- f"Time for image conversion (page { page_id } ): { t1 - t0 :.6f} seconds"
1232+ f"Time for image processing (page { page_id } ): { t1 - t0 :.6f} seconds"
10891233 )
10901234
1091- # Apply resize config if provided (consistent with Textract)
1092- ocr_img_bytes = img_bytes # Default to original image
1093- if self .resize_config :
1094- target_width = self .resize_config .get ("target_width" )
1095- target_height = self .resize_config .get ("target_height" )
1096-
1097- ocr_img_bytes = image .resize_image (img_bytes , target_width , target_height )
1098- logger .debug (
1099- f"Resized image for Bedrock OCR processing (page { page_id } ) to { target_width } x{ target_height } "
1100- )
1235+ # Use the extracted image directly for OCR (no additional resize needed)
1236+ ocr_img_bytes = img_bytes
11011237
11021238 # Apply preprocessing if enabled (only for OCR processing, not saved image)
11031239 if self .preprocessing_config and self .preprocessing_config .get ("enabled" ):
0 commit comments