17
17
from loguru import logger
18
18
19
19
from guidellm .config import settings
20
- from guidellm .request .session import RequestSession
21
20
from guidellm .request .types import (
22
21
RequestT ,
23
22
ResponseT ,
27
26
SchedulerRequestResult ,
28
27
SchedulerResult ,
29
28
SchedulerRunInfo ,
30
- WorkerProcessRequestTime ,
29
+ WorkerProcessRequest ,
31
30
WorkerProcessResult ,
32
31
)
33
32
from guidellm .scheduler .strategy import SchedulingStrategy
@@ -127,10 +126,14 @@ async def run(
127
126
) as executor ,
128
127
):
129
128
requests_iter : Optional [Iterator [Any ]] = None
129
+ # TODO: Configurable delay and move somewhere more appropriate
130
+ scheduling_strategy .start_time = (
131
+ time .time ()
132
+ ) # Add a small delay to allow processes to start
130
133
futures , queues , stop_event = await self ._start_processes (
131
134
manager , executor , scheduling_strategy
132
135
)
133
- run_info , requests_iter , times_iter = self ._run_setup (
136
+ run_info , requests_iter = self ._run_setup (
134
137
futures , scheduling_strategy , max_number , max_duration
135
138
)
136
139
yield SchedulerResult (
@@ -145,19 +148,14 @@ async def run(
145
148
if future .done () and (err := future .exception ()) is not None :
146
149
raise err
147
150
148
- if (
149
- requests_iter is None
150
- and run_info .completed_requests >= run_info .created_requests
151
- ):
151
+ if requests_iter is None and run_info .processing_requests <= 0 :
152
152
# we've exhausted all requests we've wanted to run
153
153
# and yielded all responses
154
154
break
155
155
156
156
requests_iter = self ._add_requests (
157
157
requests_iter ,
158
- times_iter ,
159
158
queues .requests ,
160
- queues .times ,
161
159
run_info ,
162
160
)
163
161
await asyncio .sleep (0 ) # enable requests to start
@@ -196,7 +194,6 @@ async def _start_processes(
196
194
requests = manager .Queue (
197
195
maxsize = scheduling_strategy .processing_requests_limit
198
196
),
199
- times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
200
197
responses = manager .Queue (),
201
198
)
202
199
stop_event = manager .Event ()
@@ -229,10 +226,12 @@ async def _start_processes(
229
226
executor ,
230
227
self .worker .process_loop_asynchronous ,
231
228
queues ,
229
+ scheduling_strategy ,
232
230
stop_event ,
233
231
False , # TODO: Make configurable
234
232
requests_limit ,
235
233
id_ ,
234
+ num_processes ,
236
235
)
237
236
)
238
237
@@ -246,11 +245,9 @@ def _run_setup(
246
245
scheduling_strategy : SchedulingStrategy ,
247
246
max_number : Optional [int ],
248
247
max_duration : Optional [float ],
249
- ) -> tuple [SchedulerRunInfo , Iterator [Any ], Iterator [ float ] ]:
248
+ ) -> tuple [SchedulerRunInfo , Iterator [Any ]]:
250
249
requests_iter = iter (self .request_loader )
251
- start_time = time .time ()
252
- times_iter = iter (scheduling_strategy .request_times ())
253
- end_time = time .time () + (max_duration or math .inf )
250
+ end_time = scheduling_strategy .start_time + (max_duration or math .inf )
254
251
end_number = max_number or math .inf
255
252
256
253
try :
@@ -268,27 +265,28 @@ def _run_setup(
268
265
)
269
266
270
267
info = SchedulerRunInfo (
271
- start_time = start_time ,
268
+ start_time = scheduling_strategy . start_time ,
272
269
end_time = end_time ,
273
270
end_number = end_number ,
274
271
processes = len (processes ),
275
272
strategy = scheduling_strategy ,
276
273
)
277
274
278
- return info , requests_iter , times_iter
275
+ return info , requests_iter
279
276
280
277
def _add_requests (
281
278
self ,
282
279
requests_iter : Optional [Iterator [Any ]],
283
- times_iter : Iterator [float ],
284
- requests_queue : Queue [RequestSession [RequestT , ResponseT ]],
285
- times_queue : Queue [WorkerProcessRequestTime ],
280
+ requests_queue : Queue [WorkerProcessRequest [RequestT , ResponseT ]],
286
281
run_info : SchedulerRunInfo ,
287
282
) -> Optional [Iterator [Any ]]:
288
283
if requests_iter is not None :
289
284
try :
290
285
added_count = 0
291
286
287
+ if time .time () >= run_info .end_time :
288
+ raise StopIteration
289
+
292
290
while (
293
291
not requests_queue .full ()
294
292
and added_count < settings .max_add_requests_per_loop
@@ -297,23 +295,16 @@ def _add_requests(
297
295
raise StopIteration
298
296
299
297
session = next (requests_iter )
300
- requests_queue .put (session )
301
- for _ in range (len (session )):
302
- if (
303
- request_time := next (times_iter )
304
- ) >= run_info .end_time or time .time () >= run_info .end_time :
305
- raise StopIteration
306
-
307
- work_req = WorkerProcessRequestTime (
308
- start_time = request_time ,
309
- timeout_time = run_info .end_time ,
310
- queued_time = time .time (),
311
- )
312
- times_queue .put (work_req )
313
-
314
- run_info .created_requests += 1
315
- run_info .queued_requests += 1
316
- added_count += 1
298
+ work_req = WorkerProcessRequest (
299
+ session = session ,
300
+ timeout_time = run_info .end_time ,
301
+ queued_time = time .time (),
302
+ )
303
+ requests_queue .put (work_req )
304
+
305
+ run_info .created_requests += len (session )
306
+ run_info .queued_requests += len (session )
307
+ added_count += len (session )
317
308
except StopIteration :
318
309
# we've reached the limit number, limit time, or exhausted the requests
319
310
# set to None to stop adding more and tell the loop no more requests
0 commit comments