Skip to content

Commit 841499d

Browse files
committed
Drop times queue and handle timing in each worker
1 parent 94890cd commit 841499d

File tree

4 files changed

+94
-77
lines changed

4 files changed

+94
-77
lines changed

src/guidellm/scheduler/result.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"SchedulerRequestResult",
1818
"SchedulerResult",
1919
"SchedulerRunInfo",
20-
"WorkerProcessRequestTime",
20+
"WorkerProcessRequest",
2121
"WorkerProcessResult",
2222
]
2323

@@ -147,8 +147,8 @@ class SchedulerRequestResult(
147147

148148

149149
@dataclass
150-
class WorkerProcessRequestTime:
151-
start_time: float
150+
class WorkerProcessRequest(Generic[RequestT, ResponseT]):
151+
session: RequestSession[RequestT, ResponseT]
152152
timeout_time: float
153153
queued_time: float
154154

@@ -163,6 +163,5 @@ class WorkerProcessResult(Generic[RequestT, ResponseT]):
163163

164164
@dataclass
165165
class MPQueues(Generic[RequestT, ResponseT]):
166-
requests: Queue[RequestSession[RequestT, ResponseT]]
167-
times: Queue[WorkerProcessRequestTime]
166+
requests: Queue[WorkerProcessRequest[RequestT, ResponseT]]
168167
responses: Queue[WorkerProcessResult[RequestT, ResponseT]]

src/guidellm/scheduler/scheduler.py

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from loguru import logger
1818

1919
from guidellm.config import settings
20-
from guidellm.request.session import RequestSession
2120
from guidellm.request.types import (
2221
RequestT,
2322
ResponseT,
@@ -27,7 +26,7 @@
2726
SchedulerRequestResult,
2827
SchedulerResult,
2928
SchedulerRunInfo,
30-
WorkerProcessRequestTime,
29+
WorkerProcessRequest,
3130
WorkerProcessResult,
3231
)
3332
from guidellm.scheduler.strategy import SchedulingStrategy
@@ -130,7 +129,7 @@ async def run(
130129
futures, queues, stop_event = await self._start_processes(
131130
manager, executor, scheduling_strategy
132131
)
133-
run_info, requests_iter, times_iter = self._run_setup(
132+
run_info, requests_iter = self._run_setup(
134133
futures, scheduling_strategy, max_number, max_duration
135134
)
136135
yield SchedulerResult(
@@ -155,9 +154,7 @@ async def run(
155154

156155
requests_iter = self._add_requests(
157156
requests_iter,
158-
times_iter,
159157
queues.requests,
160-
queues.times,
161158
run_info,
162159
)
163160
await asyncio.sleep(0) # enable requests to start
@@ -196,7 +193,6 @@ async def _start_processes(
196193
requests=manager.Queue(
197194
maxsize=scheduling_strategy.processing_requests_limit
198195
),
199-
times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
200196
responses=manager.Queue(),
201197
)
202198
stop_event = manager.Event()
@@ -214,6 +210,7 @@ async def _start_processes(
214210
% scheduling_strategy.processes_limit
215211
)
216212
process_ids = (id_ for id_ in range(num_processes))
213+
start_time = time.time()
217214
process_requests_limits = (
218215
requests_limit_split + 1
219216
if i < requests_limit_remain
@@ -229,10 +226,13 @@ async def _start_processes(
229226
executor,
230227
self.worker.process_loop_asynchronous,
231228
queues,
229+
scheduling_strategy,
230+
start_time,
232231
stop_event,
233232
False, # TODO: Make configurable
234233
requests_limit,
235234
id_,
235+
num_processes,
236236
)
237237
)
238238

@@ -246,10 +246,9 @@ def _run_setup(
246246
scheduling_strategy: SchedulingStrategy,
247247
max_number: Optional[int],
248248
max_duration: Optional[float],
249-
) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]:
249+
) -> tuple[SchedulerRunInfo, Iterator[Any]]:
250250
requests_iter = iter(self.request_loader)
251251
start_time = time.time()
252-
times_iter = iter(scheduling_strategy.request_times())
253252
end_time = time.time() + (max_duration or math.inf)
254253
end_number = max_number or math.inf
255254

@@ -275,14 +274,12 @@ def _run_setup(
275274
strategy=scheduling_strategy,
276275
)
277276

278-
return info, requests_iter, times_iter
277+
return info, requests_iter
279278

280279
def _add_requests(
281280
self,
282281
requests_iter: Optional[Iterator[Any]],
283-
times_iter: Iterator[float],
284-
requests_queue: Queue[RequestSession[RequestT, ResponseT]],
285-
times_queue: Queue[WorkerProcessRequestTime],
282+
requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]],
286283
run_info: SchedulerRunInfo,
287284
) -> Optional[Iterator[Any]]:
288285
if requests_iter is not None:
@@ -296,24 +293,20 @@ def _add_requests(
296293
if run_info.created_requests >= run_info.end_number:
297294
raise StopIteration
298295

296+
if time.time() >= run_info.end_time:
297+
raise StopIteration
298+
299299
session = next(requests_iter)
300-
requests_queue.put(session)
301-
for _ in range(len(session)):
302-
if (
303-
request_time := next(times_iter)
304-
) >= run_info.end_time or time.time() >= run_info.end_time:
305-
raise StopIteration
306-
307-
work_req = WorkerProcessRequestTime(
308-
start_time=request_time,
309-
timeout_time=run_info.end_time,
310-
queued_time=time.time(),
311-
)
312-
times_queue.put(work_req)
313-
314-
run_info.created_requests += 1
315-
run_info.queued_requests += 1
316-
added_count += 1
300+
work_req = WorkerProcessRequest(
301+
session=session,
302+
timeout_time=run_info.end_time,
303+
queued_time=time.time(),
304+
)
305+
requests_queue.put(work_req)
306+
307+
run_info.created_requests += len(session)
308+
run_info.queued_requests += len(session)
309+
added_count += len(session)
317310
except StopIteration:
318311
# we've reached the limit number, limit time, or exhausted the requests
319312
# set to None to stop adding more and tell the loop no more requests

src/guidellm/scheduler/strategy.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ def processing_requests_limit(self) -> int:
9494
"""
9595
return settings.max_concurrency
9696

97-
def request_times(self) -> Generator[float, None, None]:
97+
def request_times(
98+
self, start_time: Optional[float] = None
99+
) -> Generator[float, None, None]:
98100
"""
99101
A generator that yields timestamps for when requests should be sent.
100102
This method should be implemented by subclasses to provide specific
@@ -168,7 +170,9 @@ def processing_requests_limit(self) -> int:
168170
"""
169171
return 1
170172

171-
def request_times(self) -> Generator[float, None, None]:
173+
def request_times(
174+
self, start_time: Optional[float] = None
175+
) -> Generator[float, None, None]:
172176
"""
173177
A generator that yields time.time() so requests are sent immediately,
174178
while scheduling them synchronously.
@@ -254,7 +258,9 @@ def processing_requests_limit(self) -> int:
254258
"""
255259
return self.streams
256260

257-
def request_times(self) -> Generator[float, None, None]:
261+
def request_times(
262+
self, start_time: Optional[float] = None
263+
) -> Generator[float, None, None]:
258264
"""
259265
A generator that yields time.time() so requests are sent
260266
immediately, while scheduling them concurrently with the specified
@@ -328,15 +334,18 @@ def processing_requests_limit(self) -> int:
328334
"""
329335
return self.max_concurrency or super().processing_requests_limit
330336

331-
def request_times(self) -> Generator[float, None, None]:
337+
def request_times(
338+
self, start_time: Optional[float] = None
339+
) -> Generator[float, None, None]:
332340
"""
333341
A generator that yields the start time.time() so requests are sent
334342
immediately, while scheduling as many asynchronously as possible.
335343
336344
:return: A generator that yields the start time.time()
337345
for immediate request scheduling.
338346
"""
339-
start_time = time.time()
347+
if start_time is None:
348+
start_time = time.time()
340349

341350
while True:
342351
yield start_time
@@ -379,7 +388,9 @@ class AsyncConstantStrategy(ThroughputStrategy):
379388
),
380389
)
381390

382-
def request_times(self) -> Generator[float, None, None]:
391+
def request_times(
392+
self, start_time: Optional[float] = None
393+
) -> Generator[float, None, None]:
383394
"""
384395
A generator that yields timestamps for when requests should be sent.
385396
This method schedules requests asynchronously at a constant rate
@@ -391,7 +402,8 @@ def request_times(self) -> Generator[float, None, None]:
391402
392403
:return: A generator that yields timestamps for request scheduling.
393404
"""
394-
start_time = time.time()
405+
if start_time is None:
406+
start_time = time.time()
395407
constant_increment = 1.0 / self.rate
396408

397409
# handle bursts first to get to the desired rate
@@ -451,7 +463,9 @@ class AsyncPoissonStrategy(ThroughputStrategy):
451463
description=("The random seed to use for the Poisson distribution. "),
452464
)
453465

454-
def request_times(self) -> Generator[float, None, None]:
466+
def request_times(
467+
self, start_time: Optional[float] = None
468+
) -> Generator[float, None, None]:
455469
"""
456470
A generator that yields timestamps for when requests should be sent.
457471
This method schedules requests asynchronously at a Poisson rate
@@ -461,7 +475,8 @@ def request_times(self) -> Generator[float, None, None]:
461475
462476
:return: A generator that yields timestamps for request scheduling.
463477
"""
464-
start_time = time.time()
478+
if start_time is None:
479+
start_time = time.time()
465480

466481
if self.initial_burst is not None:
467482
# send an initial burst equal to the rate

0 commit comments

Comments
 (0)