|
1 | 1 | import io |
2 | 2 | import json |
3 | 3 | import os |
| 4 | +import re |
4 | 5 | import time |
5 | 6 | from pathlib import Path |
6 | | -from typing import Any, Dict, List |
| 7 | +from typing import Any, Dict, List, Tuple |
7 | 8 |
|
8 | 9 | import boto3 |
9 | 10 | import pandas as pd |
|
15 | 16 | AWS_SECRET_KEY, |
16 | 17 | PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS, |
17 | 18 | RUN_AWS_FUNCTIONS, |
| 19 | + SPLIT_PUNCTUATION_FROM_WORDS, |
18 | 20 | ) |
19 | 21 | from tools.custom_image_analyser_engine import CustomImageRecognizerResult, OCRResult |
20 | 22 | from tools.helper_functions import _generate_unique_ids |
@@ -202,6 +204,151 @@ def convert_pike_pdf_page_to_bytes(pdf: object, page_num: int): |
202 | 204 | return pdf_bytes |
203 | 205 |
|
204 | 206 |
|
def split_word_with_punctuation(
    word_text: str,
    bounding_box: Tuple[int, int, int, int],
    confidence: float,
) -> List[Dict[str, Any]]:
    """
    Split a word that may contain punctuation into separate word entries.

    Only punctuation at the start and end of the word is separated out.
    Punctuation in the middle (e.g., in email addresses like user@example.com)
    is kept as part of the word.

    Args:
        word_text: The text of the word (may contain punctuation)
        bounding_box: Tuple of (left, top, right, bottom) in pixels
        confidence: Confidence score for the original word

    Returns:
        List of word dictionaries, each with "text", "confidence" and
        "bounding_box" keys. Leading and trailing punctuation become separate
        entries, while the middle part (which may contain internal
        punctuation) remains intact. An empty input yields an empty list.
    """
    if not word_text:
        return []

    # Leading punctuation: maximal run of non-word, non-space chars at the start
    leading_punct_match = re.match(r"^([^\w\s]+)", word_text)
    leading_punct = leading_punct_match.group(1) if leading_punct_match else ""

    # Trailing punctuation: maximal run of non-word, non-space chars at the end
    trailing_punct_match = re.search(r"([^\w\s]+)$", word_text)
    trailing_punct = trailing_punct_match.group(1) if trailing_punct_match else ""

    start_idx = len(leading_punct)
    end_idx = len(word_text) - len(trailing_punct)

    # A word made up entirely of punctuation matches BOTH patterns over the
    # full string (start_idx >= end_idx). Without this guard the word would be
    # duplicated into two identical entries — keep it as a single entry.
    if start_idx >= end_idx:
        return [
            {
                "text": word_text,
                "confidence": confidence,
                "bounding_box": bounding_box,
            }
        ]

    # Middle part may contain internal punctuation (e.g. '@' or '.' in
    # email addresses), which we deliberately keep intact.
    middle_part = word_text[start_idx:end_idx]

    parts = [p for p in (leading_punct, middle_part, trailing_punct) if p]

    # Nothing to split off: return the original word unchanged.
    if len(parts) <= 1:
        return [
            {
                "text": word_text,
                "confidence": confidence,
                "bounding_box": bounding_box,
            }
        ]

    left, top, right, bottom = bounding_box
    width = right - left

    # Punctuation glyphs are typically narrower than alphanumeric glyphs;
    # scale their share of the width down accordingly.
    PUNCTUATION_WIDTH_SCALE = 0.5

    # First pass: compute each part's "effective" length so the box width can
    # be distributed proportionally (punctuation-only parts count as narrower).
    total_effective_chars = 0.0
    part_info = []
    for part in parts:
        is_punctuation_only = not bool(re.search(r"\w", part))
        effective_length = (
            len(part) * PUNCTUATION_WIDTH_SCALE if is_punctuation_only else len(part)
        )
        part_info.append(
            {
                "text": part,
                "effective_length": effective_length,
                "is_punctuation": is_punctuation_only,
            }
        )
        total_effective_chars += effective_length

    # parts is non-empty and the scale is positive, so this is always > 0.
    effective_char_width = width / total_effective_chars

    # Second pass: assign each part a slice of the original box, left to right.
    word_entries = []
    current_pos = 0.0
    for info in part_info:
        part_width = info["effective_length"] * effective_char_width
        part_left = left + current_pos
        part_right = part_left + part_width
        word_entries.append(
            {
                "text": info["text"],
                "confidence": confidence,
                "bounding_box": (
                    int(part_left),
                    int(top),
                    int(part_right),
                    int(bottom),
                ),
            }
        )
        current_pos += part_width

    return word_entries
