"""Benchmarker: abstract orchestration of scheduling strategies over a profile."""

from abc import ABC
from collections.abc import AsyncIterator, Iterable
from typing import (
    Generic,
    Optional,
    Union,
)

from guidellm.benchmark.aggregator import (
    AggregatorT,
    BenchmarkT,
)
from guidellm.benchmark.profile import Profile
from guidellm.scheduler import (
    BackendT,
    Environment,
    RequestT,
    RequestTimingsT,
    ResponseT,
    Scheduler,
    SchedulerState,
    SchedulingStrategy,
)
from guidellm.utils import ThreadSafeSingletonMixin

# Public API of this module; GenerativeBenchmarker was removed in this change.
__all__ = ["Benchmarker"]
28
66
29
class Benchmarker(
    Generic[AggregatorT, BenchmarkT, RequestT, RequestTimingsT, ResponseT],
    ABC,
    ThreadSafeSingletonMixin,
):
    """Drive a benchmarking run: one scheduler pass per strategy in a profile.

    Abstract, thread-safe singleton (via ``ThreadSafeSingletonMixin``) that
    iterates the strategies produced by a :class:`Profile`, runs each through
    the :class:`Scheduler`, feeds every scheduler event into a per-strategy
    aggregator, and yields progress tuples to the caller.
    """

    async def run(
        self,
        # NOTE(review): this parameter's exact annotation sits in an elided
        # diff hunk; reconstructed from the Scheduler contract and the
        # `requests=requests` call below — confirm against the upstream file.
        requests: Iterable[
            Union[RequestT, Iterable[Union[RequestT, tuple[RequestT, float]]]]
        ],
        backend: BackendT[RequestT, ResponseT],
        profile: Profile,
        environment: Environment,
        aggregator_class: type[AggregatorT],
    ) -> AsyncIterator[
        tuple[
            Optional[BenchmarkT],
            AggregatorT,
            SchedulingStrategy,
            Optional[SchedulerState],
        ]
    ]:
        """Run every strategy in ``profile`` and stream progress updates.

        For each strategy the generator yields, in order:
        1. ``(None, aggregator, strategy, None)`` — strategy started;
        2. ``(None, aggregator, strategy, scheduler_state)`` — one per
           scheduler event, after the aggregator has been updated;
        3. ``(benchmark, aggregator, strategy, None)`` — compiled benchmark
           for the completed strategy.

        :param requests: Requests (or nested iterables of requests, optionally
            paired with a float) forwarded verbatim to the scheduler.
        :param backend: Backend the scheduler dispatches requests against.
        :param profile: Supplies the strategy/constraints sequence and receives
            each compiled benchmark back via ``generator.send(...)``.
        :param environment: Execution environment handed to the scheduler.
        :param aggregator_class: Concrete aggregator type; constructed once per
            strategy with ``strategy`` and ``constraints`` keyword arguments.
        """
        # `thread_lock` is presumably provided by ThreadSafeSingletonMixin —
        # it serializes concurrent run() calls on the singleton. Note the
        # generator yields while holding the lock, so the lock is held for
        # the consumer-driven lifetime of the iteration.
        with self.thread_lock:
            strategies_generator = profile.strategies_generator()
            strategy, constraints = next(strategies_generator)

            while strategy is not None:
                aggregator = aggregator_class(
                    strategy=strategy, constraints=constraints
                )
                # Signal the start of this strategy before any work happens.
                yield None, aggregator, strategy, None

                async for (
                    response,
                    request,
                    request_info,
                    scheduler_state,
                ) in Scheduler[BackendT, RequestT, RequestTimingsT, ResponseT].run(
                    requests=requests,
                    backend=backend,
                    strategy=strategy,
                    env=environment,
                    **constraints,
                ):
                    aggregator.update(
                        response=response,
                        request=request,
                        request_info=request_info,
                    )
                    # Intermediate progress: no benchmark yet, live state.
                    yield None, aggregator, strategy, scheduler_state

                benchmark = aggregator.compile()
                yield benchmark, aggregator, strategy, None
                # Feed the result back so the profile can pick (or stop
                # producing) the next strategy; returns (None, ...) when done.
                strategy, constraints = strategies_generator.send(
                    (benchmark, aggregator)
                )
0 commit comments