Skip to content

Commit e95007d

Browse files
committed
Fixes for constant rate benchmarking race condition, simplifications, and minor bug fixes for stats accumulation and data loading
1 parent c27d488 commit e95007d

File tree

10 files changed

+672
-759
lines changed

10 files changed

+672
-759
lines changed

src/guidellm/benchmark/benchmarker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,18 @@ async def run(
109109
)
110110
estimated_state = EstimatedBenchmarkState()
111111
scheduler_state = None
112+
scheduler: Scheduler[RequestT, ResponseT] = Scheduler()
112113

113114
async for (
114115
response,
115116
request,
116117
request_info,
117118
scheduler_state,
118-
) in Scheduler[RequestT, ResponseT]().run(
119+
) in scheduler.run(
119120
requests=requests,
120121
backend=backend,
121122
strategy=strategy,
123+
startup_duration=warmup if warmup and warmup >= 1 else 0.0,
122124
env=environment,
123125
**constraints or {},
124126
):

src/guidellm/benchmark/schemas.py

Lines changed: 162 additions & 74 deletions
Large diffs are not rendered by default.

src/guidellm/data/loaders.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def __iter__(self):
6565
worker_modulus = worker_info.num_workers if worker_info is not None else 1
6666
worker_index = worker_info.id if worker_info is not None else 0
6767

68-
if self.precache is not None:
68+
if self.precache:
6969
for index, item in enumerate(self.precache):
7070
if (index + worker_index) % worker_modulus == 0:
7171
yield item

