2323
2424from guidellm .config import settings
2525from guidellm .scheduler .strategy import (
26- AsyncConstantStrategy ,
27- AsyncPoissonStrategy ,
28- ConcurrentStrategy ,
2926 SchedulingStrategy ,
3027 SynchronousStrategy ,
3128 ThroughputStrategy ,
@@ -398,55 +395,29 @@ async def _start_processes(
398395 multiprocessing .Queue ,
399396 multiprocessing .Queue ,
400397 ]:
401- cpu_cores = os .cpu_count () or 1
402-
403- worker_type : Literal ["sync" , "async" ]
404- requests_queue_limit : Optional [int ]
405- max_concurrency : int
406- num_processes : int
407-
408- if isinstance (self ._scheduling_strategy , SynchronousStrategy ):
409- worker_type = "sync"
410- requests_queue_limit = 2
411- num_processes = 1
412- max_concurrency = - 1
413- elif isinstance (self ._scheduling_strategy , ConcurrentStrategy ):
414- worker_type = "sync"
415- num_processes = self ._scheduling_strategy .streams
416- requests_queue_limit = (
417- num_processes * 2
418- ) # add 2 per process to ensure no idling
419- max_concurrency = - 1
420- elif isinstance (
421- self ._scheduling_strategy ,
422- (ThroughputStrategy , AsyncConstantStrategy , AsyncPoissonStrategy ),
423- ):
424- worker_type = "async"
425- num_processes = self ._num_processes or min (
426- max (1 , cpu_cores - 1 ), settings .max_worker_processes
427- )
428- max_concurrency = (
429- self ._scheduling_strategy .max_concurrency
430- if isinstance (self ._scheduling_strategy , ThroughputStrategy )
431- else None
432- ) or settings .max_concurrency
433- requests_queue_limit = (
434- max_concurrency
435- + num_processes # add 1 extra per process to ensure no idling
436- )
437- max_concurrency = max_concurrency // num_processes # convert to per process
438- else :
439- raise ValueError (
440- f"Invalid scheduling strategy: { self ._scheduling_strategy } "
441- )
398+ processing_mode = self ._scheduling_strategy .processing_mode
442399
443- requests_queue = manager .Queue (maxsize = requests_queue_limit )
400+ num_processes = self ._scheduling_strategy .processes_limit
401+ if num_processes is None :
402+ cpu_cores = os .cpu_count () or 1
403+ num_processes = min (max (1 , cpu_cores - 1 ), settings .max_worker_processes )
404+
405+ num_processing_requests = self ._scheduling_strategy .processing_requests_limit
406+ if num_processing_requests is None :
407+ num_processing_requests = settings .max_concurrency
408+ num_processing_requests_per_process = num_processing_requests // num_processes
409+
410+ num_queued_requests = self ._scheduling_strategy .queued_requests_limit
411+ if num_queued_requests is None :
412+ num_queued_requests = num_processing_requests + num_processes
413+
414+ requests_queue = manager .Queue (maxsize = num_queued_requests )
444415 responses_queue = manager .Queue ()
445416
446417 futures = []
447418 loop = asyncio .get_event_loop ()
448419 for process_id in range (num_processes ):
449- if worker_type == "sync" :
420+ if processing_mode == "sync" :
450421 futures .append (
451422 loop .run_in_executor (
452423 executor ,
@@ -456,19 +427,22 @@ async def _start_processes(
456427 process_id ,
457428 )
458429 )
459- elif worker_type == "async" :
430+ elif processing_mode == "async" :
460431 futures .append (
461432 loop .run_in_executor (
462433 executor ,
463434 self ._worker_process_async ,
464435 requests_queue ,
465436 responses_queue ,
466- max_concurrency ,
437+ num_processing_requests_per_process ,
467438 process_id ,
468439 )
469440 )
470441 else :
471- raise ValueError (f"Invalid worker type: { worker_type } " )
442+ raise ValueError (
443+ f"Invalid processing mode: { processing_mode } "
444+ f"for strategy: { self ._scheduling_strategy } "
445+ )
472446
473447 await asyncio .sleep (0.1 ) # give time for processes to start
474448
0 commit comments