Skip to content

Commit d6ab606

Browse files
committed
fix NEXUS-703: fix thread executors management
1 parent bf4558b commit d6ab606

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

src/unstructured_client/_hooks/custom/split_pdf_hook.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@
5959

6060
def _get_asyncio_loop() -> asyncio.AbstractEventLoop:
6161
if sys.version_info < (3, 10):
62-
loop = asyncio.get_event_loop()
62+
try:
63+
loop = asyncio.get_event_loop()
64+
except RuntimeError:
65+
loop = asyncio.new_event_loop()
6366
else:
6467
try:
6568
loop = asyncio.get_running_loop()
@@ -78,8 +81,7 @@ def _get_limiter(concurrency_level: int, executor: futures.ThreadPoolExecutor) -
7881
def _setup_limiter_in_thread_loop():
7982
_get_asyncio_loop()
8083
return asyncio.Semaphore(concurrency_level)
81-
with executor:
82-
return executor.submit(_setup_limiter_in_thread_loop).result()
84+
return executor.submit(_setup_limiter_in_thread_loop).result()
8385

8486

8587

@@ -191,11 +193,11 @@ def __init__(self) -> None:
191193
] = {}
192194
self.api_successful_responses: dict[str, list[httpx.Response]] = {}
193195
self.api_failed_responses: dict[str, list[httpx.Response]] = {}
196+
self.executors: dict[str, futures.ThreadPoolExecutor] = {}
194197
self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {}
195198
self.allow_failed: bool = DEFAULT_ALLOW_FAILED
196199
self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA
197200
self.cache_tmp_data_dir: str = DEFAULT_CACHE_TMP_DATA_DIR
198-
self.executor = futures.ThreadPoolExecutor(max_workers=1)
199201

200202
def sdk_init(
201203
self, base_url: str, client: HttpClient
@@ -344,7 +346,10 @@ def before_request(
344346
fallback_value=DEFAULT_CONCURRENCY_LEVEL,
345347
max_allowed=MAX_CONCURRENCY_LEVEL,
346348
)
347-
limiter = _get_limiter(concurrency_level, self.executor)
349+
350+
executor = futures.ThreadPoolExecutor(max_workers=1)
351+
self.executors[operation_id] = executor
352+
limiter = _get_limiter(concurrency_level, executor)
348353

349354
self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
350355
form_data,
@@ -632,9 +637,11 @@ def _await_elements(self, operation_id: str) -> Optional[list]:
632637

633638
# sending the coroutines to a separate thread to avoid blocking the current event loop
634639
# this operation should be removed when the SDK is updated to support async hooks
635-
with self.executor as executor:
636-
task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
637-
task_responses = task_responses_future.result()
640+
executor = self.executors.get(operation_id)
641+
if executor is None:
642+
raise RuntimeError("Executor not found for operation_id")
643+
task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
644+
task_responses = task_responses_future.result()
638645

639646
if task_responses is None:
640647
return None
@@ -741,6 +748,9 @@ def _clear_operation(self, operation_id: str) -> None:
741748
"""
742749
self.coroutines_to_execute.pop(operation_id, None)
743750
self.api_successful_responses.pop(operation_id, None)
751+
executor = self.executors.pop(operation_id, None)
752+
if executor is not None:
753+
executor.shutdown(wait=True)
744754
tempdir = self.tempdirs.pop(operation_id, None)
745755
if tempdir:
746756
tempdir.cleanup()

0 commit comments

Comments
 (0)