From 8ae66280680ef7714f043b8ff021eead006acbe1 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 17 Apr 2025 19:54:37 -0400 Subject: [PATCH 1/2] Determine number of tasks on per-process basis --- src/guidellm/scheduler/scheduler.py | 38 +++++++++++++++++++---------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 68dcab21..15c59bae 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -188,13 +188,25 @@ async def _start_processes( maxsize=scheduling_strategy.queued_requests_limit ) responses_queue = manager.Queue() - per_process_requests_limit = scheduling_strategy.processing_requests_limit // ( - scheduling_strategy.processes_limit + + num_processes = min( + scheduling_strategy.processes_limit, + scheduling_strategy.processing_requests_limit + ) + requests_limit_split = (scheduling_strategy.processing_requests_limit // + scheduling_strategy.processes_limit) + requests_limit_remain = (scheduling_strategy.processing_requests_limit % + scheduling_strategy.processes_limit) + process_ids = (id_ for id_ in range(num_processes)) + process_requests_limits = ( + requests_limit_split + 1 + if i < requests_limit_remain else requests_limit_split + for i in range(num_processes) ) futures = [] loop = asyncio.get_event_loop() - for process_id in range(scheduling_strategy.processes_limit): + for id_, requests_limit in zip(process_ids, process_requests_limits): if scheduling_strategy.processing_mode == "sync": futures.append( loop.run_in_executor( @@ -202,20 +214,20 @@ async def _start_processes( self.worker.process_loop_synchronous, requests_queue, responses_queue, - process_id, + id_, ) ) elif scheduling_strategy.processing_mode == "async": - futures.append( - loop.run_in_executor( - executor, - self.worker.process_loop_asynchronous, - requests_queue, - responses_queue, - per_process_requests_limit, - process_id, + futures.append( + loop.run_in_executor( + executor, + self.worker.process_loop_asynchronous, + requests_queue, + responses_queue, + requests_limit, + id_, + ) ) - ) else: raise ValueError( f"Invalid processing mode: {scheduling_strategy.processing_mode} " From a691329b31b77fc39b749e407e8f05a8f9b2df89 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 17 Apr 2025 19:56:14 -0400 Subject: [PATCH 2/2] Bounds check async concurrency value --- src/guidellm/scheduler/worker.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 2d0846cb..efc84572 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -220,24 +220,27 @@ def process_loop_asynchronous( self, requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, - max_concurrency: Optional[int], + max_concurrency: int, process_id: int, ): async def _process_runner(): - pending = asyncio.Semaphore(max_concurrency) if max_concurrency else None + pending = asyncio.Semaphore(max_concurrency) + + if pending.locked(): + raise ValueError( + "Async worker called with max_concurrency < 1" + ) while ( process_request := await self.get_request(requests_queue) ) is not None: dequeued_time = time.time() - if pending: - await pending.acquire() + await pending.acquire() def _task_done(_: asyncio.Task): nonlocal pending - if pending: - pending.release() + pending.release() task = asyncio.create_task( self.resolve_scheduler_request( @@ -325,7 +328,7 @@ def process_loop_asynchronous( self, requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, - max_concurrency: Optional[int], + max_concurrency: int, process_id: int, ): asyncio.run(self.backend.validate())