| 350 | + |
| 351 | + |
205 | 352 | def json_to_ocrresult( |
206 | 353 | json_data: dict, page_width: float, page_height: float, page_no: int |
207 | 354 | ): |
@@ -278,44 +425,62 @@ def _get_text_from_block(block, b_map): |
278 | 425 | word_block = block_map.get(child_id) |
279 | 426 | if word_block and word_block["BlockType"] == "WORD": |
280 | 427 | w_bbox = word_block["Geometry"]["BoundingBox"] |
281 | | - line_info["words"].append( |
282 | | - { |
283 | | - "text": word_block.get("Text", ""), |
284 | | - "confidence": round( |
285 | | - word_block.get("Confidence", 0.0), 0 |
286 | | - ), |
287 | | - "bounding_box": ( |
288 | | - int(w_bbox["Left"] * page_width), |
289 | | - int(w_bbox["Top"] * page_height), |
290 | | - int( |
291 | | - (w_bbox["Left"] + w_bbox["Width"]) |
292 | | - * page_width |
293 | | - ), |
294 | | - int( |
295 | | - (w_bbox["Top"] + w_bbox["Height"]) |
296 | | - * page_height |
297 | | - ), |
298 | | - ), |
299 | | - } |
| 428 | + word_text = word_block.get("Text", "") |
| 429 | + word_confidence = round( |
| 430 | + word_block.get("Confidence", 0.0), 0 |
300 | 431 | ) |
301 | | - if word_block.get("TextType") == "HANDWRITING": |
302 | | - rec_res = CustomImageRecognizerResult( |
303 | | - entity_type="HANDWRITING", |
304 | | - text=word_block.get("Text", ""), |
305 | | - score=round( |
306 | | - word_block.get("Confidence", 0.0), 0 |
307 | | - ), |
308 | | - start=0, |
309 | | - end=len(word_block.get("Text", "")), |
310 | | - left=int(w_bbox["Left"] * page_width), |
311 | | - top=int(w_bbox["Top"] * page_height), |
312 | | - width=int(w_bbox["Width"] * page_width), |
313 | | - height=int(w_bbox["Height"] * page_height), |
314 | | - ) |
315 | | - handwriting_recogniser_results.append(rec_res) |
316 | | - signature_or_handwriting_recogniser_results.append( |
317 | | - rec_res |
| 432 | + original_bounding_box = ( |
| 433 | + int(w_bbox["Left"] * page_width), |
| 434 | + int(w_bbox["Top"] * page_height), |
| 435 | + int( |
| 436 | + (w_bbox["Left"] + w_bbox["Width"]) * page_width |
| 437 | + ), |
| 438 | + int( |
| 439 | + (w_bbox["Top"] + w_bbox["Height"]) * page_height |
| 440 | + ), |
| 441 | + ) |
| 442 | + |
| 443 | + # Conditionally split word into alphanumeric parts and punctuation |
| 444 | + if SPLIT_PUNCTUATION_FROM_WORDS: |
| 445 | + split_words = split_word_with_punctuation( |
| 446 | + word_text, |
| 447 | + original_bounding_box, |
| 448 | + word_confidence, |
318 | 449 | ) |
| 450 | + else: |
| 451 | + # Original behavior: keep word as-is |
| 452 | + split_words = [ |
| 453 | + { |
| 454 | + "text": word_text, |
| 455 | + "confidence": word_confidence, |
| 456 | + "bounding_box": original_bounding_box, |
| 457 | + } |
| 458 | + ] |
| 459 | + |
| 460 | + # Add all word parts to the line |
| 461 | + for split_word in split_words: |
| 462 | + line_info["words"].append(split_word) |
| 463 | + |
| 464 | + # Handle handwriting - check if original word was handwriting |
| 465 | + if word_block.get("TextType") == "HANDWRITING": |
| 466 | + # For handwriting, create recognition results for each split part |
| 467 | + for split_word in split_words: |
| 468 | + split_bbox = split_word["bounding_box"] |
| 469 | + rec_res = CustomImageRecognizerResult( |
| 470 | + entity_type="HANDWRITING", |
| 471 | + text=split_word["text"], |
| 472 | + score=split_word["confidence"], |
| 473 | + start=0, |
| 474 | + end=len(split_word["text"]), |
| 475 | + left=split_bbox[0], |
| 476 | + top=split_bbox[1], |
| 477 | + width=split_bbox[2] - split_bbox[0], |
| 478 | + height=split_bbox[3] - split_bbox[1], |
| 479 | + ) |
| 480 | + handwriting_recogniser_results.append(rec_res) |
| 481 | + signature_or_handwriting_recogniser_results.append( |
| 482 | + rec_res |
| 483 | + ) |
319 | 484 | lines_data.append(line_info) |
320 | 485 |
|
321 | 486 | elif block_type == "SELECTION_ELEMENT": |
|
0 commit comments