diff --git a/.gitignore b/.gitignore index e51f3af2e2..29bdaffc13 100644 --- a/.gitignore +++ b/.gitignore @@ -149,3 +149,5 @@ static/ data/**/*.md5 .DS_Store + +data_interim diff --git a/README.md b/README.md index 1fb97c6784..0dd1501b97 100644 --- a/README.md +++ b/README.md @@ -23,9 +23,36 @@ This solution creates a ChatGPT-like frontend experience over your own documents This solution's backend is written in Python. There are also [**JavaScript**](https://aka.ms/azai/js/code), [**.NET**](https://aka.ms/azai/net/code), and [**Java**](https://aka.ms/azai/java/code) samples based on this one. Learn more about [developing AI apps using Azure AI Services](https://aka.ms/azai). -[](https://github.com/codespaces/new?hide_repo_select=true&ref=main&repo=599293758&machine=standardLinux32gb&devcontainer_path=.devcontainer%2Fdevcontainer.json&location=WestUs2) +# Notes for PIA + +Be sure to pick the right repo (the PIA one) after clicking this. + +[](https://github.com/codespaces/new?hide_repo_select=false&ref=main&machine=standardLinux32gb&devcontainer_path=.devcontainer%2Fdevcontainer.json&location=WestUs2) [](https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/azure-samples/azure-search-openai-demo) +## Working with the Search AI lab repo + +1. Deploy that repo as instructed [here](https://github.com/Program-Integrity-Alliance/azure-ai-search-lab) +2. TODO (this will soon change): Add fields to the 'blob-chunks' index there for sourcepage, sourcefile, storageUrl and category, all of type string. + +3. Then ... + +Before running the azd deploy (see instructions below), set the following environment values: + +``` +azd env set AZURE_OPENAI_CHATGPT_DEPLOYMENT_VERSION 2024-07-18 +azd env set AZURE_OPENAI_CHATGPT_MODEL gpt-4o-mini + +# To point at AI labs search service +azd env set AZURE_SEARCH_SERVICE srch-zan63y6zsbmxq-search +azd env set AZURE_SEARCH_SERVICE_RESOURCE_GROUP rg-aisearchlab-test-eus2-001 +azd env set AZURE_SEARCH_INDEX blob-chunks +``` + +4. Run `azd up`. When asked for an environment name and you want to deploy into an existing resource group, drop the 'rg-' prefix, as the build will add one. + +TODO: The two builds use different field names; 'category' still needs to be added to the Azure AI Search lab index. + ## Important Security Notice This template, the application code and configuration it contains, has been built to showcase Microsoft Azure specific services and tools. We strongly advise our customers not to make this code part of their production environments without implementing or enabling additional security features. See our [productionizing guide](docs/productionizing.md) for tips, and consult the [Azure OpenAI Landing Zone reference architecture](https://techcommunity.microsoft.com/blog/azurearchitectureblog/azure-openai-landing-zone-reference-architecture/3882102) for more best practices. 
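For step 2 in the notes above, the extra fields can be added to the existing 'blob-chunks' index with the azure-search-documents SDK rather than by hand. This is only a minimal sketch: the endpoint is built from the AZURE_SEARCH_SERVICE value above, and the use of DefaultAzureCredential plus the filterable/facetable attributes are assumptions, so adjust them to match how the lab index defines its other fields.

```python
# Minimal sketch: add the extra string fields to an existing index if they are missing.
# Assumes azure-search-documents and azure-identity are installed and that the
# signed-in identity can manage the 'blob-chunks' index.
from azure.identity import DefaultAzureCredential
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchFieldDataType, SimpleField

endpoint = "https://srch-zan63y6zsbmxq-search.search.windows.net"  # AZURE_SEARCH_SERVICE from above
index_name = "blob-chunks"                                          # AZURE_SEARCH_INDEX from above

client = SearchIndexClient(endpoint=endpoint, credential=DefaultAzureCredential())
index = client.get_index(index_name)

# Append any of the prepdocs fields that the lab index does not already define.
existing = {field.name for field in index.fields}
for name in ["sourcepage", "sourcefile", "storageUrl", "category"]:
    if name not in existing:
        index.fields.append(
            SimpleField(name=name, type=SearchFieldDataType.String, filterable=True, facetable=True)
        )

client.create_or_update_index(index)
```

The same loop could also cover the data-source field ('dataSource' / 'SourceDocumentDataSource') that the prepdocs changes below write into the cached search documents.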
diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py index d62a42f8cf..f616f0a59e 100644 --- a/app/backend/prepdocs.py +++ b/app/backend/prepdocs.py @@ -2,6 +2,7 @@ import asyncio import logging import os +import sys from typing import Optional, Union from azure.core.credentials import AzureKeyCredential @@ -27,6 +28,7 @@ ADLSGen2ListFileStrategy, ListFileStrategy, LocalListFileStrategy, + BlobListFileStrategy ) from prepdocslib.parser import Parser from prepdocslib.pdfparser import DocumentAnalysisParser, LocalPdfParser @@ -34,6 +36,10 @@ from prepdocslib.textparser import TextParser from prepdocslib.textsplitter import SentenceTextSplitter, SimpleTextSplitter +# mjh +from dotenv import load_dotenv +load_dotenv() + logger = logging.getLogger("scripts") @@ -82,10 +88,12 @@ def setup_blob_manager( def setup_list_file_strategy( azure_credential: AsyncTokenCredential, local_files: Union[str, None], + blob_files: Union[str, None], datalake_storage_account: Union[str, None], datalake_filesystem: Union[str, None], datalake_path: Union[str, None], datalake_key: Union[str, None], + force: Union[str, None] ): list_file_strategy: ListFileStrategy if datalake_storage_account: @@ -99,11 +107,20 @@ def setup_list_file_strategy( data_lake_path=datalake_path, credential=adls_gen2_creds, ) + # mjh + elif blob_files: + logger.info("Using Blob Storage Account to find files: %s", os.environ["AZURE_STORAGE_ACCOUNT"]) + list_file_strategy = BlobListFileStrategy( + storage_account=os.environ["AZURE_STORAGE_ACCOUNT"], + storage_container=os.environ["AZURE_STORAGE_CONTAINER"], + credential=azure_credential, + force=force + ) elif local_files: logger.info("Using local files: %s", local_files) - list_file_strategy = LocalListFileStrategy(path_pattern=local_files) + list_file_strategy = LocalListFileStrategy(path_pattern=local_files, force=force) else: - raise ValueError("Either local_files or datalake_storage_account must be provided.") + raise ValueError("Either local_files, blob_files or datalake_storage_account must be provided.") return list_file_strategy @@ -158,6 +175,7 @@ def setup_file_processors( local_pdf_parser: bool = False, local_html_parser: bool = False, search_images: bool = False, + blob_manager_interim_files: BlobManager = None ): sentence_text_splitter = SentenceTextSplitter(has_image_embeddings=search_images) @@ -170,6 +188,7 @@ def setup_file_processors( doc_int_parser = DocumentAnalysisParser( endpoint=f"https://{document_intelligence_service}.cognitiveservices.azure.com/", credential=documentintelligence_creds, + blob_manager_interim_files=blob_manager_interim_files ) pdf_parser: Optional[Parser] = None @@ -217,6 +236,11 @@ def setup_file_processors( ) return file_processors +# mjh +def setup_interim_dir(): + cache_dir = "./data_interim/" + if not os.path.exists(cache_dir): + os.makedirs(cache_dir) def setup_image_embeddings_service( azure_credential: AsyncTokenCredential, vision_endpoint: Union[str, None], search_images: bool @@ -243,7 +267,11 @@ async def main(strategy: Strategy, setup_index: bool = True): parser = argparse.ArgumentParser( description="Prepare documents by extracting content from PDFs, splitting content into sections, uploading to blob storage, and indexing in a search index." 
) - parser.add_argument("files", nargs="?", help="Files to be processed") + parser.add_argument("blob_files", nargs="?", help="Use blob storage files") + + parser.add_argument( + "--local_files", help="Look for local files" + ) parser.add_argument( "--category", help="Value for the category field in the search index for all sections indexed in this run" @@ -290,6 +318,9 @@ async def main(strategy: Strategy, setup_index: bool = True): help="Search service system assigned Identity (Managed identity) (used for integrated vectorization)", ) + # mjh + parser.add_argument("--force", "-f", action="store_true", help="Force reprocessing of files") + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") args = parser.parse_args() @@ -299,7 +330,7 @@ async def main(strategy: Strategy, setup_index: bool = True): # to avoid seeing the noisy INFO level logs from the Azure SDKs logger.setLevel(logging.INFO) - load_azd_env() + #load_azd_env() if os.getenv("AZURE_PUBLIC_NETWORK_ACCESS") == "Disabled": logger.error("AZURE_PUBLIC_NETWORK_ACCESS is set to Disabled. Exiting.") @@ -312,11 +343,11 @@ async def main(strategy: Strategy, setup_index: bool = True): # Use the current user identity to connect to Azure services. See infra/main.bicep for role assignments. if tenant_id := os.getenv("AZURE_TENANT_ID"): - logger.info("Connecting to Azure services using the azd credential for tenant %s", tenant_id) - azd_credential = AzureDeveloperCliCredential(tenant_id=tenant_id, process_timeout=60) + logger.info("Connecting to Azure services using the azd credential for tenant %s", tenant_id) + azd_credential = AzureDeveloperCliCredential(tenant_id=tenant_id, process_timeout=60) else: - logger.info("Connecting to Azure services using the azd credential for home tenant") - azd_credential = AzureDeveloperCliCredential(process_timeout=60) + logger.info("Connecting to Azure services using the azd credential for home tenant") + azd_credential = AzureDeveloperCliCredential(process_timeout=60) if args.removeall: document_action = DocumentAction.RemoveAll @@ -328,14 +359,17 @@ async def main(strategy: Strategy, setup_index: bool = True): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - search_info = loop.run_until_complete( - setup_search_info( - search_service=os.environ["AZURE_SEARCH_SERVICE"], - index_name=os.environ["AZURE_SEARCH_INDEX"], - azure_credential=azd_credential, - search_key=clean_key_if_exists(args.searchkey), - ) - ) + # mjh + #search_info = loop.run_until_complete( + # setup_search_info( + # search_service=os.environ["AZURE_SEARCH_SERVICE"], + # index_name=os.environ["AZURE_SEARCH_INDEX"], + # azure_credential=azd_credential, + # search_key=clean_key_if_exists(args.searchkey), + # ) + #) + search_info=None + blob_manager = setup_blob_manager( azure_credential=azd_credential, storage_account=os.environ["AZURE_STORAGE_ACCOUNT"], @@ -345,13 +379,35 @@ async def main(strategy: Strategy, setup_index: bool = True): search_images=use_gptvision, storage_key=clean_key_if_exists(args.storagekey), ) + # mjh. 
Blob manager for where we will save search documents + blob_manager_search_index_files = setup_blob_manager( + azure_credential=azd_credential, + storage_account=os.environ["AZURE_STORAGE_ACCOUNT_SEARCH_INDEX_FILES"], + storage_container=os.environ["AZURE_STORAGE_CONTAINER_SEARCH_INDEX_FILES"], + storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP_SEARCH_INDEX_FILES"], + subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"], + search_images=use_gptvision, + storage_key=clean_key_if_exists(args.storagekey), + ) + # mjh. Blob manager for interim files, such as Doc intelligence objects + blob_manager_interim_files = setup_blob_manager( + azure_credential=azd_credential, + storage_account=os.environ["AZURE_STORAGE_ACCOUNT_INTERIM_FILES"], + storage_container=os.environ["AZURE_STORAGE_CONTAINER_INTERIM_FILES"], + storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP_INTERIM_FILES"], + subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"], + search_images=use_gptvision, + storage_key=clean_key_if_exists(args.storagekey), + ) list_file_strategy = setup_list_file_strategy( azure_credential=azd_credential, - local_files=args.files, + local_files=args.local_files, + blob_files=args.blob_files, datalake_storage_account=os.getenv("AZURE_ADLS_GEN2_STORAGE_ACCOUNT"), datalake_filesystem=os.getenv("AZURE_ADLS_GEN2_FILESYSTEM"), datalake_path=os.getenv("AZURE_ADLS_GEN2_FILESYSTEM_PATH"), datalake_key=clean_key_if_exists(args.datalakekey), + force=args.force ) openai_host = os.environ["OPENAI_HOST"] @@ -406,6 +462,7 @@ async def main(strategy: Strategy, setup_index: bool = True): local_pdf_parser=os.getenv("USE_LOCAL_PDF_PARSER") == "true", local_html_parser=os.getenv("USE_LOCAL_HTML_PARSER") == "true", search_images=use_gptvision, + blob_manager_interim_files=blob_manager_interim_files ) image_embeddings_service = setup_image_embeddings_service( azure_credential=azd_credential, @@ -417,6 +474,8 @@ async def main(strategy: Strategy, setup_index: bool = True): search_info=search_info, list_file_strategy=list_file_strategy, blob_manager=blob_manager, + blob_manager_search_index_files=blob_manager_search_index_files, + blob_manager_interim_files=blob_manager_interim_files, file_processors=file_processors, document_action=document_action, embeddings=openai_embeddings_service, @@ -426,5 +485,8 @@ async def main(strategy: Strategy, setup_index: bool = True): category=args.category, ) + # mjh + setup_interim_dir() + loop.run_until_complete(main(ingestion_strategy, setup_index=not args.remove and not args.removeall)) loop.close() diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py index e9f18e795a..6a43552d4f 100644 --- a/app/backend/prepdocslib/blobmanager.py +++ b/app/backend/prepdocslib/blobmanager.py @@ -3,6 +3,7 @@ import logging import os import re +import json +import pickle from typing import List, Optional, Union import fitz # type: ignore @@ -162,6 +163,55 @@ async def remove_blob(self, path: Optional[str] = None): logger.info("Removing blob %s", blob_path) await container_client.delete_blob(blob_path) + # mjh + async def list_paths(self, path: Optional[str] = None) -> List[str]: + async with BlobServiceClient( + account_url=self.endpoint, credential=self.credential + ) as service_client, service_client.get_container_client(self.container) as container_client: + if not await container_client.exists(): + return [] + + # Set prefix for filtering blobs + prefix = os.path.splitext(os.path.basename(path))[0] if path else None + + # List blobs with or without prefix + blobs = container_client.list_blob_names(name_starts_with=prefix) + + # Consume the generator and collect blob names in a list + blob_list = [] + async for blob_path in blobs: + logger.info("Found: %s", blob_path) + blob_list.append(blob_path) + + # Debug information + logger.info("Total blobs found: %d", len(blob_list)) + logger.info("Endpoint: %s, container: %s", self.endpoint, self.container) + + return blob_list + + async def download_blob(self, blob_name): + async with BlobServiceClient( + account_url=self.endpoint, credential=self.credential + ) as service_client, service_client.get_container_client(self.container) as container_client: + blob_client = container_client.get_blob_client(blob_name) + downloader = await blob_client.download_blob() + content = await downloader.readall() + + # Write the blob's content to a temporary file + temp_file_path = os.path.join('./data_interim', os.path.basename(blob_name)) + with open(temp_file_path, "wb") as file_stream: + file_stream.write(content) + + with open(temp_file_path, "rb") as f: + if '.pkl' in temp_file_path: + obj = pickle.load(f) + else: + obj = json.load(f) + + return obj + @classmethod def sourcepage_from_file_page(cls, filename, page=0) -> str: if os.path.splitext(filename)[1].lower() == ".pdf": diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py index 55b24b6f3a..de6c68371a 100644 --- a/app/backend/prepdocslib/filestrategy.py +++ b/app/backend/prepdocslib/filestrategy.py @@ -1,5 +1,8 @@ import logging from typing import List, Optional +import sys +import os +import json from .blobmanager import BlobManager from .embeddings import ImageEmbeddings, OpenAIEmbeddings @@ -42,6 +45,8 @@ def __init__( self, list_file_strategy: ListFileStrategy, blob_manager: BlobManager, + blob_manager_search_index_files: BlobManager, + blob_manager_interim_files: BlobManager, search_info: SearchInfo, file_processors: dict[str, FileProcessor], document_action: DocumentAction = DocumentAction.Add, @@ -53,6 +58,8 @@ def __init__( ): self.list_file_strategy = list_file_strategy self.blob_manager = blob_manager + self.blob_manager_search_index_files = blob_manager_search_index_files + self.blob_manager_interim_files = blob_manager_interim_files self.file_processors = file_processors self.document_action = document_action self.embeddings = embeddings @@ -63,15 +70,42 @@ def __init__( self.category = category async def setup(self): - search_manager = SearchManager( - self.search_info, - self.search_analyzer_name, - self.use_acls, - False, - self.embeddings, - search_images=self.image_embeddings is not None, - ) - await search_manager.create_index() + # mjh We are only saving files + logger.info("Not sending data to search, just creating files. Skipping index_update code.") + # search_manager = SearchManager( + # self.search_info, + # self.search_analyzer_name, + # self.use_acls, + # False, + # self.embeddings, + # search_images=self.image_embeddings is not None, + # ) + # await search_manager.create_index() + + async def cache_index_files(self, documents: List): + cache_dir = "./data_interim/" + for document in documents: + output_dir = os.path.join(cache_dir, document["sourcefile"]) + "_index_files" + if not os.path.exists(output_dir): + os.makedirs(output_dir) + file_name = document["sourcefile"] + "-" + document["id"] + base_name = output_dir + "/" + file_name + base_name += ".search_index_doc.json" + # Here set fields + if 'gao' in base_name.lower(): + document["dataSource"] = "gao" + else: + document["dataSource"] = "Unknown" + if "embedding" in document: + document["ContentVector"] = document["embedding"] + del document["embedding"] + with open(base_name, "w") as f: + f.write(json.dumps(document, indent=4)) + file = File(content=open(base_name, mode="rb")) + await self.blob_manager_search_index_files.upload_blob(file) + file.close() # close the handle, then remove the local copy to save space + os.remove(base_name) + os.rmdir(output_dir) async def run(self): search_manager = SearchManager( @@ -79,15 +113,22 @@ async def run(self): ) if self.document_action == DocumentAction.Add: files = self.list_file_strategy.list() + async for file in files: try: sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings) if sections: - blob_sas_uris = await self.blob_manager.upload_blob(file) + # mjh only upload the source blob when image embeddings need SAS URIs + #blob_sas_uris = await self.blob_manager.upload_blob(file) blob_image_embeddings: Optional[List[List[float]]] = None - if self.image_embeddings and blob_sas_uris: - blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) - await search_manager.update_content(sections, blob_image_embeddings, url=file.url) + if self.image_embeddings: + blob_sas_uris = await self.blob_manager.upload_blob(file) + blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris) + documents = await search_manager.update_content(sections, blob_image_embeddings, url=file.url) + # Save the index files, these will be indexed by a search instance later + if documents: + await self.cache_index_files(documents) + finally: if file: file.close() diff --git a/app/backend/prepdocslib/integratedvectorizerstrategy.py b/app/backend/prepdocslib/integratedvectorizerstrategy.py index 66e8e4a346..302ef6ef31 100644 --- a/app/backend/prepdocslib/integratedvectorizerstrategy.py +++ b/app/backend/prepdocslib/integratedvectorizerstrategy.py @@ -128,7 +128,7 @@ async def setup(self): search_images=False, ) - await search_manager.create_index() + #await search_manager.create_index() ds_client = self.search_info.create_search_indexer_client() ds_container = SearchIndexerDataContainer(name=self.blob_manager.container) diff --git a/app/backend/prepdocslib/listfilestrategy.py b/app/backend/prepdocslib/listfilestrategy.py index 3c8fcd27b0..d172eb473e 100644 --- a/app/backend/prepdocslib/listfilestrategy.py +++ b/app/backend/prepdocslib/listfilestrategy.py @@ -12,6 +12,7 @@ from azure.storage.filedatalake.aio import ( DataLakeServiceClient, ) +from azure.storage.blob.aio import BlobServiceClient, BlobClient logger = logging.getLogger("scripts") @@ -65,8 +66,9 @@ class LocalListFileStrategy(ListFileStrategy): Concrete strategy for listing files that are located in a local filesystem """ - def __init__(self,
path_pattern: str, force: bool = False): self.path_pattern = path_pattern + self.force = force async def list_paths(self) -> AsyncGenerator[str, None]: async for p in self._list_paths(self.path_pattern): @@ -83,7 +85,7 @@ async def _list_paths(self, path_pattern: str) -> AsyncGenerator[str, None]: async def list(self) -> AsyncGenerator[File, None]: async for path in self.list_paths(): - if not self.check_md5(path): + if self.force or not self.check_md5(path): yield File(content=open(path, mode="rb")) def check_md5(self, path: str) -> bool: @@ -175,3 +177,97 @@ async def list(self) -> AsyncGenerator[File, None]: os.remove(temp_file_path) except Exception as file_delete_exception: logger.error(f"\tGot an error while deleting {temp_file_path} -> {file_delete_exception}") + +class BlobListFileStrategy: + """ + Concrete strategy for listing files that are located in an Azure Blob Storage account + """ + + def __init__( + self, + storage_account: str, + storage_container: str, + credential: Union[AsyncTokenCredential, str], + force: bool = False + ): + self.storage_account = storage_account + self.storage_container = storage_container + self.credential = credential + self.temp_dir = './data' + self.force = force + + async def list(self, path: Optional[str] = None) -> AsyncGenerator[File, None]: + account_url = f"https://{self.storage_account}.blob.core.windows.net" + async with BlobServiceClient( + account_url=account_url, credential=self.credential + ) as service_client, service_client.get_container_client(self.storage_container) as container_client: + if not await container_client.exists(): + return + + prefix = os.path.splitext(os.path.basename(path))[0] if path else None + blobs = container_client.list_blobs(name_starts_with=prefix) + + async for blob in blobs: + # Ignore hash files + if blob.name.endswith(".md5"): + continue + + logger.info("Found: %s", blob.name) + + async with BlobClient( + account_url=account_url, + container_name=self.storage_container, + blob_name=blob.name, + credential=self.credential, + ) as blob_client: + # Check MD5 to determine if the blob has changed + if await self.check_md5(blob_client, blob.name) and self.force is False: + continue + + # Create a temporary file to store the blob's content + temp_file_path = os.path.join(self.temp_dir, os.path.basename(blob.name)) + + # Download blob content to the temporary file + with open(temp_file_path, "wb") as file_stream: + stream = await blob_client.download_blob() + data = await stream.readall() + file_stream.write(data) + + # Yield a File object with the downloaded content + yield File(content=open(temp_file_path, "rb"), url=f"{account_url}/{self.storage_container}/{blob.name}") + + async def check_md5(self, blob_client: BlobClient, blob_name: str) -> bool: + # Retrieve blob properties + properties = await blob_client.get_blob_properties() + blob_md5 = properties.content_settings.content_md5 + + if not blob_md5: + return False # If no MD5 hash is present, assume the blob has changed + + # Convert the MD5 to hex for comparison + blob_md5_hex = base64.b16encode(blob_md5).decode("ascii") + + # Generate a client for the .md5 blob + account_url = f"https://{self.storage_account}.blob.core.windows.net" + hash_blob_name = f"{blob_name}.md5" + async with BlobClient( + account_url=account_url, + container_name=self.storage_container, + blob_name=hash_blob_name, + credential=self.credential, + ) as hash_blob_client: + try: + # Check if the .md5 file exists and retrieve its content + stored_hash = await (await 
hash_blob_client.download_blob()).content_as_text() + stored_hash = stored_hash.strip() + except Exception: + stored_hash = None + + if stored_hash == blob_md5_hex: + logger.info("Skipping %s, no changes detected.", blob_name) + return True + + # Upload the new hash to the .md5 file in the blob storage + await hash_blob_client.upload_blob(blob_md5_hex, overwrite=True) + + return False \ No newline at end of file diff --git a/app/backend/prepdocslib/pdfparser.py b/app/backend/prepdocslib/pdfparser.py index 6604110020..34b1928dcc 100644 --- a/app/backend/prepdocslib/pdfparser.py +++ b/app/backend/prepdocslib/pdfparser.py @@ -7,6 +7,12 @@ from azure.core.credentials import AzureKeyCredential from azure.core.credentials_async import AsyncTokenCredential from pypdf import PdfReader +from .listfilestrategy import File +from prepdocslib.blobmanager import BlobManager +import pickle +import json +import os +import sys from .page import Page from .parser import Parser @@ -31,7 +37,6 @@ async def parse(self, content: IO) -> AsyncGenerator[Page, None]: yield Page(page_num=page_num, offset=offset, text=page_text) offset += len(page_text) - class DocumentAnalysisParser(Parser): """ Concrete parser backed by Azure AI Document Intelligence that can parse many document formats into pages @@ -39,23 +44,74 @@ class DocumentAnalysisParser(Parser): """ def __init__( - self, endpoint: str, credential: Union[AsyncTokenCredential, AzureKeyCredential], model_id="prebuilt-layout" + self, endpoint: str, credential: Union[AsyncTokenCredential, AzureKeyCredential], model_id="prebuilt-layout", + blob_manager_interim_files: BlobManager=None, ): self.model_id = model_id self.endpoint = endpoint self.credential = credential + self.blob_manager_interim_files = blob_manager_interim_files + self.cache_dir = "./data_interim/" + if not os.path.exists(self.cache_dir ): + os.makedirs(self.cache_dir ) + + async def cache_doc_intelligence_results(self, form_recognizer_results, content_name): + """ + Save Document intelligence to blob as JSON and pickle + """ + content_name = content_name.split("/")[-1] + base_name = f"{self.cache_dir}{content_name}" + pkl_file = f"{base_name}.doc_intel_obj.pkl" + json_file = f"{base_name}.doc_intel_dict.json" + with open(json_file, "w") as f: + form_recognizer_results_dict = form_recognizer_results.as_dict() + f.write(json.dumps(form_recognizer_results_dict, indent=4)) + with open(pkl_file, "wb") as file: + pickle.dump(form_recognizer_results, file) + print("Saving interim files") + for filename in [pkl_file, json_file]: + file = File(content=open(filename, mode="rb")) + blob_sas_uris = await self.blob_manager_interim_files.upload_blob(file) + # Remove file to save space + os.remove(filename) + + async def check_interim_files(self, base_name): + """ + Avoid reprocessing files that have already been processed + """ + pkl_file = f"{base_name}.doc_intel_obj.pkl" + pkl_file = pkl_file.split("/")[-1] + files = await self.blob_manager_interim_files.list_paths() + for filename in files: + if pkl_file in filename: + content = await self.blob_manager_interim_files.download_blob(filename) + os.remove(f"{self.cache_dir}/{pkl_file}") + return content async def parse(self, content: IO) -> AsyncGenerator[Page, None]: - logger.info("Extracting text from '%s' using Azure Document Intelligence", content.name) - + async with DocumentIntelligenceClient( endpoint=self.endpoint, credential=self.credential ) as document_intelligence_client: - poller = await document_intelligence_client.begin_analyze_document( - 
model_id=self.model_id, analyze_request=content, content_type="application/octet-stream" - ) - form_recognizer_results = await poller.result() - + + # mjh Load results from blob file if it exists + form_recognizer_results = await self.check_interim_files(content.name) + + if form_recognizer_results is None: + logger.info("Extracting text from '%s' using Azure Document Intelligence", content.name) + poller = await document_intelligence_client.begin_analyze_document( + model_id=self.model_id, + analyze_request=content, + content_type="application/octet-stream", + ) + form_recognizer_results = await poller.result() + + # mjh + await self.cache_doc_intelligence_results(form_recognizer_results, content.name) + else: + logger.info("Extracting text from '%s' using previously parsed results", content.name) + + # mjh Converted to use JSON rather than object, so we cache results offset = 0 for page_num, page in enumerate(form_recognizer_results.pages): tables_on_page = [ diff --git a/app/backend/prepdocslib/searchmanager.py b/app/backend/prepdocslib/searchmanager.py index f75af03514..849cd8d5e7 100644 --- a/app/backend/prepdocslib/searchmanager.py +++ b/app/backend/prepdocslib/searchmanager.py @@ -2,6 +2,7 @@ import logging import os from typing import List, Optional +import json from azure.search.documents.indexes.models import ( AzureOpenAIVectorizer, @@ -36,10 +37,11 @@ class Section: A section of a page that is stored in a search service. These sections are used as context by Azure OpenAI service """ - def __init__(self, split_page: SplitPage, content: File, category: Optional[str] = None): + def __init__(self, split_page: SplitPage, content: File, category: Optional[str] = None, SourceDocumentDataSource: Optional[str] = None): self.split_page = split_page self.content = content self.category = category + self.SourceDocumentDataSource = SourceDocumentDataSource class SearchManager: @@ -104,6 +106,7 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] vector_search_profile_name="embedding_config", ), SimpleField(name="category", type="Edm.String", filterable=True, facetable=True), + SimpleField(name="SourceDocumentDataSource", type="Edm.String", filterable=True, facetable=True), SimpleField( name="sourcepage", type="Edm.String", @@ -213,7 +216,7 @@ async def create_index(self, vectorizers: Optional[List[VectorSearchVectorizer]] ), ) - await search_index_client.create_index(index) + #await search_index_client.create_index(index) else: logger.info("Search index %s already exists", self.search_info.index_name) existing_index = await search_index_client.get_index(self.search_info.index_name) @@ -258,43 +261,47 @@ async def update_content( MAX_BATCH_SIZE = 1000 section_batches = [sections[i : i + MAX_BATCH_SIZE] for i in range(0, len(sections), MAX_BATCH_SIZE)] - async with self.search_info.create_search_client() as search_client: - for batch_index, batch in enumerate(section_batches): - documents = [ - { - "id": f"{section.content.filename_to_id()}-page-{section_index + batch_index * MAX_BATCH_SIZE}", - "content": section.split_page.text, - "category": section.category, - "sourcepage": ( - BlobManager.blob_image_name_from_file_page( - filename=section.content.filename(), - page=section.split_page.page_num, - ) - if image_embeddings - else BlobManager.sourcepage_from_file_page( - filename=section.content.filename(), - page=section.split_page.page_num, - ) - ), - "sourcefile": section.content.filename(), - **section.content.acls, - } - for section_index, section in 
enumerate(batch) - ] - if url: - for document in documents: - document["storageUrl"] = url - if self.embeddings: - embeddings = await self.embeddings.create_embeddings( - texts=[section.split_page.text for section in batch] - ) - for i, document in enumerate(documents): - document["embedding"] = embeddings[i] - if image_embeddings: - for i, (document, section) in enumerate(zip(documents, batch)): - document["imageEmbedding"] = image_embeddings[section.split_page.page_num] + #async with self.search_info.create_search_client() as search_client: + for batch_index, batch in enumerate(section_batches): + documents = [ + { + "id": f"{section.content.filename_to_id()}-page-{section_index + batch_index * MAX_BATCH_SIZE}", + "content": section.split_page.text, + "category": section.category, + "SourceDocumentDataSource": section.SourceDocumentDataSource, + "sourcepage": ( + BlobManager.blob_image_name_from_file_page( + filename=section.content.filename(), + page=section.split_page.page_num, + ) + if image_embeddings + else BlobManager.sourcepage_from_file_page( + filename=section.content.filename(), + page=section.split_page.page_num, + ) + ), + "sourcefile": section.content.filename(), + **section.content.acls, + } + for section_index, section in enumerate(batch) + ] + if url: + for document in documents: + document["storageUrl"] = url + if self.embeddings: + embeddings = await self.embeddings.create_embeddings( + texts=[section.split_page.text for section in batch] + ) + for i, document in enumerate(documents): + document["embedding"] = embeddings[i] + if image_embeddings: + for i, (document, section) in enumerate(zip(documents, batch)): + document["imageEmbedding"] = image_embeddings[section.split_page.page_num] + + # Not sending directly to search, just saving to blob + #await search_client.upload_documents(documents) - await search_client.upload_documents(documents) + return documents async def remove_content(self, path: Optional[str] = None, only_oid: Optional[str] = None): logger.info( diff --git a/data/Benefit_Options.pdf b/data/Benefit_Options.pdf deleted file mode 100644 index 6a4c07dc94..0000000000 Binary files a/data/Benefit_Options.pdf and /dev/null differ diff --git a/data/Contoso_Electronics_Company_Overview.md b/data/Contoso_Electronics_Company_Overview.md deleted file mode 100644 index 033d7dd84a..0000000000 --- a/data/Contoso_Electronics_Company_Overview.md +++ /dev/null @@ -1,48 +0,0 @@ -# Contoso Electronics - -*Disclaimer: This content is generated by AI and may not accurately represent factual information about any real entity. Use this information with caution and verify details from reliable sources.* - -## History - -Contoso Electronics, a pioneering force in the tech industry, was founded in 1985 by visionary entrepreneurs with a passion for innovation. Over the years, the company has played a pivotal role in shaping the landscape of consumer electronics. - -| Year | Milestone | -|------|-----------| -| 1985 | Company founded with a focus on cutting-edge technology | -| 1990 | Launched the first-ever handheld personal computer | -| 2000 | Introduced groundbreaking advancements in AI and robotics | -| 2015 | Expansion into sustainable and eco-friendly product lines | - -## Company Overview - -At Contoso Electronics, we take pride in fostering a dynamic and inclusive workplace. Our dedicated team of experts collaborates to create innovative solutions that empower and connect people globally. 
- -### Core Values - -- **Innovation:** Constantly pushing the boundaries of technology. -- **Diversity:** Embracing different perspectives for creative excellence. -- **Sustainability:** Committed to eco-friendly practices in our products. - -## Vacation Perks - -We believe in work-life balance and understand the importance of well-deserved breaks. Our vacation perks are designed to help our employees recharge and return with renewed enthusiasm. - -| Vacation Tier | Duration | Additional Benefits | -|---------------|----------|---------------------| -| Standard | 2 weeks | Health and wellness stipend | -| Senior | 4 weeks | Travel vouchers for a dream destination | -| Executive | 6 weeks | Luxury resort getaway with family | - -## Employee Recognition - -Recognizing the hard work and dedication of our employees is at the core of our culture. Here are some ways we celebrate achievements: - -- Monthly "Innovator of the Month" awards -- Annual gala with awards for outstanding contributions -- Team-building retreats for high-performing departments - -## Join Us! - -Contoso Electronics is always on the lookout for talented individuals who share our passion for innovation. If you're ready to be part of a dynamic team shaping the future of technology, check out our [careers page](http://www.contoso.com) for exciting opportunities. - -[Learn more about Contoso Electronics!](http://www.contoso.com) diff --git a/data/GPT4V_Examples/Financial Market Analysis Report 2023.pdf b/data/GPT4V_Examples/Financial Market Analysis Report 2023.pdf deleted file mode 100644 index eef17aad75..0000000000 Binary files a/data/GPT4V_Examples/Financial Market Analysis Report 2023.pdf and /dev/null differ diff --git a/data/Json_Examples/2189.json b/data/Json_Examples/2189.json deleted file mode 100644 index d7066c9fbd..0000000000 --- a/data/Json_Examples/2189.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "AreaPath": "SmartHotel360", - "AssignedTo": null, - "Categories": null, - "ChangedDate": "2023-12-13T23:08:38.69Z", - "ClosedDate": null, - "CreatedDate": "2023-12-13T23:08:38.69Z", - "Description": "As a customer, I would like to reserve a conference room such that:
Enter the guest's name to whom you would\nlike to send a confirmation, display the company, contact, source\nand agent associated\nwith the reservation.
", - "Id": 2190, - "State": "New", - "StateChangeDate": "2023-12-13T23:08:38.997Z", - "Tags": "Notification", - "Title": "As a reservation agent, I would like to send confirmations to guest" -} diff --git a/data/Json_Examples/2191.json b/data/Json_Examples/2191.json deleted file mode 100644 index 455e4c9a24..0000000000 --- a/data/Json_Examples/2191.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "AreaPath": "SmartHotel360", - "AssignedTo": null, - "Categories": null, - "ChangedDate": "2023-12-13T23:08:39.17Z", - "ClosedDate": null, - "CreatedDate": "2023-12-13T23:08:39.17Z", - "Description": "If you have not picked up\nyour vehicle you can remove or cancel your reservation by clicking here.