Skip to content

Commit 3dcecaf

Browse files
committed
Revert loop logic changes
1 parent 87da497 commit 3dcecaf

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,17 @@ async def run(
130130
futures, queues, stop_event = await self._start_processes(
131131
manager, executor, scheduling_strategy
132132
)
133-
run_info, requests_iter = self._run_setup(
133+
run_info, requests_iter, times_iter = self._run_setup(
134134
futures, scheduling_strategy, max_number, max_duration
135135
)
136136

137137
# Add some initial requests to the queue
138138
requests_iter = self._add_requests(
139139
requests_iter,
140140
queues.requests,
141+
times_iter,
141142
run_info,
143+
loop_limit=run_info.strategy.queued_requests_limit,
142144
)
143145
# Wait for the test to start
144146
await asyncio.sleep(time.time() - scheduling_strategy.start_time)
@@ -154,14 +156,18 @@ async def run(
154156
if future.done() and (err := future.exception()) is not None:
155157
raise err
156158

157-
if requests_iter is None and run_info.processing_requests <= 0:
159+
if (
160+
requests_iter is None
161+
and run_info.completed_requests >= run_info.created_requests
162+
):
158163
# we've exhausted all requests we've wanted to run
159164
# and yielded all responses
160165
break
161166

162167
requests_iter = self._add_requests(
163168
requests_iter,
164169
queues.requests,
170+
times_iter,
165171
run_info,
166172
)
167173
await asyncio.sleep(0) # enable requests to start
@@ -250,8 +256,9 @@ def _run_setup(
250256
scheduling_strategy: SchedulingStrategy,
251257
max_number: Optional[int],
252258
max_duration: Optional[float],
253-
) -> tuple[SchedulerRunInfo, Iterator[Any]]:
259+
) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]:
254260
requests_iter = iter(self.request_loader)
261+
times_iter = iter(scheduling_strategy.request_times())
255262
end_time = scheduling_strategy.start_time + (max_duration or math.inf)
256263
end_number = max_number or math.inf
257264

@@ -277,28 +284,33 @@ def _run_setup(
277284
strategy=scheduling_strategy,
278285
)
279286

280-
return info, requests_iter
287+
return info, requests_iter, times_iter
281288

282289
def _add_requests(
283290
self,
284291
requests_iter: Optional[Iterator[Any]],
285292
requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]],
293+
times_iter: Iterator[float],
286294
run_info: SchedulerRunInfo,
295+
loop_limit: Optional[int] = None,
287296
) -> Optional[Iterator[Any]]:
288297
if requests_iter is not None:
298+
if loop_limit is None:
299+
loop_limit = settings.max_add_requests_per_loop
300+
289301
try:
290302
added_count = 0
291303

292-
if time.time() >= run_info.end_time:
293-
raise StopIteration
294-
295-
while not requests_queue.full() and added_count < (
296-
run_info.strategy.queued_requests_limit
297-
or settings.max_add_requests_per_loop
298-
):
304+
while not requests_queue.full() and added_count < loop_limit:
299305
if run_info.created_requests >= run_info.end_number:
300306
raise StopIteration
301307

308+
if (
309+
next(times_iter) >= run_info.end_time
310+
or time.time() >= run_info.end_time
311+
):
312+
raise StopIteration
313+
302314
work_req = WorkerProcessRequest[RequestT, ResponseT](
303315
request=next(requests_iter),
304316
timeout_time=run_info.end_time,

0 commit comments

Comments
 (0)