@@ -40,6 +40,9 @@ class WorkloadConfig:
40
40
# Whether to include user id in request header
41
41
enable_user_id : bool
42
42
43
+ # Max number of unfinished queries allowed (None means no limit)
44
+ max_unfinished_queries : Optional [int ]
45
+
43
46
44
47
@dataclass
45
48
class UserConfig :
@@ -419,6 +422,13 @@ def step(self, timestamp: float, executor: RequestExecutor):
419
422
if self .start_time is None :
420
423
self .start_time = timestamp
421
424
425
+ pending_queries = len ([s for s in self .sessions if s .has_unfinished_request ])
426
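+ # Throttle: return early from this step while the number of sessions with an unfinished request exceeds max_unfinished_queries (None disables the limit)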
427
+ if (self .workload_config .max_unfinished_queries is not None and
428
+ pending_queries > self .workload_config .max_unfinished_queries ):
429
+ logger .info (f"unfinished queries >{ self .workload_config .max_unfinished_queries } , waiting" )
430
+ return
431
+
422
432
if timestamp - self .last_user_join > self .gap_between_users :
423
433
self ._create_user_session ()
424
434
self .last_user_join = timestamp
@@ -625,6 +635,12 @@ def parse_arguments() -> WorkloadConfig:
625
635
parser .add_argument (
626
636
"--sharegpt" , action = "store_true" , help = "Whether to use ShareGPT dataset"
627
637
)
638
+ parser .add_argument (
639
+ "--max-unfinished-queries" ,
640
+ type = int ,
641
+ default = None ,
642
+ help = "Maximum number of unfinished queries allowed (default: no limit)" ,
643
+ )
628
644
args = parser .parse_args ()
629
645
return args
630
646
@@ -675,6 +691,7 @@ def main():
675
691
qps = args .qps ,
676
692
model = args .model ,
677
693
enable_user_id = args .request_with_user_id ,
694
+ max_unfinished_queries = args .max_unfinished_queries ,
678
695
)
679
696
680
697
manager = UserSessionManager (
0 commit comments