Skip to content

Commit 49cad6a

Browse files
Implement incremental rate type
1 parent 2cbb7c9 commit 49cad6a

File tree

6 files changed

+274
-3
lines changed

6 files changed

+274
-3
lines changed

src/guidellm/__main__.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,28 @@ def benchmark():
162162
help=(
163163
"Benchmark rate(s) to test. Meaning depends on profile: "
164164
"sweep=number of benchmarks, concurrent=concurrent requests, "
165-
"async/constant/poisson=requests per second."
165+
"async/constant/poisson=requests per second. "
166+
"Not used for incremental profile."
166167
),
167168
)
169+
@click.option(
170+
"--start-rate",
171+
type=float,
172+
default=BenchmarkGenerativeTextArgs.get_default("start_rate"),
173+
help="Initial rate for incremental profile in requests per second.",
174+
)
175+
@click.option(
176+
"--increment-factor",
177+
type=float,
178+
default=BenchmarkGenerativeTextArgs.get_default("increment_factor"),
179+
help="Factor by which to increase rate over time for incremental profile.",
180+
)
181+
@click.option(
182+
"--rate-limit",
183+
type=int,
184+
default=BenchmarkGenerativeTextArgs.get_default("rate_limit"),
185+
help="Maximum rate cap for incremental profile.",
186+
)
168187
# Backend configuration
169188
@click.option(
170189
"--backend",

src/guidellm/benchmark/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from .profile import (
2222
AsyncProfile,
2323
ConcurrentProfile,
24+
IncrementalProfile,
2425
Profile,
2526
ProfileType,
2627
SweepProfile,
@@ -69,6 +70,7 @@
6970
"GenerativeMetrics",
7071
"GenerativeMetricsSummary",
7172
"GenerativeVideoMetricsSummary",
73+
"IncrementalProfile",
7274
"Profile",
7375
"ProfileType",
7476
"SchedulerDict",

src/guidellm/benchmark/profile.py

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from guidellm import settings
2828
from guidellm.scheduler import (
2929
AsyncConstantStrategy,
30+
AsyncIncrementalStrategy,
3031
AsyncPoissonStrategy,
3132
ConcurrentStrategy,
3233
Constraint,
@@ -45,14 +46,17 @@
4546
__all__ = [
4647
"AsyncProfile",
4748
"ConcurrentProfile",
49+
"IncrementalProfile",
4850
"Profile",
4951
"ProfileType",
5052
"SweepProfile",
5153
"SynchronousProfile",
5254
"ThroughputProfile",
5355
]
5456

55-
ProfileType = Literal["synchronous", "concurrent", "throughput", "async", "sweep"]
57+
ProfileType = Literal[
58+
"synchronous", "concurrent", "throughput", "async", "sweep", "incremental"
59+
]
5660

5761

5862
class Profile(
@@ -707,3 +711,120 @@ def next_strategy(
707711
)
708712
else:
709713
raise ValueError(f"Invalid strategy type: {self.strategy_type}")
714+
715+
716+
@Profile.register("incremental")
717+
class IncrementalProfile(ThroughputProfile):
718+
"""
719+
Incremental rate execution profile with incremental load over time.
720+
721+
Schedules requests starting at a base rate and incrementally increasing
722+
the rate by a factor over time until reaching an optional rate limit.
723+
"""
724+
725+
type_: Literal["incremental"] = "incremental" # type: ignore[assignment]
726+
start_rate: PositiveFloat = Field(
727+
description="Initial rate at which to schedule requests in requests per second",
728+
)
729+
increment_factor: PositiveFloat = Field(
730+
description="Factor by which to increase the rate over time",
731+
)
732+
rate_limit: PositiveInt | None = Field(
733+
default=None,
734+
description="Maximum rate cap after which load remains constant",
735+
)
736+
initial_burst: bool = Field(
737+
default=True,
738+
description=(
739+
"Whether to send initial burst of math.floor(start_rate) requests "
740+
"to reach target rate"
741+
),
742+
)
743+
744+
@classmethod
745+
def resolve_args(
746+
cls,
747+
rate_type: str,
748+
rate: list[float] | None,
749+
random_seed: int,
750+
start_rate: float | None = None,
751+
increment_factor: float | None = None,
752+
rate_limit: int | None = None,
753+
**kwargs: Any,
754+
) -> dict[str, Any]:
755+
"""
756+
Resolve arguments for incremental profile construction.
757+
758+
:param rate_type: Profile type identifier
759+
:param rate: Rate parameter (must be None for incremental)
760+
:param random_seed: Random seed (ignored)
761+
:param start_rate: Initial rate in requests per second
762+
:param increment_factor: Rate increase factor over time
763+
:param rate_limit: Optional maximum rate cap
764+
:param kwargs: Additional arguments passed through unchanged
765+
:return: Resolved arguments dictionary
766+
:raises ValueError: If rate is not None or required params missing
767+
"""
768+
_ = random_seed # unused
769+
if rate_type != "incremental":
770+
raise ValueError("Rate type must be 'incremental' for incremental profile")
771+
772+
if rate is not None:
773+
raise ValueError(
774+
"rate does not apply to incremental profile, it must be set to None "
775+
"or not set at all. Use start_rate and increment_factor instead."
776+
)
777+
778+
if start_rate is None:
779+
raise ValueError("start_rate is required for incremental profile")
780+
781+
if increment_factor is None:
782+
raise ValueError("increment_factor is required for incremental profile")
783+
784+
if start_rate <= 0:
785+
raise ValueError("start_rate must be a positive number")
786+
787+
if increment_factor <= 0:
788+
raise ValueError("increment_factor must be a positive number")
789+
790+
if rate_limit is not None and rate_limit <= 0:
791+
raise ValueError("rate_limit must be a positive integer")
792+
793+
kwargs["start_rate"] = start_rate
794+
kwargs["increment_factor"] = increment_factor
795+
if rate_limit is not None:
796+
kwargs["rate_limit"] = rate_limit
797+
798+
return kwargs
799+
800+
@property
801+
def strategy_types(self) -> list[StrategyType]:
802+
"""
803+
:return: Single incremental strategy type
804+
"""
805+
return [self.type_]
806+
807+
def next_strategy(
808+
self,
809+
prev_strategy: SchedulingStrategy | None,
810+
prev_benchmark: Benchmark | None,
811+
) -> AsyncIncrementalStrategy | None:
812+
"""
813+
Generate incremental strategy or None if already completed.
814+
815+
:param prev_strategy: Previously completed strategy (unused)
816+
:param prev_benchmark: Benchmark results from previous execution (unused)
817+
:return: AsyncIncrementalStrategy for first execution, None afterward
818+
"""
819+
_ = (prev_strategy, prev_benchmark) # unused
820+
if len(self.completed_strategies) >= 1:
821+
return None
822+
823+
return AsyncIncrementalStrategy(
824+
start_rate=self.start_rate,
825+
increment_factor=self.increment_factor,
826+
rate_limit=self.rate_limit,
827+
initial_burst=self.initial_burst,
828+
max_concurrency=self.max_concurrency,
829+
startup_duration=self.startup_duration,
830+
)

src/guidellm/benchmark/schemas.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1840,6 +1840,18 @@ def get_default(cls: type[BenchmarkGenerativeTextArgs], field: str) -> Any:
18401840
rate: float | list[float] | None = Field(
18411841
default=None, description="Request rate(s) for rate-based scheduling"
18421842
)
1843+
start_rate: float | None = Field(
1844+
default=None,
1845+
description="Initial rate for incremental profile in requests per second",
1846+
)
1847+
increment_factor: float | None = Field(
1848+
default=None,
1849+
description="Factor by which to increase rate over time for incremental profile",
1850+
)
1851+
rate_limit: int | None = Field(
1852+
default=None,
1853+
description="Maximum rate cap for incremental profile",
1854+
)
18431855
# Backend configuration
18441856
backend: BackendType | Backend = Field(
18451857
default="openai_http", description="Backend type or instance for execution"

src/guidellm/scheduler/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
)
3939
from .strategies import (
4040
AsyncConstantStrategy,
41+
AsyncIncrementalStrategy,
4142
AsyncPoissonStrategy,
4243
ConcurrentStrategy,
4344
SchedulingStrategy,
@@ -51,6 +52,7 @@
5152

5253
__all__ = [
5354
"AsyncConstantStrategy",
55+
"AsyncIncrementalStrategy",
5456
"AsyncPoissonStrategy",
5557
"BackendInterface",
5658
"BackendT",

src/guidellm/scheduler/strategies.py

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from __future__ import annotations
1313

1414
import asyncio
15+
import math
1516
import random
1617
import time
1718
from abc import abstractmethod
@@ -25,6 +26,7 @@
2526

2627
__all__ = [
2728
"AsyncConstantStrategy",
29+
"AsyncIncrementalStrategy",
2830
"AsyncPoissonStrategy",
2931
"ConcurrentStrategy",
3032
"SchedulingStrategy",
@@ -36,7 +38,9 @@
3638

3739

3840
StrategyType = Annotated[
39-
Literal["synchronous", "concurrent", "throughput", "constant", "poisson"],
41+
Literal[
42+
"synchronous", "concurrent", "throughput", "constant", "poisson", "incremental"
43+
],
4044
"Valid strategy type identifiers for scheduling request patterns",
4145
]
4246

@@ -517,3 +521,114 @@ def request_completed(self, request_info: RequestInfo):
517521
:param request_info: Completed request metadata (unused)
518522
"""
519523
_ = request_info # request_info unused for async poisson strategy
524+
525+
526+
@SchedulingStrategy.register("incremental")
527+
class AsyncIncrementalStrategy(ThroughputStrategy):
528+
"""
529+
Incremental rate scheduling with gradual load increase over time.
530+
531+
Schedules requests starting at a base rate and incrementally increasing
532+
the rate by a factor over time until reaching an optional rate limit.
533+
Supports initial burst mode to quickly reach the target starting rate.
534+
Useful for finding system saturation points or progressive load testing.
535+
"""
536+
537+
type_: Literal["incremental"] = "incremental" # type: ignore[assignment]
538+
start_rate: float = Field(
539+
description="Initial rate at which to schedule requests in requests/second",
540+
gt=0,
541+
)
542+
increment_factor: float = Field(
543+
description="Factor by which to increase the rate over time",
544+
gt=0,
545+
)
546+
rate_limit: int | None = Field(
547+
default=None,
548+
description="Maximum rate cap after which load remains constant",
549+
gt=0,
550+
)
551+
initial_burst: bool = Field(
552+
default=True,
553+
description=(
554+
"Whether to send initial burst of math.floor(start_rate) requests "
555+
"to reach target rate"
556+
),
557+
)
558+
559+
_process_offset: float | None = PrivateAttr(None)
560+
_burst_sent: bool = PrivateAttr(False)
561+
562+
def __str__(self) -> str:
563+
"""
564+
:return: String identifier with start rate and increment factor
565+
"""
566+
return f"incremental@{self.start_rate:.2f}+{self.increment_factor:.2f}"
567+
568+
def init_processes_timings(
569+
self,
570+
worker_count: int,
571+
max_concurrency: int,
572+
startup_duration: float,
573+
):
574+
"""
575+
Initialize incremental-specific timing state.
576+
577+
:param worker_count: Number of worker processes to coordinate
578+
:param max_concurrency: Maximum number of concurrent requests allowed
579+
:param startup_duration: Duration in seconds for request startup ramping
580+
"""
581+
super().init_processes_timings(worker_count, max_concurrency, startup_duration)
582+
with self._processes_lock:
583+
self._process_offset = None
584+
585+
async def next_request_time(self, offset: int) -> float:
586+
"""
587+
Calculate next request time with incremental rate increase.
588+
589+
Implements gradual rate increase: rate = start_rate + (increment_factor * elapsed_time)
590+
Optionally sends initial burst and caps at rate_limit.
591+
592+
:param offset: Unused for incremental strategy
593+
:return: Next request time based on incremental rate calculation
594+
"""
595+
_ = offset # offset unused for incremental strategy
596+
start_time = await self.get_processes_start_time()
597+
598+
# Handle initial burst if enabled
599+
if self.initial_burst and not self._burst_sent:
600+
self._burst_sent = True
601+
burst_count = math.floor(self.start_rate)
602+
for _ in range(burst_count):
603+
pass
604+
if self._process_offset is None:
605+
self._process_offset = start_time
606+
607+
if self._process_offset is None:
608+
self._process_offset = start_time
609+
610+
current_time = time.time()
611+
if current_time <= start_time:
612+
return start_time
613+
614+
# Calculate current rate based on elapsed time
615+
elapsed_time = current_time - start_time
616+
next_rate = self.start_rate + (self.increment_factor * elapsed_time)
617+
618+
# Cap at rate limit if specified
619+
if self.rate_limit and next_rate >= self.rate_limit:
620+
increment = 1.0 / self.rate_limit
621+
else:
622+
increment = 1.0 / next_rate
623+
624+
self._process_offset += increment
625+
626+
return self._process_offset
627+
628+
def request_completed(self, request_info: RequestInfo):
629+
"""
630+
Handle request completion (no-op for incremental strategy).
631+
632+
:param request_info: Completed request metadata (unused)
633+
"""
634+
_ = request_info # request_info unused for async incremental strategy

0 commit comments

Comments
 (0)