Skip to content

Commit bf4558b

Browse files
committed
fix NEXUS-703: try to fix semaphore settings
1 parent 0112204 commit bf4558b

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

src/unstructured_client/_hooks/custom/split_pdf_hook.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,31 @@
5757
HI_RES_STRATEGY = 'hi_res'
5858
MAX_PAGE_LENGTH = 4000
5959

60-
def _run_coroutines_in_separate_thread(
61-
coroutines_task: Coroutine[Any, Any, list[tuple[Any, httpx.Response]]]
62-
) -> list[httpx.Response]:
60+
def _get_asyncio_loop() -> asyncio.AbstractEventLoop:
    """Return an event loop and install it as the current loop for this thread.

    On Python 3.10 and newer the already-running loop is reused when there is
    one; otherwise a brand-new loop is created. On older interpreters the loop
    is obtained through ``asyncio.get_event_loop()``.

    Returns:
        The event loop that has been set as the current event loop.
    """
    if sys.version_info >= (3, 10):
        try:
            event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # get_running_loop() raises when no loop is running in this thread
            event_loop = asyncio.new_event_loop()
    else:
        event_loop = asyncio.get_event_loop()

    asyncio.set_event_loop(event_loop)
    return event_loop
7070

71-
asyncio.set_event_loop(loop)
72-
71+
def _run_coroutines_in_separate_thread(
    coroutines_task: Coroutine[Any, Any, list[tuple[Any, httpx.Response]]],
) -> list[httpx.Response]:
    """Drive ``coroutines_task`` to completion on this thread's event loop.

    Args:
        coroutines_task: The coroutine to execute.

    Returns:
        Whatever ``coroutines_task`` evaluates to once it has finished.
    """
    # _get_asyncio_loop() also installs the loop as the thread's current loop
    return _get_asyncio_loop().run_until_complete(coroutines_task)
7476

77+
def _get_limiter(concurrency_level: int, executor: futures.ThreadPoolExecutor) -> asyncio.Semaphore:
    """Build the concurrency-limiting semaphore on the executor's worker thread.

    The semaphore is constructed inside a task submitted to ``executor``, after
    an event loop has been installed for that thread via ``_get_asyncio_loop()``
    (presumably so the semaphore is associated with the loop of the thread that
    later runs the coroutines - confirm against the supported Python versions).

    The executor is owned by the caller and is left running, so it can be
    reused for later submissions.

    Args:
        concurrency_level: Initial value of the semaphore, i.e. the number of
            holders allowed at the same time.
        executor: Thread pool whose worker creates the semaphore.

    Returns:
        The newly created ``asyncio.Semaphore``.

    Raises:
        Exception: Anything raised while creating the semaphore in the worker
            is re-raised here by ``Future.result()``.
    """
    def _setup_limiter_in_thread_loop() -> asyncio.Semaphore:
        # Install a current event loop in the worker thread before creating the semaphore
        _get_asyncio_loop()
        return asyncio.Semaphore(concurrency_level)

    # Deliberately NOT wrapped in `with executor:` - leaving that block calls
    # executor.shutdown(), after which every later submit() on the shared
    # executor fails with "cannot schedule new futures after shutdown".
    return executor.submit(_setup_limiter_in_thread_loop).result()
83+
84+
7585

7686
async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]:
7787
response = await coro
@@ -185,6 +195,7 @@ def __init__(self) -> None:
185195
self.allow_failed: bool = DEFAULT_ALLOW_FAILED
186196
self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA
187197
self.cache_tmp_data_dir: str = DEFAULT_CACHE_TMP_DATA_DIR
198+
self.executor = futures.ThreadPoolExecutor(max_workers=1)
188199

189200
def sdk_init(
190201
self, base_url: str, client: HttpClient
@@ -333,7 +344,7 @@ def before_request(
333344
fallback_value=DEFAULT_CONCURRENCY_LEVEL,
334345
max_allowed=MAX_CONCURRENCY_LEVEL,
335346
)
336-
limiter = asyncio.Semaphore(concurrency_level)
347+
limiter = _get_limiter(concurrency_level, self.executor)
337348

338349
self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
339350
form_data,
@@ -621,7 +632,7 @@ def _await_elements(self, operation_id: str) -> Optional[list]:
621632

622633
# sending the coroutines to a separate thread to avoid blocking the current event loop
623634
# this operation should be removed when the SDK is updated to support async hooks
624-
with futures.ThreadPoolExecutor(max_workers=1) as executor:
635+
with self.executor as executor:
625636
task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
626637
task_responses = task_responses_future.result()
627638

0 commit comments

Comments
 (0)