Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/guidellm/__main__.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we don't like to add top-level arguments that only map to a specific use-case. Many of the other components have a "kwargs" argument, so it might make sense to add a --profile-kwargs. Also, --rate needs to map to something, so maybe rename it to increment_factor (or start_rate — whichever makes more sense to sweep over).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option might be to modify --rate to take tuples of (start_rate, increment_factor).

cc: @markurtz @jaredoconnell to weigh in on this.

Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,17 @@ def benchmark():
help=(
"Benchmark rate(s) to test. Meaning depends on profile: "
"sweep=number of benchmarks, concurrent=concurrent requests, "
"async/constant/poisson=requests per second."
"async/constant/poisson=requests per second, "
"incremental=start rate in requests per second."
),
)
@click.option(
"--profile-kwargs",
callback=cli_tools.parse_json,
default=BenchmarkGenerativeTextArgs.get_default("profile_kwargs"),
help=(
"JSON string of arguments to pass to the profile. "
'For incremental: {"increment_factor": 0.5, "rate_limit": 100}.'
),
)
# Backend configuration
Expand Down
2 changes: 2 additions & 0 deletions src/guidellm/benchmark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .profiles import (
AsyncProfile,
ConcurrentProfile,
IncrementalProfile,
Profile,
ProfileType,
SweepProfile,
Expand Down Expand Up @@ -81,6 +82,7 @@
"GenerativeRequestsAccumulator",
"GenerativeTextMetricsSummary",
"GenerativeVideoMetricsSummary",
"IncrementalProfile",
"Profile",
"ProfileType",
"RunningMetricStats",
Expand Down
3 changes: 3 additions & 0 deletions src/guidellm/benchmark/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ async def resolve_profile(
max_error_rate: float | None,
max_global_error_rate: float | None,
console: Console | None = None,
profile_kwargs: dict[str, Any] | None = None,
) -> Profile:
"""
Resolve and configure a benchmark profile with rate and constraint settings.
Expand Down Expand Up @@ -370,6 +371,7 @@ async def resolve_profile(
random_seed=random_seed,
rampup_duration=rampup,
constraints={**constraints},
**(profile_kwargs or {}),
)
elif constraints:
raise ValueError(
Expand Down Expand Up @@ -501,6 +503,7 @@ async def benchmark_generative_text(
max_error_rate=args.max_error_rate,
max_global_error_rate=args.max_global_error_rate,
console=console,
profile_kwargs=args.profile_kwargs,
)
output_formats = await resolve_output_formats(
outputs=args.outputs, output_dir=args.output_dir, console=console
Expand Down
120 changes: 119 additions & 1 deletion src/guidellm/benchmark/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from guidellm import settings
from guidellm.scheduler import (
AsyncConstantStrategy,
AsyncIncrementalStrategy,
AsyncPoissonStrategy,
ConcurrentStrategy,
Constraint,
Expand All @@ -46,6 +47,7 @@
__all__ = [
"AsyncProfile",
"ConcurrentProfile",
"IncrementalProfile",
"Profile",
"ProfileType",
"SweepProfile",
Expand All @@ -54,7 +56,7 @@
]

ProfileType = Annotated[
Literal["synchronous", "concurrent", "throughput", "async", "sweep"],
Literal["synchronous", "concurrent", "throughput", "async", "sweep", "incremental"],
"Profile type identifiers for polymorphic deserialization",
]

Expand Down Expand Up @@ -712,3 +714,119 @@ def next_strategy(
)
else:
raise ValueError(f"Invalid strategy type: {self.strategy_type}")


@Profile.register("incremental")
class IncrementalProfile(ThroughputProfile):
    """
    Incremental rate execution profile with increasing load over time.

    Schedules requests starting at a base rate and incrementally increasing
    the rate by a factor over time until reaching an optional rate limit.
    Runs exactly one strategy (AsyncIncrementalStrategy) per benchmark run.
    """

    type_: Literal["incremental"] = "incremental"  # type: ignore[assignment]
    # Maps from the generic --rate CLI argument (used as the starting rate).
    start_rate: PositiveFloat = Field(
        description="Initial rate at which to schedule requests in requests per second",
    )
    # Linear growth per elapsed second: rate(t) = start_rate + increment_factor * t.
    increment_factor: PositiveFloat = Field(
        description="Factor by which to increase the rate over time",
    )
    rate_limit: PositiveFloat | None = Field(
        default=None,
        description="Maximum rate cap after which load remains constant",
    )
    initial_burst: bool = Field(
        default=True,
        description=(
            "Whether to send initial burst of math.floor(start_rate) requests "
            "to reach target rate"
        ),
    )

    @classmethod
    def resolve_args(
        cls,
        rate_type: str,
        rate: list[float] | None,
        random_seed: int,
        increment_factor: float | None = None,
        rate_limit: float | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Resolve arguments for incremental profile construction.

        :param rate_type: Profile type identifier; must be "incremental"
        :param rate: Start rate in requests per second (first element if a list)
        :param random_seed: Random seed (ignored for this profile)
        :param increment_factor: Rate increase factor over time
        :param rate_limit: Optional maximum rate cap
        :param kwargs: Additional arguments passed through unchanged
        :return: Resolved arguments dictionary
        :raises ValueError: If required params missing or invalid
        """
        _ = random_seed  # unused; kept for interface parity with other profiles
        if rate_type != "incremental":
            raise ValueError("Rate type must be 'incremental' for incremental profile")

        # The generic rate argument doubles as start_rate for this profile.
        # Normalize first: an empty list is treated the same as None (the
        # previous expression left start_rate as [] for rate=[], which then
        # raised TypeError on the <= comparison instead of a clear ValueError).
        start_rate: float | None
        if isinstance(rate, list):
            start_rate = rate[0] if rate else None
        else:
            start_rate = rate
        if start_rate is None:
            raise ValueError(
                "rate is required for incremental profile (used as start_rate)"
            )

        if increment_factor is None:
            raise ValueError(
                "increment_factor is required for incremental profile. "
                "Pass it via --profile-kwargs '{\"increment_factor\": <value>}'"
            )

        if start_rate <= 0:
            raise ValueError("rate (start_rate) must be a positive number")

        if increment_factor <= 0:
            raise ValueError("increment_factor must be a positive number")

        if rate_limit is not None and rate_limit <= 0:
            raise ValueError("rate_limit must be a positive number")

        kwargs["start_rate"] = start_rate
        kwargs["increment_factor"] = increment_factor
        if rate_limit is not None:
            kwargs["rate_limit"] = rate_limit

        return kwargs

    @property
    def strategy_types(self) -> list[StrategyType]:
        """
        :return: Single incremental strategy type
        """
        return [self.type_]

    def next_strategy(
        self,
        prev_strategy: SchedulingStrategy | None,
        prev_benchmark: Benchmark | None,
    ) -> AsyncIncrementalStrategy | None:
        """
        Generate incremental strategy or None if already completed.

        :param prev_strategy: Previously completed strategy (unused)
        :param prev_benchmark: Benchmark results from previous execution (unused)
        :return: AsyncIncrementalStrategy for first execution, None afterward
        """
        _ = (prev_strategy, prev_benchmark)  # unused
        # Single-shot profile: once one strategy has completed, we are done.
        if len(self.completed_strategies) >= 1:
            return None

        return AsyncIncrementalStrategy(
            start_rate=self.start_rate,
            increment_factor=self.increment_factor,
            rate_limit=self.rate_limit,
            initial_burst=self.initial_burst,
            max_concurrency=self.max_concurrency,
            startup_duration=self.startup_duration,
        )
3 changes: 3 additions & 0 deletions src/guidellm/benchmark/schemas/generative/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def get_default(cls: type[BenchmarkGenerativeTextArgs], field: str) -> Any:
rate: list[float] | None = Field(
default=None, description="Request rate(s) for rate-based scheduling"
)
profile_kwargs: dict[str, Any] | None = Field(
default=None, description="Additional profile-specific configuration arguments"
)
# Backend configuration
backend: BackendType | Backend = Field(
default="openai_http", description="Backend type or instance for execution"
Expand Down
2 changes: 2 additions & 0 deletions src/guidellm/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)
from .strategies import (
AsyncConstantStrategy,
AsyncIncrementalStrategy,
AsyncPoissonStrategy,
ConcurrentStrategy,
SchedulingStrategy,
Expand All @@ -51,6 +52,7 @@

__all__ = [
"AsyncConstantStrategy",
"AsyncIncrementalStrategy",
"AsyncPoissonStrategy",
"BackendInterface",
"BackendT",
Expand Down
117 changes: 116 additions & 1 deletion src/guidellm/scheduler/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations

import asyncio
import math
import random
from abc import abstractmethod
from multiprocessing import Lock, Value
Expand All @@ -28,6 +29,7 @@

__all__ = [
"AsyncConstantStrategy",
"AsyncIncrementalStrategy",
"AsyncPoissonStrategy",
"ConcurrentStrategy",
"SchedulingStrategy",
Expand All @@ -39,7 +41,9 @@


StrategyType = Annotated[
Literal["synchronous", "concurrent", "throughput", "constant", "poisson"],
Literal[
"synchronous", "concurrent", "throughput", "constant", "poisson", "incremental"
],
"Valid strategy type identifiers for scheduling request patterns",
]

Expand Down Expand Up @@ -627,3 +631,114 @@ def request_completed(self, request_info: RequestInfo):
:param request_info: Completed request metadata (unused)
"""
_ = request_info # request_info unused for async poisson strategy


@SchedulingStrategy.register("incremental")
class AsyncIncrementalStrategy(ThroughputStrategy):
    """
    Incremental rate scheduling with gradual load increase over time.

    Schedules requests starting at a base rate and incrementally increasing
    the rate by a factor over time until reaching an optional rate limit.
    Supports initial burst mode to quickly reach the target starting rate.
    Useful for finding system saturation points or progressive load testing.
    """

    type_: Literal["incremental"] = "incremental"  # type: ignore[assignment]
    start_rate: float = Field(
        description="Initial rate at which to schedule requests in requests/second",
        gt=0,
    )
    increment_factor: float = Field(
        description="Factor by which to increase the rate over time",
        gt=0,
    )
    rate_limit: float | None = Field(
        default=None,
        description="Maximum rate cap after which load remains constant",
        gt=0,
    )
    initial_burst: bool = Field(
        default=True,
        description=(
            "Whether to send initial burst of math.floor(start_rate) requests "
            "to reach target rate"
        ),
    )

    # Last scheduled request time; advances monotonically once past the burst.
    _process_offset: float | None = PrivateAttr(None)
    # Requests still to release immediately at start_time; None until the
    # first next_request_time call sizes the burst.
    _burst_remaining: int | None = PrivateAttr(None)

    def __str__(self) -> str:
        """
        :return: String identifier with start rate and increment factor
        """
        return f"incremental@{self.start_rate:.2f}+{self.increment_factor:.2f}"

    def init_processes_timings(
        self,
        worker_count: int,
        max_concurrency: int,
        startup_duration: float,
    ):
        """
        Initialize incremental-specific timing state.

        :param worker_count: Number of worker processes to coordinate
        :param max_concurrency: Maximum number of concurrent requests allowed
        :param startup_duration: Duration in seconds for request startup ramping
        """
        super().init_processes_timings(worker_count, max_concurrency, startup_duration)
        # Strategies are shared across threads/processes: all mutable state
        # must only be touched while holding the shared processes lock.
        with self._processes_lock:
            self._process_offset = None
            self._burst_remaining = None

    async def next_request_time(self, offset: int) -> float:
        """
        Calculate next request time with incremental rate increase.

        Implements gradual rate increase:
        rate(t) = start_rate + increment_factor * elapsed_time, capped at
        rate_limit when set. When initial_burst is enabled, the first
        math.floor(start_rate) requests are scheduled immediately at
        start_time so the target starting rate is reached quickly.

        :param offset: Unused for incremental strategy
        :return: Next request time based on incremental rate calculation
        """
        _ = offset  # offset unused for incremental strategy
        start_time = await self.get_processes_start_time()

        # Guard all shared-state reads/writes with the processes lock so
        # concurrent callers observe a consistent, monotonic schedule
        # (previously _burst_sent/_process_offset were mutated unlocked).
        with self._processes_lock:
            if self._burst_remaining is None:
                # Size the burst lazily on first call; zero when disabled.
                self._burst_remaining = (
                    math.floor(self.start_rate) if self.initial_burst else 0
                )

            if self._burst_remaining > 0:
                # Release burst requests immediately at the start time.
                # (The previous implementation looped over the burst count
                # without doing anything, making initial_burst a no-op.)
                self._burst_remaining -= 1
                return start_time

            if self._process_offset is None:
                self._process_offset = start_time

            current_time = time.time()
            if current_time <= start_time:
                return start_time

            # Current target rate grows linearly with elapsed wall time.
            elapsed_time = current_time - start_time
            next_rate = self.start_rate + (self.increment_factor * elapsed_time)

            # Cap at rate limit if specified.
            if self.rate_limit is not None and next_rate > self.rate_limit:
                next_rate = self.rate_limit

            self._process_offset += 1.0 / next_rate
            return self._process_offset

    def request_completed(self, request_info: RequestInfo):
        """
        Handle request completion (no-op for incremental strategy).

        :param request_info: Completed request metadata (unused)
        """
        _ = request_info  # request_info unused for async incremental strategy