2 changes: 2 additions & 0 deletions .gitignore
@@ -149,3 +149,5 @@ static/
data/**/*.md5

.DS_Store

data_interim
29 changes: 28 additions & 1 deletion README.md
@@ -23,9 +23,36 @@ This solution creates a ChatGPT-like frontend experience over your own documents

This solution's backend is written in Python. There are also [**JavaScript**](https://aka.ms/azai/js/code), [**.NET**](https://aka.ms/azai/net/code), and [**Java**](https://aka.ms/azai/java/code) samples based on this one. Learn more about [developing AI apps using Azure AI Services](https://aka.ms/azai).

[![Open in GitHub Codespaces](https://img.shields.io/static/v1?style=for-the-badge&label=GitHub+Codespaces&message=Open&color=brightgreen&logo=github)](https://github.com/codespaces/new?hide_repo_select=true&ref=main&repo=599293758&machine=standardLinux32gb&devcontainer_path=.devcontainer%2Fdevcontainer.json&location=WestUs2)
# Notes for PIA

Be sure to pick the right repo (PIA) after clicking the badge below.

[![Open in GitHub Codespaces](https://img.shields.io/static/v1?style=for-the-badge&label=GitHub+Codespaces&message=Open&color=brightgreen&logo=github)](https://github.com/codespaces/new?hide_repo_select=false&ref=main&machine=standardLinux32gb&devcontainer_path=.devcontainer%2Fdevcontainer.json&location=WestUs2)
[![Open in Dev Containers](https://img.shields.io/static/v1?style=for-the-badge&label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/azure-samples/azure-search-openai-demo)

## Working with the Search AI lab repo

1. Deploy that repo following the instructions [here](https://github.com/Program-Integrity-Alliance/azure-ai-search-lab)
2. TODO (this will soon change): Add fields to the 'blob_chunks' index there for sourcepage, sourcefile, storageUrl, and category, all of type string (see the hedged sketch after this list).

3. Then, before running the azd deploy (see instructions below), set the following environment values:

```bash
azd env set AZURE_OPENAI_CHATGPT_DEPLOYMENT_VERSION 2024-07-18
azd env set AZURE_OPENAI_CHATGPT_MODEL gpt-4o-mini

# To point at AI labs search service
azd env set AZURE_SEARCH_SERVICE srch-zan63y6zsbmxq-search
azd env set AZURE_SEARCH_SERVICE_RESOURCE_GROUP rg-aisearchlab-test-eus2-001
azd env set AZURE_SEARCH_INDEX blob-chunks
```

4. Run `azd up`. When asked for an Environment name and you want to deploy into an existing resource group, drop the 'rg-' prefix, as the build will add it.

TODO: The two builds currently use different field names; 'category' still needs to be added to the Azure AI Search lab index.
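
A minimal sketch of step 2 with the `azure-search-documents` Python SDK is below; the endpoint, admin key, index name, and `filterable` setting are assumptions, not values confirmed by the lab repo:

```python
# Hypothetical sketch: add the extra string fields to the lab's chunk index.
# The index name, endpoint, admin key, and field settings below are assumptions.
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchFieldDataType, SimpleField

client = SearchIndexClient(
    endpoint="https://<your-search-service>.search.windows.net",
    credential=AzureKeyCredential("<admin-key>"),
)

index = client.get_index("blob_chunks")
for field_name in ["sourcepage", "sourcefile", "storageUrl", "category"]:
    # Skip fields that already exist so the update stays idempotent
    if not any(f.name == field_name for f in index.fields):
        index.fields.append(SimpleField(name=field_name, type=SearchFieldDataType.String, filterable=True))

client.create_or_update_index(index)
```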

## Important Security Notice

This template, the application code and configuration it contains, has been built to showcase Microsoft Azure specific services and tools. We strongly advise our customers not to make this code part of their production environments without implementing or enabling additional security features. See our [productionizing guide](docs/productionizing.md) for tips, and consult the [Azure OpenAI Landing Zone reference architecture](https://techcommunity.microsoft.com/blog/azurearchitectureblog/azure-openai-landing-zone-reference-architecture/3882102) for more best practices.
96 changes: 79 additions & 17 deletions app/backend/prepdocs.py
@@ -2,6 +2,7 @@
import asyncio
import logging
import os
import sys
from typing import Optional, Union

from azure.core.credentials import AzureKeyCredential
@@ -27,13 +28,18 @@
ADLSGen2ListFileStrategy,
ListFileStrategy,
LocalListFileStrategy,
BlobListFileStrategy,
)
from prepdocslib.parser import Parser
from prepdocslib.pdfparser import DocumentAnalysisParser, LocalPdfParser
from prepdocslib.strategy import DocumentAction, SearchInfo, Strategy
from prepdocslib.textparser import TextParser
from prepdocslib.textsplitter import SentenceTextSplitter, SimpleTextSplitter

# mjh
from dotenv import load_dotenv
load_dotenv()

logger = logging.getLogger("scripts")


@@ -82,10 +88,12 @@ def setup_blob_manager(
def setup_list_file_strategy(
azure_credential: AsyncTokenCredential,
local_files: Union[str, None],
blob_files: Union[str, None],
datalake_storage_account: Union[str, None],
datalake_filesystem: Union[str, None],
datalake_path: Union[str, None],
datalake_key: Union[str, None],
force: bool,
):
list_file_strategy: ListFileStrategy
if datalake_storage_account:
@@ -99,11 +107,20 @@ def setup_list_file_strategy(
data_lake_path=datalake_path,
credential=adls_gen2_creds,
)
# mjh
elif blob_files:
logger.info("Using Blob Storage Account to find files: %s", os.environ["AZURE_STORAGE_ACCOUNT"])
list_file_strategy = BlobListFileStrategy(
storage_account=os.environ["AZURE_STORAGE_ACCOUNT"],
storage_container=os.environ["AZURE_STORAGE_CONTAINER"],
credential=azure_credential,
force=force
)
elif local_files:
logger.info("Using local files: %s", local_files)
list_file_strategy = LocalListFileStrategy(path_pattern=local_files)
list_file_strategy = LocalListFileStrategy(path_pattern=local_files, force=force)
else:
raise ValueError("Either local_files or datalake_storage_account must be provided.")
raise ValueError("Either local_files, blob_files or datalake_storage_account must be provided.")
return list_file_strategy


@@ -158,6 +175,7 @@ def setup_file_processors(
local_pdf_parser: bool = False,
local_html_parser: bool = False,
search_images: bool = False,
blob_manager_interim_files: Optional[BlobManager] = None,
):
sentence_text_splitter = SentenceTextSplitter(has_image_embeddings=search_images)

@@ -170,6 +188,7 @@
doc_int_parser = DocumentAnalysisParser(
endpoint=f"https://{document_intelligence_service}.cognitiveservices.azure.com/",
credential=documentintelligence_creds,
blob_manager_interim_files=blob_manager_interim_files
)

pdf_parser: Optional[Parser] = None
@@ -217,6 +236,11 @@ def setup_file_processors(
)
return file_processors

# mjh
def setup_interim_dir():
cache_dir = "./data_interim/"
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

def setup_image_embeddings_service(
azure_credential: AsyncTokenCredential, vision_endpoint: Union[str, None], search_images: bool
@@ -243,7 +267,11 @@ async def main(strategy: Strategy, setup_index: bool = True):
parser = argparse.ArgumentParser(
description="Prepare documents by extracting content from PDFs, splitting content into sections, uploading to blob storage, and indexing in a search index."
)
parser.add_argument("files", nargs="?", help="Files to be processed")
parser.add_argument("blob_files", nargs="?", help="Use blob storage files")

parser.add_argument(
"--local_files", help="Look for local files"
)

parser.add_argument(
"--category", help="Value for the category field in the search index for all sections indexed in this run"
@@ -290,6 +318,9 @@ async def main(strategy: Strategy, setup_index: bool = True):
help="Search service system assigned Identity (Managed identity) (used for integrated vectorization)",
)

# mjh
parser.add_argument("--force", "-f", action="store_true", help="Force reprocessing of files")

parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
args = parser.parse_args()

@@ -299,7 +330,7 @@ async def main(strategy: Strategy, setup_index: bool = True):
# to avoid seeing the noisy INFO level logs from the Azure SDKs
logger.setLevel(logging.INFO)

load_azd_env()
#load_azd_env()

if os.getenv("AZURE_PUBLIC_NETWORK_ACCESS") == "Disabled":
logger.error("AZURE_PUBLIC_NETWORK_ACCESS is set to Disabled. Exiting.")
@@ -312,11 +343,11 @@

# Use the current user identity to connect to Azure services. See infra/main.bicep for role assignments.
if tenant_id := os.getenv("AZURE_TENANT_ID"):
logger.info("Connecting to Azure services using the azd credential for tenant %s", tenant_id)
azd_credential = AzureDeveloperCliCredential(tenant_id=tenant_id, process_timeout=60)
logger.info("Connecting to Azure services using the azd credential for tenant %s", tenant_id)
azd_credential = AzureDeveloperCliCredential(tenant_id=tenant_id, process_timeout=60)
else:
logger.info("Connecting to Azure services using the azd credential for home tenant")
azd_credential = AzureDeveloperCliCredential(process_timeout=60)
logger.info("Connecting to Azure services using the azd credential for home tenant")
azd_credential = AzureDeveloperCliCredential(process_timeout=60)

if args.removeall:
document_action = DocumentAction.RemoveAll
@@ -328,14 +359,17 @@
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

search_info = loop.run_until_complete(
setup_search_info(
search_service=os.environ["AZURE_SEARCH_SERVICE"],
index_name=os.environ["AZURE_SEARCH_INDEX"],
azure_credential=azd_credential,
search_key=clean_key_if_exists(args.searchkey),
)
)
# mjh
#search_info = loop.run_until_complete(
# setup_search_info(
# search_service=os.environ["AZURE_SEARCH_SERVICE"],
# index_name=os.environ["AZURE_SEARCH_INDEX"],
# azure_credential=azd_credential,
# search_key=clean_key_if_exists(args.searchkey),
# )
#)
search_info=None

blob_manager = setup_blob_manager(
azure_credential=azd_credential,
storage_account=os.environ["AZURE_STORAGE_ACCOUNT"],
@@ -345,13 +379,35 @@
search_images=use_gptvision,
storage_key=clean_key_if_exists(args.storagekey),
)
# mjh. Blob manager for where we will save search documents
blob_manager_search_index_files = setup_blob_manager(
azure_credential=azd_credential,
storage_account=os.environ["AZURE_STORAGE_ACCOUNT_SEARCH_INDEX_FILES"],
storage_container=os.environ["AZURE_STORAGE_CONTAINER_SEARCH_INDEX_FILES"],
storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP_SEARCH_INDEX_FILES"],
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
search_images=use_gptvision,
storage_key=clean_key_if_exists(args.storagekey),
)
# mjh. Blob manager for interim files, such as Doc intelligence objects
blob_manager_interim_files = setup_blob_manager(
azure_credential=azd_credential,
storage_account=os.environ["AZURE_STORAGE_ACCOUNT_INTERIM_FILES"],
storage_container=os.environ["AZURE_STORAGE_CONTAINER_INTERIM_FILES"],
storage_resource_group=os.environ["AZURE_STORAGE_RESOURCE_GROUP_INTERIM_FILES"],
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
search_images=use_gptvision,
storage_key=clean_key_if_exists(args.storagekey),
)
list_file_strategy = setup_list_file_strategy(
azure_credential=azd_credential,
local_files=args.files,
local_files=args.local_files,
blob_files=args.blob_files,
datalake_storage_account=os.getenv("AZURE_ADLS_GEN2_STORAGE_ACCOUNT"),
datalake_filesystem=os.getenv("AZURE_ADLS_GEN2_FILESYSTEM"),
datalake_path=os.getenv("AZURE_ADLS_GEN2_FILESYSTEM_PATH"),
datalake_key=clean_key_if_exists(args.datalakekey),
force=args.force
)

openai_host = os.environ["OPENAI_HOST"]
@@ -406,6 +462,7 @@ async def main(strategy: Strategy, setup_index: bool = True):
local_pdf_parser=os.getenv("USE_LOCAL_PDF_PARSER") == "true",
local_html_parser=os.getenv("USE_LOCAL_HTML_PARSER") == "true",
search_images=use_gptvision,
blob_manager_interim_files=blob_manager_interim_files
)
image_embeddings_service = setup_image_embeddings_service(
azure_credential=azd_credential,
@@ -417,6 +474,8 @@
search_info=search_info,
list_file_strategy=list_file_strategy,
blob_manager=blob_manager,
blob_manager_search_index_files=blob_manager_search_index_files,
blob_manager_interim_files=blob_manager_interim_files,
file_processors=file_processors,
document_action=document_action,
embeddings=openai_embeddings_service,
@@ -426,5 +485,8 @@ async def main(strategy: Strategy, setup_index: bool = True):
category=args.category,
)

# mjh
setup_interim_dir()

loop.run_until_complete(main(ingestion_strategy, setup_index=not args.remove and not args.removeall))
loop.close()
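
The prepdocs.py changes above import and construct `BlobListFileStrategy` from `prepdocslib.listfilestrategy`, but that class's implementation is not part of this diff. A minimal sketch of what it might look like, assuming it mirrors `LocalListFileStrategy`/`ADLSGen2ListFileStrategy` and matches the constructor call in `setup_list_file_strategy` (storage_account, storage_container, credential, force), is:

```python
# Hypothetical sketch only -- the real BlobListFileStrategy added by this PR is not shown in the diff.
# Assumes the File / ListFileStrategy interfaces from prepdocslib/listfilestrategy.py.
import os
import tempfile
from typing import AsyncGenerator, Union

from azure.core.credentials_async import AsyncTokenCredential
from azure.storage.blob.aio import BlobServiceClient

from .listfilestrategy import File, ListFileStrategy


class BlobListFileStrategy(ListFileStrategy):
    """Lists documents to ingest directly from an Azure Blob Storage container."""

    def __init__(
        self,
        storage_account: str,
        storage_container: str,
        credential: Union[AsyncTokenCredential, str],
        force: bool = False,  # assumption: when False, already-processed files could be skipped
    ):
        self.account_url = f"https://{storage_account}.blob.core.windows.net"
        self.container = storage_container
        self.credential = credential
        self.force = force

    async def list_paths(self) -> AsyncGenerator[str, None]:
        async with BlobServiceClient(account_url=self.account_url, credential=self.credential) as service:
            container = service.get_container_client(self.container)
            async for name in container.list_blob_names():
                yield name

    async def list(self) -> AsyncGenerator[File, None]:
        async with BlobServiceClient(account_url=self.account_url, credential=self.credential) as service:
            container = service.get_container_client(self.container)
            async for name in container.list_blob_names():
                # Download each blob to a local temp file, mirroring ADLSGen2ListFileStrategy
                local_path = os.path.join(tempfile.gettempdir(), os.path.basename(name))
                with open(local_path, "wb") as f:
                    downloader = await container.download_blob(name)
                    f.write(await downloader.readall())
                yield File(content=open(local_path, "rb"))
```

The real class presumably also combines `force` with the existing MD5 cache check to decide whether a file should be reprocessed; that logic is omitted here.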
50 changes: 50 additions & 0 deletions app/backend/prepdocslib/blobmanager.py
@@ -3,6 +3,7 @@
import logging
import os
import re
import json
import pickle
from typing import List, Optional, Union

import fitz # type: ignore
@@ -162,6 +163,55 @@ async def remove_blob(self, path: Optional[str] = None):
logger.info("Removing blob %s", blob_path)
await container_client.delete_blob(blob_path)

# mjh
async def list_paths(self, path: Optional[str] = None) -> List[str]:
async with BlobServiceClient(
account_url=self.endpoint, credential=self.credential
) as service_client, service_client.get_container_client(self.container) as container_client:
if not await container_client.exists():
return []

# Set prefix for filtering blobs
prefix = os.path.splitext(os.path.basename(path))[0] if path else None

# List blobs with or without prefix
blobs = container_client.list_blob_names(name_starts_with=prefix)

# Consume the generator and collect blob names in a list
blob_list = []
async for blob_path in blobs:
logger.info("Found: %s", blob_path)
blob_list.append(blob_path)

# Debug information
logger.info("Total blobs found: %d", len(blob_list))
logger.info("Blob endpoint: %s, container: %s", self.endpoint, self.container)

return blob_list

async def download_blob(self, blob_name):
async with BlobServiceClient(
account_url=self.endpoint, credential=self.credential
) as service_client, service_client.get_container_client(self.container) as container_client:
blob_client = container_client.get_blob_client(blob_name)
# Download the blob to a temporary file so it can be deserialized below
temp_file_path = os.path.join('./data_interim', os.path.basename(blob_name))
with open(temp_file_path, "wb") as file_stream:
stream = await blob_client.download_blob()
data = await stream.readall()
file_stream.write(data)

with open(temp_file_path, "rb") as f:
if temp_file_path.endswith(".pkl"):
obj = pickle.load(f)
else:
obj = json.load(f)

return obj

@classmethod
def sourcepage_from_file_page(cls, filename, page=0) -> str:
if os.path.splitext(filename)[1].lower() == ".pdf":
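
For reference, a hedged usage sketch of the two new `BlobManager` helpers (`list_paths` and `download_blob`), wired up through `setup_blob_manager` from `prepdocs.py`; the storage account, container, resource group, and subscription values are placeholders:

```python
# Hypothetical usage of the new list_paths/download_blob helpers; all values are placeholders.
import asyncio
import os

from azure.identity.aio import DefaultAzureCredential

from prepdocs import setup_blob_manager


async def show_cached_objects() -> None:
    credential = DefaultAzureCredential()
    manager = setup_blob_manager(
        azure_credential=credential,
        storage_account="<interim-storage-account>",
        storage_container="<interim-container>",
        storage_resource_group="<resource-group>",
        subscription_id="<subscription-id>",
        search_images=False,
        storage_key=None,
    )
    # download_blob writes into ./data_interim, so make sure it exists first
    os.makedirs("./data_interim", exist_ok=True)
    # list_paths filters blobs by the document's base name ("mydoc" here)
    for name in await manager.list_paths("mydoc.pdf"):
        obj = await manager.download_blob(name)  # deserializes the cached pickle/JSON object
        print(name, type(obj))
    await credential.close()


asyncio.run(show_cached_objects())
```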