diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 68dcab21..15c59bae 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -188,13 +188,25 @@ async def _start_processes( maxsize=scheduling_strategy.queued_requests_limit ) responses_queue = manager.Queue() - per_process_requests_limit = scheduling_strategy.processing_requests_limit // ( - scheduling_strategy.processes_limit + + num_processes = min( + scheduling_strategy.processes_limit, + scheduling_strategy.processing_requests_limit + ) + requests_limit_split = (scheduling_strategy.processing_requests_limit // + scheduling_strategy.processes_limit) + requests_limit_remain = (scheduling_strategy.processing_requests_limit % + scheduling_strategy.processes_limit) + process_ids = (id_ for id_ in range(num_processes)) + process_requests_limits = ( + requests_limit_split + 1 + if i < requests_limit_remain else requests_limit_split + for i in range(num_processes) ) futures = [] loop = asyncio.get_event_loop() - for process_id in range(scheduling_strategy.processes_limit): + for id_, requests_limit in zip(process_ids, process_requests_limits): if scheduling_strategy.processing_mode == "sync": futures.append( loop.run_in_executor( @@ -202,20 +214,20 @@ async def _start_processes( self.worker.process_loop_synchronous, requests_queue, responses_queue, - process_id, + id_, ) ) elif scheduling_strategy.processing_mode == "async": - futures.append( - loop.run_in_executor( - executor, - self.worker.process_loop_asynchronous, - requests_queue, - responses_queue, - per_process_requests_limit, - process_id, + futures.append( + loop.run_in_executor( + executor, + self.worker.process_loop_asynchronous, + requests_queue, + responses_queue, + requests_limit, + id_, + ) ) - ) else: raise ValueError( f"Invalid processing mode: {scheduling_strategy.processing_mode} " diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 2d0846cb..efc84572 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -220,24 +220,27 @@ def process_loop_asynchronous( self, requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, - max_concurrency: Optional[int], + max_concurrency: int, process_id: int, ): async def _process_runner(): - pending = asyncio.Semaphore(max_concurrency) if max_concurrency else None + pending = asyncio.Semaphore(max_concurrency) + + if pending.locked(): + raise ValueError( + "Async worker called with max_concurrency < 1" + ) while ( process_request := await self.get_request(requests_queue) ) is not None: dequeued_time = time.time() - if pending: - await pending.acquire() + await pending.acquire() def _task_done(_: asyncio.Task): nonlocal pending - if pending: - pending.release() + pending.release() task = asyncio.create_task( self.resolve_scheduler_request( @@ -325,7 +328,7 @@ def process_loop_asynchronous( self, requests_queue: multiprocessing.Queue, results_queue: multiprocessing.Queue, - max_concurrency: Optional[int], + max_concurrency: int, process_id: int, ): asyncio.run(self.backend.validate())