1
+ import asyncio
1
2
import logging
2
3
from typing import Optional
3
4
@@ -56,6 +57,7 @@ def __init__(
56
57
category : Optional [str ] = None ,
57
58
use_content_understanding : bool = False ,
58
59
content_understanding_endpoint : Optional [str ] = None ,
60
+ concurrency : int = 10 ,
59
61
):
60
62
self .list_file_strategy = list_file_strategy
61
63
self .blob_manager = blob_manager
@@ -70,6 +72,7 @@ def __init__(
70
72
self .category = category
71
73
self .use_content_understanding = use_content_understanding
72
74
self .content_understanding_endpoint = content_understanding_endpoint
75
+ self .concurrency = concurrency
73
76
74
77
def setup_search_manager (self ):
75
78
self .search_manager = SearchManager (
@@ -98,9 +101,9 @@ async def setup(self):
98
101
99
102
async def run (self ):
100
103
self .setup_search_manager ()
101
- if self . document_action == DocumentAction . Add :
102
- files = self . list_file_strategy . list ()
103
- async for file in files :
104
+
105
+ async def process_file_worker ( semaphore : asyncio . Semaphore , file : File ):
106
+ async with semaphore :
104
107
try :
105
108
sections = await parse_file (file , self .file_processors , self .category , self .image_embeddings )
106
109
if sections :
@@ -112,6 +115,12 @@ async def run(self):
112
115
finally :
113
116
if file :
114
117
file .close ()
118
+
119
+ if self .document_action == DocumentAction .Add :
120
+ files = self .list_file_strategy .list ()
121
+ semaphore = asyncio .Semaphore (self .concurrency )
122
+ tasks = [process_file_worker (semaphore , file ) async for file in files ]
123
+ await asyncio .gather (* tasks )
115
124
elif self .document_action == DocumentAction .Remove :
116
125
paths = self .list_file_strategy .list_paths ()
117
126
async for path in paths :
0 commit comments