17
17
from loguru import logger
18
18
19
19
from guidellm.config import settings
20
- from guidellm.request.session import RequestSession
21
20
from guidellm.request.types import (
22
21
RequestT,
23
22
ResponseT,
27
26
SchedulerRequestResult,
28
27
SchedulerResult,
29
28
SchedulerRunInfo,
30
- WorkerProcessRequestTime ,
29
+ WorkerProcessRequest ,
31
30
WorkerProcessResult,
32
31
)
33
32
from guidellm.scheduler.strategy import SchedulingStrategy
@@ -127,10 +126,14 @@ async def run(
127
126
) as executor,
128
127
):
129
128
requests_iter: Optional[Iterator[Any]] = None
129
+ # TODO: Configurable delay and move somewhere more appropriate
130
+ scheduling_strategy.start_time = (
131
+ time.time()
132
+ ) # Add a small delay to allow processes to start
130
133
futures, queues, stop_event = await self._start_processes(
131
134
manager, executor, scheduling_strategy
132
135
)
133
- run_info, requests_iter, times_iter = self._run_setup(
136
+ run_info, requests_iter = self._run_setup(
134
137
futures, scheduling_strategy, max_number, max_duration
135
138
)
136
139
yield SchedulerResult(
@@ -145,19 +148,14 @@ async def run(
145
148
if future.done() and (err := future.exception()) is not None:
146
149
raise err
147
150
148
- if (
149
- requests_iter is None
150
- and run_info.completed_requests >= run_info.created_requests
151
- ):
151
+ if requests_iter is None and run_info.processing_requests <= 0:
152
152
# we've exhausted all requests we've wanted to run
153
153
# and yielded all responses
154
154
break
155
155
156
156
requests_iter = self._add_requests(
157
157
requests_iter,
158
- times_iter,
159
158
queues.requests,
160
- queues.times,
161
159
run_info,
162
160
)
163
161
await asyncio.sleep(0) # enable requests to start
@@ -196,7 +194,6 @@ async def _start_processes(
196
194
requests=manager.Queue(
197
195
maxsize=scheduling_strategy.processing_requests_limit
198
196
),
199
- times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
200
197
responses=manager.Queue(),
201
198
)
202
199
stop_event = manager.Event()
@@ -229,10 +226,12 @@ async def _start_processes(
229
226
executor,
230
227
self.worker.process_loop_asynchronous,
231
228
queues,
229
+ scheduling_strategy,
232
230
stop_event,
233
231
False, # TODO: Make configurable
234
232
requests_limit,
235
233
id_,
234
+ num_processes,
236
235
)
237
236
)
238
237
@@ -246,11 +245,9 @@ def _run_setup(
246
245
scheduling_strategy: SchedulingStrategy,
247
246
max_number: Optional[int],
248
247
max_duration: Optional[float],
249
- ) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float] ]:
248
+ ) -> tuple[SchedulerRunInfo, Iterator[Any]]:
250
249
requests_iter = iter(self.request_loader)
251
- start_time = time.time()
252
- times_iter = iter(scheduling_strategy.request_times())
253
- end_time = time.time() + (max_duration or math.inf)
250
+ end_time = scheduling_strategy.start_time + (max_duration or math.inf)
254
251
end_number = max_number or math.inf
255
252
256
253
try:
@@ -268,27 +265,28 @@ def _run_setup(
268
265
)
269
266
270
267
info = SchedulerRunInfo(
271
- start_time=start_time,
268
+ start_time=scheduling_strategy. start_time,
272
269
end_time=end_time,
273
270
end_number=end_number,
274
271
processes=len(processes),
275
272
strategy=scheduling_strategy,
276
273
)
277
274
278
- return info, requests_iter, times_iter
275
+ return info, requests_iter
279
276
280
277
def _add_requests(
281
278
self,
282
279
requests_iter: Optional[Iterator[Any]],
283
- times_iter: Iterator[float],
284
- requests_queue: Queue[RequestSession[RequestT, ResponseT]],
285
- times_queue: Queue[WorkerProcessRequestTime],
280
+ requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]],
286
281
run_info: SchedulerRunInfo,
287
282
) -> Optional[Iterator[Any]]:
288
283
if requests_iter is not None:
289
284
try:
290
285
added_count = 0
291
286
287
+ if time.time() >= run_info.end_time:
288
+ raise StopIteration
289
+
292
290
while (
293
291
not requests_queue.full()
294
292
and added_count < settings.max_add_requests_per_loop
@@ -297,23 +295,16 @@ def _add_requests(
297
295
raise StopIteration
298
296
299
297
session = next(requests_iter)
300
- requests_queue.put(session)
301
- for _ in range(len(session)):
302
- if (
303
- request_time := next(times_iter)
304
- ) >= run_info.end_time or time.time() >= run_info.end_time:
305
- raise StopIteration
306
-
307
- work_req = WorkerProcessRequestTime(
308
- start_time=request_time,
309
- timeout_time=run_info.end_time,
310
- queued_time=time.time(),
311
- )
312
- times_queue.put(work_req)
313
-
314
- run_info.created_requests += 1
315
- run_info.queued_requests += 1
316
- added_count += 1
298
+ work_req = WorkerProcessRequest(
299
+ session=session,
300
+ timeout_time=run_info.end_time,
301
+ queued_time=time.time(),
302
+ )
303
+ requests_queue.put(work_req)
304
+
305
+ run_info.created_requests += len(session)
306
+ run_info.queued_requests += len(session)
307
+ added_count += len(session)
317
308
except StopIteration:
318
309
# we've reached the limit number, limit time, or exhausted the requests
319
310
# set to None to stop adding more and tell the loop no more requests
0 commit comments