diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 2b52bbc5b..3878137cf 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -149,7 +149,17 @@ def benchmark(): help=( "Benchmark rate(s) to test. Meaning depends on profile: " "sweep=number of benchmarks, concurrent=concurrent requests, " - "async/constant/poisson=requests per second." + "async/constant/poisson=requests per second, " + "incremental=start rate in requests per second." + ), +) +@click.option( + "--profile-kwargs", + callback=cli_tools.parse_json, + default=BenchmarkGenerativeTextArgs.get_default("profile_kwargs"), + help=( + "JSON string of arguments to pass to the profile. " + 'For incremental: {"increment_factor": 0.5, "rate_limit": 100}.' ), ) # Backend configuration diff --git a/src/guidellm/benchmark/__init__.py b/src/guidellm/benchmark/__init__.py index 6cbf80c9d..a0d6c8789 100644 --- a/src/guidellm/benchmark/__init__.py +++ b/src/guidellm/benchmark/__init__.py @@ -21,6 +21,7 @@ from .profiles import ( AsyncProfile, ConcurrentProfile, + IncrementalProfile, Profile, ProfileType, SweepProfile, @@ -81,6 +82,7 @@ "GenerativeRequestsAccumulator", "GenerativeTextMetricsSummary", "GenerativeVideoMetricsSummary", + "IncrementalProfile", "Profile", "ProfileType", "RunningMetricStats", diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index 5b57b22fe..5e8371238 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -324,6 +324,7 @@ async def resolve_profile( max_error_rate: float | None, max_global_error_rate: float | None, console: Console | None = None, + profile_kwargs: dict[str, Any] | None = None, ) -> Profile: """ Resolve and configure a benchmark profile with rate and constraint settings. 
@@ -370,6 +371,7 @@ async def resolve_profile( random_seed=random_seed, rampup_duration=rampup, constraints={**constraints}, + **(profile_kwargs or {}), ) elif constraints: raise ValueError( @@ -501,6 +503,7 @@ async def benchmark_generative_text( max_error_rate=args.max_error_rate, max_global_error_rate=args.max_global_error_rate, console=console, + profile_kwargs=args.profile_kwargs, ) output_formats = await resolve_output_formats( outputs=args.outputs, output_dir=args.output_dir, console=console diff --git a/src/guidellm/benchmark/profiles.py b/src/guidellm/benchmark/profiles.py index 08fea0efc..5aae2a68b 100644 --- a/src/guidellm/benchmark/profiles.py +++ b/src/guidellm/benchmark/profiles.py @@ -29,6 +29,7 @@ from guidellm import settings from guidellm.scheduler import ( AsyncConstantStrategy, + AsyncIncrementalStrategy, AsyncPoissonStrategy, ConcurrentStrategy, Constraint, @@ -46,6 +47,7 @@ __all__ = [ "AsyncProfile", "ConcurrentProfile", + "IncrementalProfile", "Profile", "ProfileType", "SweepProfile", @@ -54,7 +56,7 @@ ] ProfileType = Annotated[ - Literal["synchronous", "concurrent", "throughput", "async", "sweep"], + Literal["synchronous", "concurrent", "throughput", "async", "sweep", "incremental"], "Profile type identifiers for polymorphic deserialization", ] @@ -712,3 +714,119 @@ def next_strategy( ) else: raise ValueError(f"Invalid strategy type: {self.strategy_type}") + + +@Profile.register("incremental") +class IncrementalProfile(ThroughputProfile): + """ + Incremental rate execution profile with incremental load over time. + + Schedules requests starting at a base rate and incrementally increasing + the rate by a factor over time until reaching an optional rate limit. 
+ """ + + type_: Literal["incremental"] = "incremental" # type: ignore[assignment] + start_rate: PositiveFloat = Field( + description="Initial rate at which to schedule requests in requests per second", + ) + increment_factor: PositiveFloat = Field( + description="Factor by which to increase the rate over time", + ) + rate_limit: PositiveFloat | None = Field( + default=None, + description="Maximum rate cap after which load remains constant", + ) + initial_burst: bool = Field( + default=True, + description=( + "Whether to send initial burst of math.floor(start_rate) requests " + "to reach target rate" + ), + ) + + @classmethod + def resolve_args( + cls, + rate_type: str, + rate: list[float] | None, + random_seed: int, + increment_factor: float | None = None, + rate_limit: float | None = None, + **kwargs: Any, + ) -> dict[str, Any]: + """ + Resolve arguments for incremental profile construction. + + :param rate_type: Profile type identifier + :param rate: Start rate in requests per second + :param random_seed: Random seed (ignored) + :param increment_factor: Rate increase factor over time + :param rate_limit: Optional maximum rate cap + :param kwargs: Additional arguments passed through unchanged + :return: Resolved arguments dictionary + :raises ValueError: If required params missing or invalid + """ + _ = random_seed # unused + if rate_type != "incremental": + raise ValueError("Rate type must be 'incremental' for incremental profile") + + # For incremental profile, rate is used as start_rate + start_rate = rate[0] if isinstance(rate, list) and rate else rate + if start_rate is None: + raise ValueError( + "rate is required for incremental profile (used as start_rate)" + ) + + if increment_factor is None: + raise ValueError( + "increment_factor is required for incremental profile. 
" + "Pass it via --profile-kwargs '{\"increment_factor\": }'" + ) + + if start_rate <= 0: + raise ValueError("rate (start_rate) must be a positive number") + + if increment_factor <= 0: + raise ValueError("increment_factor must be a positive number") + + if rate_limit is not None and rate_limit <= 0: + raise ValueError("rate_limit must be a positive number") + + kwargs["start_rate"] = start_rate + kwargs["increment_factor"] = increment_factor + if rate_limit is not None: + kwargs["rate_limit"] = rate_limit + + return kwargs + + @property + def strategy_types(self) -> list[StrategyType]: + """ + :return: Single incremental strategy type + """ + return [self.type_] + + def next_strategy( + self, + prev_strategy: SchedulingStrategy | None, + prev_benchmark: Benchmark | None, + ) -> AsyncIncrementalStrategy | None: + """ + Generate incremental strategy or None if already completed. + + :param prev_strategy: Previously completed strategy (unused) + :param prev_benchmark: Benchmark results from previous execution (unused) + :return: AsyncIncrementalStrategy for first execution, None afterward + """ + _ = (prev_strategy, prev_benchmark) # unused + if len(self.completed_strategies) >= 1: + return None + + return AsyncIncrementalStrategy( + start_rate=self.start_rate, + increment_factor=self.increment_factor, + rate_limit=self.rate_limit, + initial_burst=self.initial_burst, + max_concurrency=self.max_concurrency, + startup_duration=self.startup_duration, + ) diff --git a/src/guidellm/benchmark/schemas/generative/entrypoints.py b/src/guidellm/benchmark/schemas/generative/entrypoints.py index a080daa03..d1fb63721 100644 --- a/src/guidellm/benchmark/schemas/generative/entrypoints.py +++ b/src/guidellm/benchmark/schemas/generative/entrypoints.py @@ -172,6 +172,9 @@ def get_default(cls: type[BenchmarkGenerativeTextArgs], field: str) -> Any: rate: list[float] | None = Field( default=None, description="Request rate(s) for rate-based scheduling" ) + profile_kwargs: dict[str, Any] | 
None = Field( + default=None, description="Additional profile-specific configuration arguments" + ) # Backend configuration backend: BackendType | Backend = Field( default="openai_http", description="Backend type or instance for execution" diff --git a/src/guidellm/scheduler/__init__.py b/src/guidellm/scheduler/__init__.py index c03410767..776a0743e 100644 --- a/src/guidellm/scheduler/__init__.py +++ b/src/guidellm/scheduler/__init__.py @@ -38,6 +38,7 @@ ) from .strategies import ( AsyncConstantStrategy, + AsyncIncrementalStrategy, AsyncPoissonStrategy, ConcurrentStrategy, SchedulingStrategy, @@ -51,6 +52,7 @@ __all__ = [ "AsyncConstantStrategy", + "AsyncIncrementalStrategy", "AsyncPoissonStrategy", "BackendInterface", "BackendT", diff --git a/src/guidellm/scheduler/strategies.py b/src/guidellm/scheduler/strategies.py index dfdd97f58..9aba9474e 100644 --- a/src/guidellm/scheduler/strategies.py +++ b/src/guidellm/scheduler/strategies.py @@ -16,6 +16,7 @@ from __future__ import annotations import asyncio +import math import random from abc import abstractmethod from multiprocessing import Lock, Value @@ -28,6 +29,7 @@ __all__ = [ "AsyncConstantStrategy", + "AsyncIncrementalStrategy", "AsyncPoissonStrategy", "ConcurrentStrategy", "SchedulingStrategy", @@ -39,7 +41,9 @@ StrategyType = Annotated[ - Literal["synchronous", "concurrent", "throughput", "constant", "poisson"], + Literal[ + "synchronous", "concurrent", "throughput", "constant", "poisson", "incremental" + ], "Valid strategy type identifiers for scheduling request patterns", ] @@ -627,3 +631,114 @@ def request_completed(self, request_info: RequestInfo): :param request_info: Completed request metadata (unused) """ _ = request_info # request_info unused for async poisson strategy + + +@SchedulingStrategy.register("incremental") +class AsyncIncrementalStrategy(ThroughputStrategy): + """ + Incremental rate scheduling with gradual load increase over time. 
+
+    Schedules requests starting at a base rate and incrementally increasing
+    the rate by a factor over time until reaching an optional rate limit.
+    Supports initial burst mode to quickly reach the target starting rate.
+    Useful for finding system saturation points or progressive load testing.
+    """
+
+    type_: Literal["incremental"] = "incremental"  # type: ignore[assignment]
+    start_rate: float = Field(
+        description="Initial rate at which to schedule requests in requests/second",
+        gt=0,
+    )
+    increment_factor: float = Field(
+        description="Factor by which to increase the rate over time",
+        gt=0,
+    )
+    rate_limit: float | None = Field(
+        default=None,
+        description="Maximum rate cap after which load remains constant",
+        gt=0,
+    )
+    initial_burst: bool = Field(
+        default=True,
+        description=(
+            "Whether to send initial burst of math.floor(start_rate) requests "
+            "to reach target rate"
+        ),
+    )
+
+    # Per-process scheduling state: the next scheduled request time and how
+    # many initial-burst requests are still owed at start_time.
+    _process_offset: float | None = PrivateAttr(None)
+    _burst_remaining: int = PrivateAttr(0)
+
+    def __str__(self) -> str:
+        """
+        :return: String identifier with start rate and increment factor
+        """
+        return f"incremental@{self.start_rate:.2f}+{self.increment_factor:.2f}"
+
+    def init_processes_timings(
+        self,
+        worker_count: int,
+        max_concurrency: int,
+        startup_duration: float,
+    ):
+        """
+        Initialize incremental-specific timing state.
+
+        :param worker_count: Number of worker processes to coordinate
+        :param max_concurrency: Maximum number of concurrent requests allowed
+        :param startup_duration: Duration in seconds for request startup ramping
+        """
+        super().init_processes_timings(worker_count, max_concurrency, startup_duration)
+        with self._processes_lock:
+            self._process_offset = None
+            self._burst_remaining = 0
+
+    async def next_request_time(self, offset: int) -> float:
+        """
+        Calculate next request time with incremental rate increase.
+
+        The instantaneous rate follows
+        rate = start_rate + increment_factor * elapsed_time, capped at
+        rate_limit when one is set. When initial_burst is enabled, the first
+        math.floor(start_rate) requests are scheduled immediately at the
+        start time so the target starting rate is reached without ramp-in
+        delay.
+
+        :param offset: Unused for incremental strategy
+        :return: Next request time based on incremental rate calculation
+        """
+        _ = offset  # offset unused for incremental strategy
+        start_time = await self.get_processes_start_time()
+
+        if self._process_offset is None:
+            # First call in this process: anchor the schedule at start_time
+            # and arm the initial burst counter if bursting is enabled.
+            self._process_offset = start_time
+            if self.initial_burst:
+                self._burst_remaining = math.floor(self.start_rate)
+
+        # Drain the initial burst by scheduling these requests at start_time,
+        # so math.floor(start_rate) requests actually fire immediately.
+        if self._burst_remaining > 0:
+            self._burst_remaining -= 1
+            return start_time
+
+        current_time = time.time()
+        if current_time <= start_time:
+            return start_time
+
+        # Calculate current rate based on elapsed time
+        elapsed_time = current_time - start_time
+        next_rate = self.start_rate + (self.increment_factor * elapsed_time)
+
+        # Cap at rate limit if specified
+        if self.rate_limit is not None:
+            next_rate = min(next_rate, self.rate_limit)
+
+        self._process_offset += 1.0 / next_rate
+
+        return self._process_offset
+
+    def request_completed(self, request_info: RequestInfo):
+        """
+        Handle request completion (no-op for incremental strategy).
+
+        :param request_info: Completed request metadata (unused)
+        """
+        _ = request_info  # request_info unused for async incremental strategy