 from loguru import logger

 from guidellm.config import settings
-from guidellm.request.loader import InfiniteDatasetError
+from guidellm.request.loader import GetInfiniteDatasetLengthError
 from guidellm.scheduler.result import (
     SchedulerRequestResult,
     SchedulerResult,
@@ -101,24 +101,15 @@ async def run(
         :param max_duration: The maximum duration for the scheduling run.
             If None, then no limit is set and either the iterator must be exhaustible
             or the max_number must be set.
-        :param max_error_rate: The maximum error rate after which the scheduler shuts down.
+        :param max_error_rate: The maximum error rate after which the
+            scheduler shuts down.
             Only applicable in benchmarks with finite deterministic number of requests.
             If None or not applicable then scheduler will continue regardless of errors.
         :return: An asynchronous generator that yields SchedulerResult objects.
             Each SchedulerResult object contains information about the request,
             the response, and the run information.
         """
-        if scheduling_strategy is None or not isinstance(
-            scheduling_strategy, SchedulingStrategy
-        ):
-            raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}")
-
-        if max_number is not None and max_number < 1:
-            raise ValueError(f"Invalid max_number: {max_number}")
-        if max_duration is not None and max_duration < 0:
-            raise ValueError(f"Invalid max_duration: {max_duration}")
-        if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1):
-            raise ValueError(f"Invalid max_error_rate: {max_error_rate}")
+        self._validate_scheduler_params(scheduling_strategy, max_duration, max_error_rate, max_number)

         with (
             multiprocessing.Manager() as manager,
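The validation that used to sit inline here keeps the same contract in the new `_validate_scheduler_params` helper: a non-`SchedulingStrategy` strategy, a `max_number` below 1, a negative `max_duration`, or a `max_error_rate` outside [0, 1] each raise `ValueError`. A minimal standalone sketch of that contract (illustrative only, not part of this diff; `validate_params` is a made-up name):

    from typing import Optional

    def validate_params(
        max_number: Optional[int],
        max_duration: Optional[float],
        max_error_rate: Optional[float],
    ) -> None:
        # None means "no limit" and is always accepted.
        if max_number is not None and max_number < 1:
            raise ValueError(f"Invalid max_number: {max_number}")
        if max_duration is not None and max_duration < 0:
            raise ValueError(f"Invalid max_duration: {max_duration}")
        if max_error_rate is not None and not 0 <= max_error_rate <= 1:
            raise ValueError(f"Invalid max_error_rate: {max_error_rate}")

    validate_params(max_number=100, max_duration=60.0, max_error_rate=0.05)  # passes
    validate_params(max_number=None, max_duration=None, max_error_rate=1.5)  # raises ValueError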
@@ -127,11 +118,13 @@ async def run(
             ) as executor,
         ):
             requests_iter: Optional[Iterator[Any]] = None
-            futures, requests_queue, responses_queue, shutdown_event = await self._start_processes(
-                manager, executor, scheduling_strategy, max_error_rate is not None
-            )
-            if shutdown_event:
-                assert not shutdown_event.is_set(), "shutdown_event is set before starting scheduling"
+            futures, requests_queue, responses_queue, shutdown_event = \
+                await self._start_processes(
+                    manager, executor, scheduling_strategy, max_error_rate is not None)
+            if shutdown_event and shutdown_event.is_set():
+                raise RuntimeError(
+                    "shutdown_event is set before starting scheduling"
+                )
             run_info, requests_iter, times_iter = self._run_setup(
                 futures, scheduling_strategy, max_number, max_duration, max_error_rate
             )
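Swapping the assert for an explicit RuntimeError means the guard still fires when Python runs with assertions disabled (python -O). A minimal sketch of the same fail-fast check, assuming the event comes from a multiprocessing.Manager (names here are illustrative, not the scheduler's real API):

    import multiprocessing

    def ensure_not_shut_down(shutdown_event) -> None:
        # Fail fast if the shared event was already set before scheduling started.
        if shutdown_event is not None and shutdown_event.is_set():
            raise RuntimeError("shutdown_event is set before starting scheduling")

    if __name__ == "__main__":
        with multiprocessing.Manager() as manager:
            event = manager.Event()      # a freshly created event starts unset
            ensure_not_shut_down(event)  # passes
            event.set()
            # ensure_not_shut_down(event) would now raise RuntimeError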
@@ -169,17 +162,14 @@ async def run(
                         run_info,
                     )
                     if iter_result is not None:
-                        if iter_result.request_info.errored and not iter_result.request_info.canceled:
-                            current_error_rate = run_info.errored_requests / run_info.end_number
-                            is_over_max_error_rate = run_info.max_error_rate < current_error_rate
-
-                            if is_over_max_error_rate:
-                                shutdown_event.set()
-                                max_error_rate_reached = True
-                                logger.info(f"Max error rate of ({iter_result.run_info.max_error_rate}) "
-                                            f"reached, sending shutdown signal")
-                            else:
-                                logger.debug(f"Current error rate: {current_error_rate}")
+                        if iter_result.request_info.errored \
+                                and not iter_result.request_info.canceled \
+                                and self._is_max_error_rate_reached(iter_result.run_info):
+                            shutdown_event.set()
+                            max_error_rate_reached = True
+                            logger.info(f"Max error rate of "
+                                        f"({iter_result.run_info.max_error_rate}) "
+                                        f"reached, sending shutdown signal")
                         yield iter_result

                     # yield control to the event loop
@@ -194,6 +184,28 @@ async def run(

             await self._stop_processes(futures, requests_queue)

+    def _validate_scheduler_params(
+        self,
+        scheduling_strategy: SchedulingStrategy,
+        max_duration: Optional[float],
+        max_error_rate: Optional[float],
+        max_number: Optional[int]
+    ) -> None:
+        if scheduling_strategy is None or not isinstance(
+            scheduling_strategy, SchedulingStrategy
+        ):
+            raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}")
+        if max_number is not None and max_number < 1:
+            raise ValueError(f"Invalid max_number: {max_number}")
+        if max_duration is not None and max_duration < 0:
+            raise ValueError(f"Invalid max_duration: {max_duration}")
+        if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1):
+            raise ValueError(f"Invalid max_error_rate: {max_error_rate}")
+
+    def _is_max_error_rate_reached(self, run_info) -> bool:
+        current_error_rate = run_info.errored_requests / run_info.end_number
+        return run_info.max_error_rate < current_error_rate
+
     async def _start_processes(
         self,
         manager,
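`_is_max_error_rate_reached` compares the running error ratio, `errored_requests / end_number`, against the configured threshold and only trips when it is strictly exceeded: with 200 expected requests and `max_error_rate=0.05`, the 10th error is still tolerated and the 11th triggers shutdown. A self-contained sketch of that arithmetic (the `RunInfo` dataclass below is a stand-in, not the project's real run-info type):

    from dataclasses import dataclass

    @dataclass
    class RunInfo:                # stand-in for the scheduler's run info
        errored_requests: int
        end_number: int           # total requests expected for the run
        max_error_rate: float     # threshold in [0, 1]

    def is_max_error_rate_reached(info: RunInfo) -> bool:
        # Same strict comparison as the helper added above.
        return info.max_error_rate < info.errored_requests / info.end_number

    print(is_max_error_rate_reached(RunInfo(10, 200, 0.05)))  # False: 10/200 == 0.05
    print(is_max_error_rate_reached(RunInfo(11, 200, 0.05)))  # True: 11/200 = 0.055 > 0.05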
@@ -282,10 +294,13 @@ def _run_setup(
         start_time = time.time()
         times_iter = iter(scheduling_strategy.request_times())
         end_time = time.time() + (max_duration or math.inf)
-        end_number = self._determine_total_requests_count(scheduling_strategy, max_duration, max_number)
+        end_number = self._determine_total_requests_count(
+            scheduling_strategy, max_duration, max_number
+        )

         if end_number == math.inf and max_error_rate is not None:
-            logger.warning("max_error_rate will be ignored because end_number can not be determined.")
+            logger.warning("max_error_rate will be ignored "
+                           "because end_number can not be determined.")

         if end_number == math.inf and end_time is None:
             logger.warning(
@@ -312,17 +327,19 @@ def _determine_total_requests_count(
     ) -> int:
         end_number = max_number or math.inf
         try:
-            # update end number if the request loader is finite and less than max
+            # update end_number if the request_loader is finite and less than max_number
             iter_length = len(self.request_loader)  # type: ignore[arg-type]
             if 0 < iter_length < end_number:
                 end_number = iter_length
-        except InfiniteDatasetError:
-            # Only when RPS is constant and duration is capped we can determine the total
-            # amount of requests that are supposed to be sent
+        except GetInfiniteDatasetLengthError:
+            # Only when RPS is constant and duration is
+            # capped we can determine the total amount of requests
+            # that are supposed to be sent
             if scheduling_strategy.type_ == "constant" and max_duration is not None:
-                total_requests_in_max_duration = int(scheduling_strategy.rate * max_duration)
-                if total_requests_in_max_duration < end_number:
-                    assert total_requests_in_max_duration > 0
+                total_requests_in_max_duration = int(
+                    scheduling_strategy.rate * max_duration
+                )
+                if 0 < total_requests_in_max_duration < end_number:
                     end_number = total_requests_in_max_duration
         except Exception:  # noqa: BLE001, S110
             pass
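When the request loader cannot report a length (GetInfiniteDatasetLengthError), the expected total can still be bounded for a constant-rate strategy with a capped duration: roughly rate × max_duration requests, and never more than an explicit max_number. A small sketch of that bound, assuming the rate is in requests per second (the function name is illustrative):

    import math
    from typing import Optional

    def expected_total_requests(
        rate: float,                    # requests per second for a "constant" strategy
        max_duration: Optional[float],  # run cap in seconds, None if unbounded
        max_number: Optional[int],      # explicit request cap, if any
    ) -> float:
        # Start from max_number (or infinity) and tighten it with
        # rate * max_duration when the duration is capped and positive.
        end_number = max_number or math.inf
        if max_duration is not None:
            in_duration = int(rate * max_duration)
            if 0 < in_duration < end_number:
                end_number = in_duration
        return end_number

    print(expected_total_requests(rate=5.0, max_duration=60.0, max_number=None))  # 300
    print(expected_total_requests(rate=5.0, max_duration=60.0, max_number=100))   # 100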