|
5 | 5 | import requests
|
6 | 6 | import time
|
7 | 7 |
|
| 8 | +from dataclasses import dataclass |
8 | 9 | from requests.models import Response
|
9 |
| -from typing import Any, Optional, Dict, List |
| 10 | +from typing import Any, Dict, List, Optional, Tuple |
10 | 11 | from pathlib import Path
|
11 | 12 |
|
12 | 13 | from azure.storage.blob.aio import ContainerClient
|
|
15 | 16 | POLL_TIMEOUT_SECONDS = 120
|
16 | 17 |
|
17 | 18 |
|
| 19 | +@dataclass |
| 20 | +class ReferenceDocItem: |
| 21 | + filename: str |
| 22 | + file_path: str |
| 23 | + result_file_name: str |
| 24 | + result_file_path: Optional[str] = None |
| 25 | + |
| 26 | + |
18 | 27 | class AzureContentUnderstandingClient:
|
19 | 28 |
|
20 | 29 | PREBUILT_DOCUMENT_ANALYZER_ID: str = "prebuilt-documentAnalyzer"
|
@@ -428,72 +437,136 @@ async def generate_training_data_on_blob(
|
428 | 437 | f"Please ensure both files exist for '{filename}'."
|
429 | 438 | )
|
430 | 439 |
|
| 440 | + def _get_analyze_list( |
| 441 | + self, |
| 442 | + reference_docs_folder: str, |
| 443 | + ) -> List[ReferenceDocItem]: |
| 444 | + """ |
| 445 | + Returns a list of ReferenceDocItem objects for files in the given folder that need to be analyzed. |
| 446 | + """ |
| 447 | + analyze_list: List[ReferenceDocItem] = [] |
| 448 | + |
| 449 | + for dirpath, _, filenames in os.walk(reference_docs_folder): |
| 450 | + for filename in filenames: |
| 451 | + _, file_ext = os.path.splitext(filename) |
| 452 | + if self.is_supported_type_by_file_ext(file_ext, is_document=True): |
| 453 | + file_path = os.path.join(dirpath, filename) |
| 454 | + result_file_name = filename + self.OCR_RESULT_FILE_SUFFIX |
| 455 | + analyze_list.append( |
| 456 | + ReferenceDocItem( |
| 457 | + filename=filename, |
| 458 | + file_path=file_path, |
| 459 | + result_file_name=result_file_name, |
| 460 | + ) |
| 461 | + ) |
| 462 | + else: |
| 463 | + raise ValueError( |
| 464 | + f"File '{filename}' is not a supported document type, " |
| 465 | + f"please remove it or convert it to a supported type." |
| 466 | + ) |
| 467 | + |
| 468 | + return analyze_list |
| 469 | + |
| 470 | + def _get_upload_only_list( |
| 471 | + self, |
| 472 | + reference_docs_folder: str, |
| 473 | + ) -> List[ReferenceDocItem]: |
| 474 | + """ |
| 475 | + Returns a list of ReferenceDocItem objects for files in the given folder that already have OCR results |
| 476 | + """ |
| 477 | + upload_only_list: List[ReferenceDocItem] = [] |
| 478 | + |
| 479 | + for dirpath, _, filenames in os.walk(reference_docs_folder): |
| 480 | + for filename in filenames: |
| 481 | + _, file_ext = os.path.splitext(filename) |
| 482 | + if self.is_supported_type_by_file_ext(file_ext, is_document=True): |
| 483 | + file_path = os.path.join(dirpath, filename) |
| 484 | + result_file_name = filename + self.OCR_RESULT_FILE_SUFFIX |
| 485 | + result_file_path = os.path.join(dirpath, result_file_name) |
| 486 | + if not os.path.exists(result_file_path): |
| 487 | + raise FileNotFoundError( |
| 488 | + f"Result file '{result_file_name}' does not exist in '{dirpath}'. " |
| 489 | + f"Please run analyze first or remove this file from the folder." |
| 490 | + ) |
| 491 | + upload_only_list.append( |
| 492 | + ReferenceDocItem( |
| 493 | + filename=filename, |
| 494 | + file_path=file_path, |
| 495 | + result_file_name=result_file_name, |
| 496 | + result_file_path=result_file_path, |
| 497 | + ) |
| 498 | + ) |
| 499 | + elif filename.endswith(self.OCR_RESULT_FILE_SUFFIX): |
| 500 | + original_filename = filename.replace(self.OCR_RESULT_FILE_SUFFIX, "") |
| 501 | + if original_filename in filenames: |
| 502 | + # skip result.json files corresponding to the file with supported document type |
| 503 | + _, original_file_ext = os.path.splitext(original_filename) |
| 504 | + if self.is_supported_type_by_file_ext(original_file_ext, is_document=True): |
| 505 | + continue |
| 506 | + else: |
| 507 | + raise ValueError( |
| 508 | + f"The '{original_filename}' is not a supported document type, " |
| 509 | + f"please remove the result file '{filename}' and '{original_filename}'." |
| 510 | + ) |
| 511 | + else: |
| 512 | + raise ValueError( |
| 513 | + f"Result file '{filename}' is not corresponding to an original file, " |
| 514 | + f"please remove it." |
| 515 | + ) |
| 516 | + else: |
| 517 | + raise ValueError( |
| 518 | + f"File '{filename}' is not a supported document type, " |
| 519 | + f"please remove it or convert it to a supported type." |
| 520 | + ) |
| 521 | + |
| 522 | + return upload_only_list |
| 523 | + |
431 | 524 | async def generate_knowledge_base_on_blob(
|
432 | 525 | self,
|
433 | 526 | reference_docs_folder: str,
|
434 | 527 | storage_container_sas_url: str,
|
435 | 528 | storage_container_path_prefix: str,
|
436 | 529 | skip_analyze: bool = False,
|
437 | 530 | ) -> None:
|
| 531 | + """ |
| 532 | + Generates a knowledge base on Azure Blob Storage by analyzing or uploading files from the given folder. |
| 533 | + Args: |
| 534 | + reference_docs_folder (str): The path to the folder containing reference documents. |
| 535 | + storage_container_sas_url (str): The SAS URL of the Azure Blob Storage container. |
| 536 | + storage_container_path_prefix (str): The path prefix within the storage container where files will be |
| 537 | + skip_analyze (bool): If True, skips the analysis step and only uploads existing result files. |
| 538 | + """ |
438 | 539 | if not storage_container_path_prefix.endswith("/"):
|
439 | 540 | storage_container_path_prefix += "/"
|
440 |
| - |
| 541 | + |
441 | 542 | resources = []
|
442 | 543 | async with ContainerClient.from_container_url(storage_container_sas_url) as container_client:
|
443 |
| - for dirpath, _, filenames in os.walk(reference_docs_folder): |
444 |
| - for filename in filenames: |
445 |
| - filename_no_ext, file_ext = os.path.splitext(filename) |
446 |
| - if self.is_supported_type_by_file_ext(file_ext, is_document=True): |
447 |
| - file_path = os.path.join(dirpath, filename) |
448 |
| - result_file_name = filename_no_ext + self.OCR_RESULT_FILE_SUFFIX |
449 |
| - result_file_blob_path = storage_container_path_prefix + result_file_name |
450 |
| - # Get and upload result.json |
451 |
| - if not skip_analyze: |
452 |
| - self._logger.info(f"Analyzing result for {filename}") |
453 |
| - try: |
454 |
| - analyze_result = self.get_prebuilt_document_analyze_result(file_path) |
455 |
| - except Exception as e: |
456 |
| - self._logger.error( |
457 |
| - f"Error of getting analyze result of '{filename}'. " |
458 |
| - f"Please check the error message and consider retrying or removing this file." |
459 |
| - ) |
460 |
| - raise e |
461 |
| - await self._upload_json_to_blob(container_client, analyze_result, result_file_blob_path) |
462 |
| - else: |
463 |
| - self._logger.info(f"Using existing result.json for '{filename}'") |
464 |
| - result_file_path = os.path.join(dirpath, result_file_name) |
465 |
| - if not os.path.exists(result_file_path): |
466 |
| - raise FileNotFoundError( |
467 |
| - f"Result file '{result_file_name}' does not exist in '{dirpath}'. " |
468 |
| - f"Please run analyze first or remove this file from the folder." |
469 |
| - ) |
470 |
| - await self._upload_file_to_blob(container_client, result_file_path, result_file_blob_path) |
471 |
| - # Upload the original file |
472 |
| - file_blob_path = storage_container_path_prefix + filename |
473 |
| - await self._upload_file_to_blob(container_client, file_path, file_blob_path) |
474 |
| - resources.append({"file": filename, "resultFile": result_file_name}) |
475 |
| - elif filename.endswith(self.OCR_RESULT_FILE_SUFFIX) and skip_analyze: |
476 |
| - if filename.replace(self.OCR_RESULT_FILE_SUFFIX, "") in filenames: |
477 |
| - # skip result.json files corresponding to the file with supported document type |
478 |
| - original_filename = filename.replace(self.OCR_RESULT_FILE_SUFFIX, "") |
479 |
| - original_filename_no_ext, original_file_ext = os.path.splitext(original_filename) |
480 |
| - if self.is_supported_type_by_file_ext(original_file_ext, is_document=True): |
481 |
| - continue |
482 |
| - else: |
483 |
| - raise ValueError( |
484 |
| - f"The original file of '{filename}' is not a supported document type, " |
485 |
| - f"please remove the result file '{filename}' and '{original_filename}'." |
486 |
| - ) |
487 |
| - else: |
488 |
| - raise ValueError( |
489 |
| - f"Result file '{filename}' is not corresponding to an original file, " |
490 |
| - f"please remove it." |
| 544 | + if not skip_analyze: |
| 545 | + analyze_list = self._get_analyze_list(reference_docs_folder) |
| 546 | + for analyze_item in analyze_list: |
| 547 | + self._logger.info(f"Analyzing result for {analyze_item.filename}") |
| 548 | + try: |
| 549 | + analyze_result = self.get_prebuilt_document_analyze_result(analyze_item.file_path) |
| 550 | + except Exception as e: |
| 551 | + self._logger.error( |
| 552 | + f"Error of getting analyze result of '{analyze_item.filename}'. " |
| 553 | + f"Please check the error message and consider retrying or removing this file." |
491 | 554 | )
|
492 |
| - else: |
493 |
| - raise ValueError( |
494 |
| - f"File '{filename}' is not a supported document type, " |
495 |
| - f"please remove it or convert it to a supported type." |
496 |
| - ) |
| 555 | + result_file_blob_path = storage_container_path_prefix + analyze_item.result_file_name |
| 556 | + file_blob_path = storage_container_path_prefix + analyze_item.filename |
| 557 | + await self._upload_json_to_blob(container_client, analyze_result, result_file_blob_path) |
| 558 | + await self._upload_file_to_blob(container_client, analyze_item.file_path, file_blob_path) |
| 559 | + resources.append({"file": analyze_item.filename, "resultFile": analyze_item.result_file_name}) |
| 560 | + else: |
| 561 | + upload_list = self._get_upload_only_list(reference_docs_folder) |
| 562 | + for upload_item in upload_list: |
| 563 | + self._logger.info(f"Using existing result.json for '{upload_item.filename}'") |
| 564 | + result_file_blob_path = storage_container_path_prefix + upload_item.result_file_name |
| 565 | + file_blob_path = storage_container_path_prefix + upload_item.filename |
| 566 | + await self._upload_file_to_blob(container_client, upload_item.file_path, file_blob_path) |
| 567 | + await self._upload_file_to_blob(container_client, upload_item.result_file_path, result_file_blob_path) |
| 568 | + resources.append({"file": upload_item.filename, "resultFile": upload_item.result_file_name}) |
| 569 | + |
497 | 570 | # Upload sources.jsonl
|
498 | 571 | await self.upload_jsonl_to_blob(
|
499 | 572 | container_client, resources, storage_container_path_prefix + self.KNOWLEDGE_SOURCE_LIST_FILE_NAME)
|
|
0 commit comments