@@ -9,13 +9,13 @@
 import tempfile
 import uuid
 from collections.abc import Awaitable
+from concurrent import futures
 from functools import partial
 from pathlib import Path
 from typing import Any, Coroutine, Optional, Tuple, Union, cast, Generator, BinaryIO
 
 import aiofiles
 import httpx
-import nest_asyncio  # type: ignore
 from httpx import AsyncClient
 from pypdf import PdfReader, PdfWriter
 
@@ -56,6 +56,11 @@
 HI_RES_STRATEGY = 'hi_res'
 MAX_PAGE_LENGTH = 4000
 
+def _run_coroutines_in_separate_thread(
+    coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]],
+) -> list[tuple[int, httpx.Response]]:
+    return asyncio.run(coroutines_task)
+
 
 async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]:
     response = await coro
@@ -64,7 +69,8 @@ async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]:
 
 async def run_tasks(
     coroutines: list[partial[Coroutine[Any, Any, httpx.Response]]],
-    allow_failed: bool = False
+    allow_failed: bool = False,
+    concurrency_level: int = 10,
 ) -> list[tuple[int, httpx.Response]]:
     """Run a list of coroutines in parallel and return the results in order.
 
@@ -80,13 +86,14 @@ async def run_tasks(
     # Use a variable to adjust the httpx client timeout, or default to 30 minutes
     # When we're able to reuse the SDK to make these calls, we can remove this var
     # The SDK timeout will be controlled by parameter
+    limiter = asyncio.Semaphore(concurrency_level)
     client_timeout_minutes = 60
     if timeout_var := os.getenv("UNSTRUCTURED_CLIENT_TIMEOUT_MINUTES"):
         client_timeout_minutes = int(timeout_var)
     client_timeout = httpx.Timeout(60 * client_timeout_minutes)
 
     async with httpx.AsyncClient(timeout=client_timeout) as client:
-        armed_coroutines = [coro(async_client=client) for coro in coroutines]  # type: ignore
+        armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines]  # type: ignore
         if allow_failed:
             responses = await asyncio.gather(*armed_coroutines, return_exceptions=False)
             return list(enumerate(responses, 1))
@@ -110,16 +117,6 @@ async def run_tasks(
         return sorted(results, key=lambda x: x[0])
 
 
-def context_is_uvloop():
-    """Return true if uvloop is installed and we're currently in a uvloop context. Our asyncio splitting code currently doesn't work under uvloop."""
-    try:
-        import uvloop  # type: ignore[import]  # pylint: disable=import-outside-toplevel
-        loop = asyncio.get_event_loop()
-        return isinstance(loop, uvloop.Loop)
-    except (ImportError, RuntimeError):
-        return False
-
-
 def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int:
     """Distributes pages to workers evenly based on the number of pages and desired concurrency level."""
     if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level:
@@ -163,8 +160,10 @@ def __init__(self) -> None:
         self.coroutines_to_execute: dict[
             str, list[partial[Coroutine[Any, Any, httpx.Response]]]
         ] = {}
+        self.concurrency_level: dict[str, int] = {}
         self.api_successful_responses: dict[str, list[httpx.Response]] = {}
         self.api_failed_responses: dict[str, list[httpx.Response]] = {}
+        self.executors: dict[str, futures.ThreadPoolExecutor] = {}
        self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {}
         self.allow_failed: bool = DEFAULT_ALLOW_FAILED
         self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA
@@ -264,14 +263,6 @@ def before_request(
             logger.warning("HTTP client not accessible! Continuing without splitting.")
             return request
 
-        if context_is_uvloop():
-            logger.warning("Splitting is currently incompatible with uvloop. Continuing without splitting.")
-            return request
-
-        # This allows us to use an event loop in an env with an existing loop
-        # Temporary fix until we can improve the async splitting behavior
-        nest_asyncio.apply()
-
         # This is our key into coroutines_to_execute
         # We need to pass it on to after_success so
         # we know which results are ours
@@ -317,13 +308,15 @@ def before_request(
             fallback_value=DEFAULT_ALLOW_FAILED,
         )
 
-        concurrency_level = form_utils.get_split_pdf_concurrency_level_param(
+        self.concurrency_level[operation_id] = form_utils.get_split_pdf_concurrency_level_param(
             form_data,
             key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY,
             fallback_value=DEFAULT_CONCURRENCY_LEVEL,
             max_allowed=MAX_CONCURRENCY_LEVEL,
         )
-        limiter = asyncio.Semaphore(concurrency_level)
+
+        executor = futures.ThreadPoolExecutor(max_workers=1)
+        self.executors[operation_id] = executor
 
         self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data(
             form_data,
@@ -346,7 +339,7 @@ def before_request(
         page_count = page_range_end - page_range_start + 1
 
         split_size = get_optimal_split_size(
-            num_pages=page_count, concurrency_level=concurrency_level
+            num_pages=page_count, concurrency_level=self.concurrency_level[operation_id]
         )
 
         # If the doc is small enough, and we aren't slicing it with a page range:
@@ -389,7 +382,6 @@ def before_request(
             # in `after_success`.
             coroutine = partial(
                 self.call_api_partial,
-                limiter=limiter,
                 operation_id=operation_id,
                 pdf_chunk_request=pdf_chunk_request,
                 pdf_chunk_file=pdf_chunk_file,
@@ -607,10 +599,16 @@ def _await_elements(self, operation_id: str) -> Optional[list]:
         if tasks is None:
             return None
 
-        ioloop = asyncio.get_event_loop()
-        task_responses: list[tuple[int, httpx.Response]] = ioloop.run_until_complete(
-            run_tasks(tasks, allow_failed=self.allow_failed)
-        )
+        concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL)
+        coroutines = run_tasks(tasks, allow_failed=self.allow_failed, concurrency_level=concurrency_level)
+
+        # sending the coroutines to a separate thread to avoid blocking the current event loop
+        # this operation should be removed when the SDK is updated to support async hooks
+        executor = self.executors.get(operation_id)
+        if executor is None:
+            raise RuntimeError("Executor not found for operation_id")
+        task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines)
+        task_responses = task_responses_future.result()
 
         if task_responses is None:
             return None
@@ -714,6 +712,10 @@ def _clear_operation(self, operation_id: str) -> None:
         """
         self.coroutines_to_execute.pop(operation_id, None)
         self.api_successful_responses.pop(operation_id, None)
+        self.concurrency_level.pop(operation_id, None)
+        executor = self.executors.pop(operation_id, None)
+        if executor is not None:
+            executor.shutdown(wait=True)
         tempdir = self.tempdirs.pop(operation_id, None)
         if tempdir:
             tempdir.cleanup()
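
Why this change: the removed code called loop.run_until_complete() from inside the caller's already-running event loop, which is exactly the case nest_asyncio monkey-patched and uvloop never supported. A minimal sketch of that failure mode, using only the standard library (hook_caller is an illustrative name, not SDK code):

import asyncio

async def hook_caller() -> None:
    loop = asyncio.get_running_loop()
    try:
        # The removed pattern: re-entering a loop that is already running
        # raises RuntimeError, which nest_asyncio used to patch away.
        loop.run_until_complete(asyncio.sleep(0))
    except RuntimeError as err:
        print(err)  # e.g. "This event loop is already running"

asyncio.run(hook_caller())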
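And a minimal sketch of the replacement pattern this diff adopts: throttle the coroutines with an asyncio.Semaphore and run them via asyncio.run() on a single-worker ThreadPoolExecutor, so a fresh event loop lives in its own thread and never collides with the caller's loop (uvloop or otherwise). fake_request, run_all, and run_in_thread are hypothetical stand-ins, not SDK functions:

import asyncio
from concurrent import futures

async def fake_request(i: int) -> str:
    # Stand-in for an httpx call; sleep simulates network latency.
    await asyncio.sleep(0.1)
    return f"response {i}"

async def run_all(n: int, concurrency_level: int = 10) -> list[str]:
    # Caps in-flight coroutines, mirroring `limiter` in run_tasks().
    limiter = asyncio.Semaphore(concurrency_level)

    async def limited(i: int) -> str:
        async with limiter:
            return await fake_request(i)

    # gather() preserves submission order in its results.
    return list(await asyncio.gather(*(limited(i) for i in range(n))))

def run_in_thread(coro) -> list[str]:
    # asyncio.run() builds a brand-new event loop in the worker thread,
    # so it cannot collide with a loop already running in the caller.
    return asyncio.run(coro)

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    results = executor.submit(run_in_thread, run_all(25)).result()
    print(results[:3])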