src/guidellm/scheduler/__init__.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
"""
2+
Scheduler subsystem for orchestrating benchmark workloads and managing worker processes.
3+
4+
This module provides the core scheduling infrastructure for guidellm, including
5+
strategies for controlling request timing patterns (synchronous, asynchronous,
6+
constant rate, Poisson), constraints for limiting benchmark execution (duration,
7+
error rates, request counts), and distributed execution through worker processes.
8+
The scheduler coordinates between backend interfaces, manages benchmark state
9+
transitions, and handles multi-turn request sequences with customizable timing
10+
strategies and resource constraints.
11+
"""
12+
113
from .constraints import (
214
Constraint,
315
ConstraintInitializer,
@@ -28,11 +40,6 @@
2840
AsyncConstantStrategy,
2941
AsyncPoissonStrategy,
3042
ConcurrentStrategy,
31-
ConstantRateRequestTimings,
32-
LastCompletionRequestTimings,
33-
NoDelayRequestTimings,
34-
PoissonRateRequestTimings,
35-
ScheduledRequestTimings,
3643
SchedulingStrategy,
3744
StrategyT,
3845
StrategyType,

src/guidellm/scheduler/environments.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
"""
22
Environment abstractions for coordinating scheduler execution across distributed nodes.
33
4-
Provides environment abstractions that handle synchronization, timing coordination,
5-
error propagation, and lifecycle management for scheduler execution across single
6-
or multiple nodes. The Environment protocol defines the interface for distributed
4+
Provides abstractions that handle synchronization, timing coordination, error
5+
propagation, and lifecycle management for scheduler execution across single or
6+
multiple nodes. The Environment protocol defines the interface for distributed
77
coordination while NonDistributedEnvironment provides a minimal implementation
8-
for single-node execution.
8+
for single-node execution. Environments manage the complete execution lifecycle
9+
from parameter distribution through result aggregation.
910
10-
Environment Execution Flow:
11-
1. sync_run_params() - Distribute workload and synchronize parameters across nodes
12-
2. sync_run_start() - Coordinate synchronized start time for all nodes
13-
3. update_run_iteration() - Update state after each request (called per iteration)
11+
Execution Flow:
12+
1. sync_run_params() - Distribute workload and synchronize parameters
13+
2. sync_run_start() - Coordinate synchronized start time
14+
3. update_run_iteration() - Update state after each request iteration
1415
4. sync_run_error() - Handle and propagate errors across nodes
15-
5. sync_run_end() - Aggregate results and cleanup at completion
16+
5. sync_run_end() - Aggregate results and finalize execution
1617
"""
1718

1819
from __future__ import annotations
@@ -39,12 +40,12 @@
3940

4041
class Environment(ABC, Generic[RequestT, ResponseT], InfoMixin):
4142
"""
42-
Abstract base for coordinating scheduler execution across distributed nodes.
43+
Abstract interface for coordinating scheduler execution across distributed nodes.
4344
44-
Defines the interface for managing distributed scheduler execution including
45+
Defines the protocol for managing distributed scheduler execution including
4546
parameter synchronization, timing coordination, state updates, error propagation,
46-
and result aggregation. Implementations handle the complexity of distributed
47-
coordination while providing a unified interface for scheduler orchestration.
47+
and result aggregation. Implementations handle distributed coordination complexity
48+
while providing a unified interface for scheduler orchestration.
4849
"""
4950

5051
@abstractmethod
@@ -61,10 +62,6 @@ async def sync_run_params(
6162
"""
6263
Synchronize execution parameters across nodes and resolve local scope.
6364
64-
Coordinates parameter distribution and validation across active nodes.
65-
In distributed environments, handles node assignment and workload partitioning.
66-
In non-distributed environments, typically returns parameters unchanged.
67-
6865
:param requests: Complete set of requests to process across all nodes
6966
:param strategy: Scheduling strategy to apply during execution
7067
:param constraints: Runtime constraints to enforce during execution
@@ -78,9 +75,6 @@ async def sync_run_start(self) -> float:
7875
"""
7976
Coordinate synchronized start time across all nodes.
8077
81-
Ensures all nodes begin processing simultaneously for accurate benchmarking
82-
and consistent timing measurements across distributed execution.
83-
8478
:return: Unix timestamp when all nodes should begin processing
8579
:raises Exception: If startup synchronization fails across nodes
8680
"""
@@ -97,11 +91,6 @@ async def update_run_iteration(
9791
"""
9892
Update environment state with completed request iteration results.
9993
100-
Called after each request processing to update execution progress and
101-
synchronize any required state across nodes in distributed environments.
102-
Generally, distributed is expected to store the iteration updates until
103-
all nodes have processed and sync_run_end is called to retrieve them.
104-
10594
:param response: Response generated for the request, if successful
10695
:param request: The processed request
10796
:param request_info: Metadata about request processing including timings
@@ -115,9 +104,6 @@ async def sync_run_error(self, err: list[Exception] | Exception):
115104
"""
116105
Handle and propagate errors across all active nodes.
117106
118-
Coordinates error handling when failures occur, ensuring all nodes are
119-
notified for appropriate cleanup or shutdown procedures.
120-
121107
:param err: The exception(s) that occurred during execution
122108
"""
123109
...
@@ -136,10 +122,6 @@ async def sync_run_end(
136122
"""
137123
Finalize execution and aggregate results from all nodes.
138124
139-
Handles cleanup, result synchronization, and error propagation at execution
140-
completion. Collects and yields results from worker nodes in distributed
141-
environments.
142-
143125
:return: Iterator of (response, request, request_info, state) tuples from
144126
remote nodes in distributed environments, empty for non-distributed
145127
:raises Exception: Any errors that occurred during execution
@@ -151,9 +133,9 @@ class NonDistributedEnvironment(Environment[RequestT, ResponseT]):
151133
"""
152134
Single-node scheduler execution environment with minimal coordination overhead.
153135
154-
Simplified environment for running schedulers on a single node without distributed
155-
coordination requirements. Implements the Environment interface with no-op
156-
synchronization for local testing, development, and single-machine benchmarking.
136+
Implements the Environment interface with no-op synchronization for local testing,
137+
development, and single-machine benchmarking. All synchronization methods return
138+
immediately without distributed coordination logic.
157139
158140
Example:
159141
::
@@ -165,29 +147,27 @@ class NonDistributedEnvironment(Environment[RequestT, ResponseT]):
165147
SynchronousStrategy,
166148
)
167149
168-
169-
# Definitions
150+
env = NonDistributedEnvironment()
170151
requests = [f"req_{ind}" for ind in range(5)]
171152
strategy = SynchronousStrategy()
172153
constraints = {"max_num": MaxNumberConstraint(max_num=5)}
173154
state = SchedulerState()
174155
175-
# Run environment
176156
local_req, local_strat, local_const = await env.sync_run_params(
177157
requests, strategy, constraints
178158
)
179159
start_time = await env.sync_run_start()
180160
for req in local_req:
181161
state.processed_requests += 1
182-
await env.update_run_iteration(
183-
f"resp_{req}", req, RequestInfo(), state
184-
)
162+
await env.update_run_iteration(f"resp_{req}", req, RequestInfo(), state)
185163
async for nonlocal_req in env.sync_run_end():
186164
state.processed_requests += 1
187165
"""
188166

189167
def __init__(self):
190-
"""Initialize with empty error storage for single-node execution."""
168+
"""
169+
Initialize single-node environment with empty error storage.
170+
"""
191171
self.run_errors: list[Exception] = []
192172

193173
async def sync_run_params(
@@ -206,15 +186,15 @@ async def sync_run_params(
206186
:param requests: Requests to process locally
207187
:param strategy: Scheduling strategy to apply during execution
208188
:param constraints: Runtime constraints to enforce during execution
209-
:return: Tuple containing the original (requests, strategy, constraints)
189+
:return: Original (requests, strategy, constraints) tuple unchanged
210190
"""
211191
return requests, strategy, constraints
212192

213193
async def sync_run_start(self) -> float:
214194
"""
215195
Return current time plus configured delay for single-node startup.
216196
217-
:return: Unix timestamp for when the run should start
197+
:return: Unix timestamp when execution should begin
218198
"""
219199
return time.time() + settings.scheduler_start_delay_non_distributed
220200

@@ -229,7 +209,7 @@ async def update_run_iteration(
229209
No-op for single-node execution with no distributed state synchronization.
230210
231211
:param response: Response generated for the request, if successful
232-
:param request: The request that was processed
212+
:param request: The processed request
233213
:param request_info: Metadata about request processing including timings
234214
:param state: Current scheduler state with metrics and progress
235215
"""
@@ -256,7 +236,7 @@ async def sync_run_end(
256236
"""
257237
Finalize single-node execution and propagate any stored errors.
258238
259-
:return: Empty iterator since there are no remote nodes
239+
:return: Empty iterator as there are no remote nodes
260240
:raises Exception: Any error stored during execution via sync_run_error
261241
"""
262242
if self.run_errors:

src/guidellm/scheduler/scheduler.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
"""
2-
Thread-safe singleton scheduler for distributed load generation workload coordination.
2+
Thread-safe singleton scheduler for distributed benchmarking workload coordination.
33
4-
Provides the core orchestration engine that coordinates request processing across
5-
worker processes and distributed environments. Manages timing synchronization,
6-
resource allocation, constraint enforcement, and result aggregation for
7-
load generation operations. Integrates with backends, environments, and strategies
8-
to enable scalable load testing across various scenarios including LLM inference.
4+
Orchestrates request processing across worker processes with distributed timing
5+
coordination, constraint enforcement, and result aggregation. Integrates with
6+
backends, environments, and strategies to enable scalable load testing across
7+
various scenarios including LLM inference benchmarking.
98
"""
109

1110
from __future__ import annotations
@@ -38,16 +37,14 @@ class Scheduler(
3837
Thread-safe singleton scheduler for distributed benchmarking workload coordination.
3938
4039
Orchestrates request processing across worker processes with distributed timing
41-
coordination, constraint enforcement, and result aggregation. Provides a unified
42-
interface for executing benchmarking operations while abstracting the complexity
43-
of multi-process coordination, environment synchronization, and resource management.
44-
Implements singleton pattern to ensure consistent execution state across concurrent
45-
benchmark operations.
40+
coordination, constraint enforcement, and result aggregation. Abstracts the
41+
complexity of multi-process coordination, environment synchronization, and
42+
resource management while providing a unified interface for executing benchmarking
43+
operations. Implements singleton pattern to ensure consistent execution state.
4644
4745
Example:
4846
::
4947
from guidellm.scheduler import Scheduler
50-
from guidellm.backends import OpenAIBackend
5148
from guidellm.scheduler import NonDistributedEnvironment, SynchronousStrategy
5249
5350
scheduler = Scheduler()
@@ -58,14 +55,15 @@ class Scheduler(
5855
env=NonDistributedEnvironment(),
5956
max_requests=1000
6057
):
61-
print(f"Processed: {request} with info: {info} and response: {response}")
58+
print(f"Processed: {request}")
6259
"""
6360

6461
async def run(
6562
self,
6663
requests: Iterable[RequestT | MultiTurnRequestT[RequestT]],
6764
backend: BackendInterface[RequestT, ResponseT],
6865
strategy: SchedulingStrategy,
66+
startup_duration: float,
6967
env: Environment[RequestT, ResponseT] | None,
7068
**constraints: Any | dict[str, Any] | Constraint,
7169
) -> AsyncIterator[
@@ -80,22 +78,23 @@ async def run(
8078
Execute distributed request processing with coordinated timing and constraints.
8179
8280
Orchestrates the complete benchmarking workflow across worker processes with
83-
environment synchronization, constraint enforcement, and error handling.
84-
Manages resource lifecycle from initialization through cleanup while yielding
85-
real-time processing updates for monitoring and aggregation.
81+
environment synchronization, constraint enforcement, and error handling. Manages
82+
resource lifecycle from initialization through cleanup while yielding real-time
83+
processing updates for monitoring and aggregation.
8684
87-
:param requests: Request collection to process. Supports single requests or
85+
:param requests: Request collection to process, supporting single requests or
8886
multi-turn sequences with optional inter-request delays
8987
:param backend: Backend interface for request processing and response generation
9088
:param strategy: Scheduling strategy controlling request timing and distribution
89+
:param startup_duration: Duration in seconds for requests to ramp up
9190
:param env: Environment interface for distributed coordination and
92-
synchronization
91+
synchronization. Defaults to NonDistributedEnvironment if None
9392
:param constraints: Runtime constraints for execution control (max_requests,
94-
max_duration, max_error_rate, etc.). Values can be primitives, dictionaries,
95-
or constraint instances
96-
:yields: Requests udpates as (response, request, request_info, scheduler_state)
97-
tuples. Each request will generate three ordered updates:
98-
queued, in_progress, completed | errored | cancelled.
93+
max_duration, max_error_rate, etc.) as primitives, dictionaries, or
94+
constraint instances
95+
:yields: Request updates as (response, request, request_info, scheduler_state)
96+
tuples. Each request generates three ordered updates: queued, in_progress,
97+
completed | errored | cancelled
9998
:raises Exception: Worker process errors, environment synchronization failures,
10099
or constraint evaluation errors are propagated after cleanup
101100
"""
@@ -122,10 +121,10 @@ async def run(
122121
# Setup the worker group, sync start with the environment
123122
worker_group = WorkerProcessGroup[RequestT, ResponseT](
124123
requests=local_requests,
125-
cycle_requests=local_requests,
126124
backend=backend,
127125
strategy=local_strategy,
128-
constraints=local_constraints,
126+
startup_duration=startup_duration,
127+
**local_constraints,
129128
)
130129
await worker_group.create_processes()
131130
local_start_time = await env.sync_run_start()

0 commit comments

Comments
 (0)