diff --git a/app/backend/prepdocs.py b/app/backend/prepdocs.py
index f03baac0dc..9e2cecf0e4 100644
--- a/app/backend/prepdocs.py
+++ b/app/backend/prepdocs.py
@@ -311,13 +311,19 @@ async def main(strategy: Strategy, setup_index: bool = True):
         required=False,
         help="Search service system assigned Identity (Managed identity) (used for integrated vectorization)",
     )
+    parser.add_argument(
+        "--concurrency",
+        type=int,
+        default=FileStrategy.DEFAULT_CONCURRENCY,
+        help="Max. number of concurrent tasks to run for processing files (file strategy only) (default: 4)",
+    )
     parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
     args = parser.parse_args()
 
     if args.verbose:
         logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)])
-        # We only set the level to INFO for our logger,
+        # We only set the level to DEBUG for our logger,
         # to avoid seeing the noisy INFO level logs from the Azure SDKs
         logger.setLevel(logging.DEBUG)
 
@@ -467,6 +473,7 @@ async def main(strategy: Strategy, setup_index: bool = True):
             category=args.category,
             use_content_understanding=use_content_understanding,
             content_understanding_endpoint=os.getenv("AZURE_CONTENTUNDERSTANDING_ENDPOINT"),
+            concurrency=args.concurrency,
         )
 
     loop.run_until_complete(main(ingestion_strategy, setup_index=not args.remove and not args.removeall))
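The comment change in the prepdocs.py hunk above describes a real subtlety of the `--verbose` path: `logging.basicConfig` installs the `RichHandler` on the root logger, which keeps its default WARNING level, and only the application's own logger is raised to DEBUG. A minimal standalone sketch of that pattern, with `"myapp.ingestion"` as a placeholder logger name rather than the one this repository actually uses:

```python
import logging

from rich.logging import RichHandler

# Install RichHandler on the root logger. The root logger keeps its default
# WARNING level, and the handler itself applies no level filter.
logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)])

# Raise only our own logger to DEBUG ("myapp.ingestion" is a placeholder name).
logger = logging.getLogger("myapp.ingestion")
logger.setLevel(logging.DEBUG)

logger.debug("Shown: this logger is at DEBUG and its records propagate to the root handler")
logging.getLogger("azure.core").info("Hidden: SDK loggers inherit WARNING from the root logger")
```

Because propagation ignores ancestor logger levels, DEBUG records from the application logger still reach the root handler, while the Azure SDK loggers inherit WARNING and their INFO chatter stays out of the output.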
diff --git a/app/backend/prepdocslib/blobmanager.py b/app/backend/prepdocslib/blobmanager.py
index d5c21e0d41..eb7097b0dc 100644
--- a/app/backend/prepdocslib/blobmanager.py
+++ b/app/backend/prepdocslib/blobmanager.py
@@ -56,7 +56,7 @@ async def upload_blob(self, file: File) -> Optional[list[str]]:
         if file.url is None:
             with open(file.content.name, "rb") as reopened_file:
                 blob_name = BlobManager.blob_name_from_file_name(file.content.name)
-                logger.info("Uploading blob for whole file -> %s", blob_name)
+                logger.info("'%s': Uploading blob for file to '%s'", file.content.name, blob_name)
                 blob_client = await container_client.upload_blob(blob_name, reopened_file, overwrite=True)
                 file.url = blob_client.url
 
@@ -64,7 +64,7 @@ async def upload_blob(self, file: File) -> Optional[list[str]]:
         if os.path.splitext(file.content.name)[1].lower() == ".pdf":
             return await self.upload_pdf_blob_images(service_client, container_client, file)
         else:
-            logger.info("File %s is not a PDF, skipping image upload", file.content.name)
+            logger.info("'%s': File is not a PDF, skipping image upload", file.content.name)
 
         return None
diff --git a/app/backend/prepdocslib/embeddings.py b/app/backend/prepdocslib/embeddings.py
index df56f39c08..da0ae6d024 100644
--- a/app/backend/prepdocslib/embeddings.py
+++ b/app/backend/prepdocslib/embeddings.py
@@ -114,12 +114,11 @@ async def create_embedding_batch(self, texts: list[str], dimensions_args: ExtraA
                         model=self.open_ai_model_name, input=batch.texts, **dimensions_args
                     )
                     embeddings.extend([data.embedding for data in emb_response.data])
-                    logger.info(
+                    logger.debug(
                         "Computed embeddings in batch. Batch size: %d, Token count: %d",
                         len(batch.texts),
                         batch.token_length,
                     )
-
         return embeddings
 
     async def create_embedding_single(self, text: str, dimensions_args: ExtraArgs) -> list[float]:
@@ -134,8 +133,7 @@ async def create_embedding_single(self, text: str, dimensions_args: ExtraArgs) -
                 emb_response = await client.embeddings.create(
                     model=self.open_ai_model_name, input=text, **dimensions_args
                 )
-                logger.info("Computed embedding for text section. Character count: %d", len(text))
-
+                logger.debug("Computed embedding for text section. Character count: %d", len(text))
         return emb_response.data[0].embedding
 
     async def create_embeddings(self, texts: list[str]) -> list[list[float]]:
diff --git a/app/backend/prepdocslib/filestrategy.py b/app/backend/prepdocslib/filestrategy.py
index 37f399cf4b..cd4b571276 100644
--- a/app/backend/prepdocslib/filestrategy.py
+++ b/app/backend/prepdocslib/filestrategy.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 from typing import Optional
 
@@ -23,11 +24,11 @@ async def parse_file(
     key = file.file_extension().lower()
     processor = file_processors.get(key)
    if processor is None:
-        logger.info("Skipping '%s', no parser found.", file.filename())
+        logger.info("'%s': Skipping, no parser found.", file.content.name)
        return []
-    logger.info("Ingesting '%s'", file.filename())
+    logger.info("'%s': Starting ingestion process", file.content.name)
     pages = [page async for page in processor.parser.parse(content=file.content)]
-    logger.info("Splitting '%s' into sections", file.filename())
+    logger.info("'%s': Splitting into sections", file.content.name)
     if image_embeddings:
         logger.warning("Each page will be split into smaller chunks of text, but images will be of the entire page.")
     sections = [
@@ -41,6 +42,8 @@ class FileStrategy(Strategy):
     Strategy for ingesting documents into a search service from files stored either locally or in a data lake storage account
     """
 
+    DEFAULT_CONCURRENCY = 4
+
     def __init__(
         self,
         list_file_strategy: ListFileStrategy,
@@ -56,6 +59,7 @@ def __init__(
         category: Optional[str] = None,
         use_content_understanding: bool = False,
         content_understanding_endpoint: Optional[str] = None,
+        concurrency: int = DEFAULT_CONCURRENCY,
     ):
         self.list_file_strategy = list_file_strategy
         self.blob_manager = blob_manager
@@ -70,6 +74,7 @@ def __init__(
         self.category = category
         self.use_content_understanding = use_content_understanding
         self.content_understanding_endpoint = content_understanding_endpoint
+        self.concurrency = concurrency
 
     def setup_search_manager(self):
         self.search_manager = SearchManager(
@@ -98,9 +103,9 @@ async def setup(self):
 
     async def run(self):
         self.setup_search_manager()
-        if self.document_action == DocumentAction.Add:
-            files = self.list_file_strategy.list()
-            async for file in files:
+
+        async def process_file_worker(semaphore: asyncio.Semaphore, file: File):
+            async with semaphore:
                 try:
                     sections = await parse_file(file, self.file_processors, self.category, self.image_embeddings)
                     if sections:
@@ -108,10 +113,19 @@ async def run(self):
                         blob_image_embeddings: Optional[list[list[float]]] = None
                         if self.image_embeddings and blob_sas_uris:
                             blob_image_embeddings = await self.image_embeddings.create_embeddings(blob_sas_uris)
+                        logger.info("'%s': Computing embeddings and updating search index", file.content.name)
                         await self.search_manager.update_content(sections, blob_image_embeddings, url=file.url)
                 finally:
                     if file:
+                        logger.info("'%s': Finished processing file", file.content.name)
                         file.close()
+
+        if self.document_action == DocumentAction.Add:
+            files = self.list_file_strategy.list()
+            logger.info("Running with concurrency: %d", self.concurrency)
+            semaphore = asyncio.Semaphore(self.concurrency)
+            tasks = [process_file_worker(semaphore, file) async for file in files]
+            await asyncio.gather(*tasks)
         elif self.document_action == DocumentAction.Remove:
             paths = self.list_file_strategy.list_paths()
             async for path in paths:
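The `run()` rewrite above is the heart of this change: instead of awaiting each file in sequence, it creates one task per file and gates the work behind an `asyncio.Semaphore` sized by the new `concurrency` setting. A self-contained sketch of the same bounded-concurrency pattern; the file names and the `process` coroutine are placeholders, not code from this repository:

```python
import asyncio

DEFAULT_CONCURRENCY = 4  # mirrors FileStrategy.DEFAULT_CONCURRENCY


async def process(filename: str) -> None:
    # Placeholder for the real per-file work (parsing, blob upload, index update).
    await asyncio.sleep(0.1)
    print(f"processed {filename}")


async def worker(semaphore: asyncio.Semaphore, filename: str) -> None:
    # Each task waits for a free slot, so at most `concurrency` files are in flight.
    async with semaphore:
        await process(filename)


async def run(filenames: list[str], concurrency: int = DEFAULT_CONCURRENCY) -> None:
    semaphore = asyncio.Semaphore(concurrency)
    # One task per file is created up front; the semaphore caps how many run at once.
    await asyncio.gather(*(worker(semaphore, name) for name in filenames))


if __name__ == "__main__":
    asyncio.run(run([f"doc{i}.pdf" for i in range(10)], concurrency=4))
```

Note that, as in the diff, all per-file tasks are created up front; the semaphore only limits how many of them do work at the same time.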
diff --git a/app/backend/prepdocslib/htmlparser.py b/app/backend/prepdocslib/htmlparser.py
index 719045b393..cfb5aafaba 100644
--- a/app/backend/prepdocslib/htmlparser.py
+++ b/app/backend/prepdocslib/htmlparser.py
@@ -39,7 +39,7 @@ async def parse(self, content: IO) -> AsyncGenerator[Page, None]:
         Returns:
             Page: The parsed html Page.
         """
-        logger.info("Extracting text from '%s' using local HTML parser (BeautifulSoup)", content.name)
+        logger.info("'%s': Extracting text using local HTML parser (BeautifulSoup)", content.name)
 
         data = content.read()
         soup = BeautifulSoup(data, "html.parser")
diff --git a/app/backend/prepdocslib/integratedvectorizerstrategy.py b/app/backend/prepdocslib/integratedvectorizerstrategy.py
index 9e89facc4c..b444a83f15 100644
--- a/app/backend/prepdocslib/integratedvectorizerstrategy.py
+++ b/app/backend/prepdocslib/integratedvectorizerstrategy.py
@@ -129,7 +129,7 @@ async def create_embedding_skill(self, index_name: str) -> SearchIndexerSkillset
         return skillset
 
     async def setup(self):
-        logger.info("Setting up search index using integrated vectorization...")
+        logger.info("Setting up search index using integrated vectorization")
         search_manager = SearchManager(
             search_info=self.search_info,
             search_analyzer_name=self.search_analyzer_name,
diff --git a/app/backend/prepdocslib/listfilestrategy.py b/app/backend/prepdocslib/listfilestrategy.py
index bdceef0754..2a4c5b7f03 100644
--- a/app/backend/prepdocslib/listfilestrategy.py
+++ b/app/backend/prepdocslib/listfilestrategy.py
@@ -102,7 +102,7 @@ def check_md5(self, path: str) -> bool:
             stored_hash = md5_f.read()
 
         if stored_hash and stored_hash.strip() == existing_hash.strip():
-            logger.info("Skipping %s, no changes detected.", path)
+            logger.info("'%s': Skipping, no changes detected.", path)
             return True
 
         # Write the hash
diff --git a/app/backend/prepdocslib/mediadescriber.py b/app/backend/prepdocslib/mediadescriber.py
index 5aae79232e..42235ce1f7 100644
--- a/app/backend/prepdocslib/mediadescriber.py
+++ b/app/backend/prepdocslib/mediadescriber.py
@@ -58,7 +58,7 @@ async def poll():
         return await poll()
 
     async def create_analyzer(self):
-        logger.info("Creating analyzer '%s'...", self.analyzer_schema["analyzerId"])
+        logger.info("Creating analyzer '%s'", self.analyzer_schema["analyzerId"])
 
         token_provider = get_bearer_token_provider(self.credential, "https://cognitiveservices.azure.com/.default")
         token = await token_provider()
@@ -84,7 +84,7 @@ async def create_analyzer(self):
         await self.poll_api(session, poll_url, headers)
 
     async def describe_image(self, image_bytes: bytes) -> str:
-        logger.info("Sending image to Azure Content Understanding service...")
+        logger.info("Sending image to Azure Content Understanding service")
         async with aiohttp.ClientSession() as session:
             token = await self.credential.get_token("https://cognitiveservices.azure.com/.default")
             headers = {"Authorization": "Bearer " + token.token}
diff --git a/app/backend/prepdocslib/pdfparser.py b/app/backend/prepdocslib/pdfparser.py
index c96980d21c..3956e546ed 100644
--- a/app/backend/prepdocslib/pdfparser.py
+++ b/app/backend/prepdocslib/pdfparser.py
@@ -33,7 +33,7 @@ class LocalPdfParser(Parser):
     """
 
     async def parse(self, content: IO) -> AsyncGenerator[Page, None]:
-        logger.info("Extracting text from '%s' using local PDF parser (pypdf)", content.name)
+        logger.info("'%s': Extracting text using local PDF parser (pypdf)", content.name)
 
         reader = PdfReader(content)
         pages = reader.pages
@@ -65,7 +65,7 @@ def __init__(
         self.content_understanding_endpoint = content_understanding_endpoint
 
     async def parse(self, content: IO) -> AsyncGenerator[Page, None]:
-        logger.info("Extracting text from '%s' using Azure Document Intelligence", content.name)
+        logger.info("'%s': Extracting text using Azure Document Intelligence", content.name)
 
         async with DocumentIntelligenceClient(
             endpoint=self.endpoint, credential=self.credential
diff --git a/app/backend/prepdocslib/searchmanager.py b/app/backend/prepdocslib/searchmanager.py
index e6ca925e24..0d3faf3de3 100644
--- a/app/backend/prepdocslib/searchmanager.py
+++ b/app/backend/prepdocslib/searchmanager.py
@@ -77,7 +77,7 @@ def __init__(
         self.search_images = search_images
 
     async def create_index(self):
-        logger.info("Checking whether search index %s exists...", self.search_info.index_name)
+        logger.info("Checking whether search index '%s' exists", self.search_info.index_name)
 
         async with self.search_info.create_search_index_client() as search_index_client:
 
@@ -280,10 +280,10 @@ async def create_index(self):
                 await search_index_client.create_index(index)
             else:
-                logger.info("Search index %s already exists", self.search_info.index_name)
+                logger.info("Search index '%s' already exists", self.search_info.index_name)
                 existing_index = await search_index_client.get_index(self.search_info.index_name)
                 if not any(field.name == "storageUrl" for field in existing_index.fields):
-                    logger.info("Adding storageUrl field to index %s", self.search_info.index_name)
+                    logger.info("Adding storageUrl field to index '%s'", self.search_info.index_name)
                     existing_index.fields.append(
                         SimpleField(
                             name="storageUrl",
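For context on the searchmanager.py hunk above (shown truncated): the unchanged lines around the reworded log messages detect an existing index that lacks the `storageUrl` field and append it before updating the index. A hedged sketch of that kind of in-place field addition using the synchronous Azure AI Search SDK; the endpoint, index name, and field options are placeholder assumptions, not values taken from this diff:

```python
from azure.identity import DefaultAzureCredential
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import SearchFieldDataType, SimpleField

# Placeholder values: substitute your own search service endpoint and index name.
endpoint = "https://<search-service>.search.windows.net"
index_name = "<index-name>"

client = SearchIndexClient(endpoint=endpoint, credential=DefaultAzureCredential())
index = client.get_index(index_name)

# Mirror the check in searchmanager.py: only add the field if it is missing.
if not any(field.name == "storageUrl" for field in index.fields):
    # The exact field options here are an assumption, not taken from the diff.
    index.fields.append(SimpleField(name="storageUrl", type=SearchFieldDataType.String, filterable=True))
    client.create_or_update_index(index)
```

Adding a field is a non-destructive index update, so documents already in the index simply have an empty `storageUrl` until they are re-ingested.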