File tree Expand file tree Collapse file tree 1 file changed +10
-7
lines changed Expand file tree Collapse file tree 1 file changed +10
-7
lines changed Original file line number Diff line number Diff line change @@ -220,24 +220,27 @@ def process_loop_asynchronous(
220220 self ,
221221 requests_queue : multiprocessing .Queue ,
222222 results_queue : multiprocessing .Queue ,
223- max_concurrency : Optional [ int ] ,
223+ max_concurrency : int ,
224224 process_id : int ,
225225 ):
226226 async def _process_runner ():
227- pending = asyncio .Semaphore (max_concurrency ) if max_concurrency else None
227+ pending = asyncio .Semaphore (max_concurrency )
228+
229+ if pending .locked ():
230+ raise ValueError (
231+ "Async worker called with max_concurrency < 1"
232+ )
228233
229234 while (
230235 process_request := await self .get_request (requests_queue )
231236 ) is not None :
232237 dequeued_time = time .time ()
233238
234- if pending :
235- await pending .acquire ()
239+ await pending .acquire ()
236240
237241 def _task_done (_ : asyncio .Task ):
238242 nonlocal pending
239- if pending :
240- pending .release ()
243+ pending .release ()
241244
242245 task = asyncio .create_task (
243246 self .resolve_scheduler_request (
@@ -325,7 +328,7 @@ def process_loop_asynchronous(
325328 self ,
326329 requests_queue : multiprocessing .Queue ,
327330 results_queue : multiprocessing .Queue ,
328- max_concurrency : Optional [ int ] ,
331+ max_concurrency : int ,
329332 process_id : int ,
330333 ):
331334 asyncio .run (self .backend .validate ())
You can’t perform that action at this time.
0 commit comments