Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/guidellm/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def benchmark():
default=BenchmarkGenerativeTextArgs.get_default("rampup"),
help=(
"The time, in seconds, to ramp up the request rate over. "
"Only applicable for Throughput/Concurrent strategies"
"Applicable for Throughput, Concurrent, and Constant strategies"
),
)
@click.option(
Expand Down
4 changes: 3 additions & 1 deletion src/guidellm/benchmark/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,9 @@ def next_strategy(

if self.strategy_type == "constant":
return AsyncConstantStrategy(
rate=current_rate, max_concurrency=self.max_concurrency
rate=current_rate,
max_concurrency=self.max_concurrency,
rampup_duration=self.rampup_duration,
)
elif self.strategy_type == "poisson":
return AsyncPoissonStrategy(
Expand Down
44 changes: 40 additions & 4 deletions src/guidellm/scheduler/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations

import asyncio
import math
import random
from abc import abstractmethod
from multiprocessing import Event, Value, synchronize
Expand Down Expand Up @@ -453,6 +454,13 @@ class AsyncConstantStrategy(SchedulingStrategy):
default=None,
description="Maximum number of concurrent requests to schedule",
)
rampup_duration: NonNegativeFloat = Field(
default=0.0,
description=(
"Duration in seconds to linearly ramp up from 0 to target rate "
"at the beginning of each strategy run"
),
)

def __str__(self) -> str:
"""
Expand All @@ -476,19 +484,47 @@ def requests_limit(self) -> PositiveInt | None:

async def next_request_time(self, worker_index: PositiveInt) -> float:
    """
    Calculate next request time at fixed intervals with optional linear rampup.

    Schedules requests independent of request completion times. Without a
    rampup (``rampup_duration == 0``) request ``i`` is placed at
    ``start_time + i / rate``. With a rampup, the rate grows linearly from 0
    to the target rate over ``rampup_duration`` seconds, so the cumulative
    request count by time ``t`` is ``n = rate * t**2 / (2 * rampup_duration)``;
    inverting that gives the offset for each request issued during rampup.
    Once ``rate * rampup_duration / 2`` requests have been scheduled, spacing
    continues at the constant ``1 / rate`` interval from the end of the rampup.

    :param worker_index: Unused for constant strategy
    :return: Start time plus the offset derived from the request index and
        rampup configuration
    """
    _ = worker_index  # unused
    current_index = self.next_request_index()
    start_time = await self.get_processes_start_time()

    if self.rampup_duration <= 0:
        # No rampup: uniform intervals
        return start_time + current_index / self.rate

    if current_index == 1:
        # The very first request is pinned to start_time rather than to the
        # quadratic curve, so load begins immediately
        return start_time

    # Number of requests the linear ramp emits by t = rampup_duration
    n_rampup = self.rate * self.rampup_duration / 2.0

    if current_index <= n_rampup:
        # During rampup: solve n = rate * t² / (2 * rampup_duration) for t
        time_offset = math.sqrt(
            2.0 * current_index * self.rampup_duration / self.rate
        )
        return start_time + time_offset

    # After rampup: continue at constant rate from the end of the ramp
    time_offset = self.rampup_duration + (current_index - n_rampup) / self.rate
    return start_time + time_offset

def request_completed(self, request_info: RequestInfo):
"""
Expand Down
177 changes: 177 additions & 0 deletions tests/unit/scheduler/test_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ class TestAsyncConstantStrategy:
{"rate": 1.0},
{"rate": 5.0},
{"rate": 10.3, "max_concurrency": 8},
{"rate": 2.0, "rampup_duration": 1.0},
{"rate": 10.0, "rampup_duration": 2.0, "max_concurrency": 5},
]
)
def valid_instances(self, request):
Expand All @@ -412,6 +414,7 @@ def test_initialization(self, valid_instances: tuple[AsyncConstantStrategy, dict
[
("rate", 0),
("rate", -1.0),
("rampup_duration", -1.0),
],
)
def test_invalid_initialization(self, field, value):
Expand Down Expand Up @@ -473,6 +476,180 @@ def test_marshalling(self, valid_instances: tuple[AsyncConstantStrategy, dict]):
for key, value in constructor_args.items():
assert getattr(base_json_reconstructed, key) == value

@pytest.mark.smoke
def test_rampup_duration_default(self):
    """Verify a strategy built without rampup_duration reports 0.0.

    ### WRITTEN BY AI ###
    """
    strategy = AsyncConstantStrategy(rate=1.0)
    default_rampup = strategy.rampup_duration
    assert default_rampup == 0.0

@pytest.mark.smoke
def test_rampup_duration_initialization(self):
    """Verify an explicit rampup_duration is stored on the strategy.

    ### WRITTEN BY AI ###
    """
    strategy = AsyncConstantStrategy(rate=10.0, rampup_duration=2.0)
    stored_rampup = strategy.rampup_duration
    assert stored_rampup == 2.0

@pytest.mark.smoke
@pytest.mark.asyncio
async def test_timing_without_rampup(self):
    """Verify that a zero rampup keeps the uniform ``index / rate`` schedule.

    ### WRITTEN BY AI ###
    """
    start_time = 1000.0
    strategy = AsyncConstantStrategy(rate=10.0, rampup_duration=0.0)
    strategy.init_processes_timings(worker_count=1, max_concurrency=100)
    strategy.init_processes_start(start_time)

    # Every call to next_request_time advances the request index by one,
    # so the n-th call is compared against the n-th uniform slot.
    for request_number in range(1, 11):
        scheduled = await strategy.next_request_time(0)
        expected = start_time + request_number / 10.0
        assert scheduled == pytest.approx(expected, rel=1e-10), (
            f"Request {request_number}: expected {expected}, got {scheduled}"
        )

@pytest.mark.smoke
@pytest.mark.asyncio
async def test_timing_with_rampup(self):
    """Verify the schedule is quadratic during rampup and linear afterwards.

    ### WRITTEN BY AI ###
    """
    rate = 10.0
    rampup_duration = 2.0
    start_time = 1000.0

    strategy = AsyncConstantStrategy(rate=rate, rampup_duration=rampup_duration)
    strategy.init_processes_timings(worker_count=1, max_concurrency=100)
    strategy.init_processes_start(start_time)

    # Requests emitted by the end of the ramp: 10.0 * 2.0 / 2.0 == 10
    n_rampup = rate * rampup_duration / 2.0

    # Index 1 is pinned to start_time
    first = await strategy.next_request_time(0)
    assert first == pytest.approx(start_time, abs=1e-6), (
        f"First request should be at start_time, got {first}"
    )

    # Indices 2..10 follow t = sqrt(2 * n * rampup_duration / rate);
    # each call advances the index automatically
    last_rampup_index = int(n_rampup)
    for n in range(2, last_rampup_index + 1):
        scheduled = await strategy.next_request_time(0)
        expected = start_time + math.sqrt(2.0 * n * rampup_duration / rate)
        assert scheduled == pytest.approx(expected, rel=1e-6), (
            f"Request {n} during rampup: expected {expected}, got {scheduled}"
        )

    # Indices 11..14 sit on the constant-rate line that starts at the end of
    # the ramp: rampup_duration + (i - n_rampup) / rate
    for i in range(11, 15):
        scheduled = await strategy.next_request_time(0)
        expected = start_time + rampup_duration + (i - n_rampup) / rate
        assert scheduled == pytest.approx(expected, rel=1e-6), (
            f"Request {i} after rampup: expected {expected}, got {scheduled}"
        )

@pytest.mark.sanity
@pytest.mark.asyncio
async def test_timing_with_rampup_edge_cases(self):
    """Test edge cases for rampup timing.

    Covers a rampup so short that fewer than one request fits inside it, and
    a rampup long enough that many requests are scheduled before the target
    rate is reached.

    ### WRITTEN BY AI ###
    """

    # Test with very short rampup_duration
    short_rate = 100.0
    short_rampup = 0.01
    strategy = AsyncConstantStrategy(rate=short_rate, rampup_duration=short_rampup)
    strategy.init_processes_timings(worker_count=1, max_concurrency=100)
    start_time = 2000.0
    strategy.init_processes_start(start_time)

    # First request
    time1 = await strategy.next_request_time(0)
    assert time1 == pytest.approx(start_time, abs=1e-6)

    # n_rampup = 100.0 * 0.01 / 2.0 = 0.5, so the second request is already
    # past the ramp and must land on the constant-rate line. Asserting only
    # the first request would pass for any rampup length.
    short_n_rampup = short_rate * short_rampup / 2.0
    time2 = await strategy.next_request_time(0)
    expected2 = start_time + short_rampup + (2 - short_n_rampup) / short_rate
    assert time2 == pytest.approx(expected2, abs=1e-9), (
        f"Second request after short rampup: expected {expected2}, got {time2}"
    )

    # Test with very long rampup_duration
    strategy2 = AsyncConstantStrategy(rate=1.0, rampup_duration=100.0)
    strategy2.init_processes_timings(worker_count=1, max_concurrency=100)
    start_time2 = 3000.0
    strategy2.init_processes_start(start_time2)

    # First request
    time1_2 = await strategy2.next_request_time(0)
    assert time1_2 == pytest.approx(start_time2, abs=1e-6)

    # Request at end of rampup
    # n_rampup = 1.0 * 100.0 / 2.0 = 50; index 1 was consumed above, so 49
    # further calls advance through indices 2..50
    time_end_rampup = None
    for _ in range(49):
        time_end_rampup = await strategy2.next_request_time(0)
    expected_end = start_time2 + 100.0
    assert time_end_rampup == pytest.approx(expected_end, rel=1e-6), (
        f"End of rampup: expected {expected_end}, got {time_end_rampup}"
    )

@pytest.mark.sanity
@pytest.mark.asyncio
async def test_timing_rampup_transition(self):
    """Verify there is no gap where the rampup hands over to the constant rate.

    ### WRITTEN BY AI ###
    """
    rate = 10.0
    rampup_duration = 2.0
    start_time = 5000.0

    strategy = AsyncConstantStrategy(rate=rate, rampup_duration=rampup_duration)
    strategy.init_processes_timings(worker_count=1, max_concurrency=100)
    strategy.init_processes_start(start_time)

    n_rampup = rate * rampup_duration / 2.0  # 10

    # No request has been issued yet: consume indices 1..9 here so that the
    # next call yields index 10, the last request inside the ramp
    skipped = 0
    while skipped < 9:
        await strategy.next_request_time(0)
        skipped += 1

    time_last_rampup = await strategy.next_request_time(0)
    rampup_offset = math.sqrt(2.0 * 10 * rampup_duration / rate)
    expected_last_rampup = start_time + rampup_offset
    last_message = (
        f"Last rampup request: expected {expected_last_rampup}, "
        f"got {time_last_rampup}"
    )
    assert time_last_rampup == pytest.approx(
        expected_last_rampup, rel=1e-6
    ), last_message

    # Index 11 is the first request on the constant-rate line
    time_first_after = await strategy.next_request_time(0)
    expected_first_after = start_time + rampup_duration + (11 - n_rampup) / rate
    after_message = (
        f"First after rampup: expected {expected_first_after}, "
        f"got {time_first_after}"
    )
    assert time_first_after == pytest.approx(
        expected_first_after, rel=1e-6
    ), after_message

    # Smooth hand-over: the last ramp request coincides with the end of the
    # rampup period
    ramp_end = start_time + rampup_duration
    assert time_last_rampup == pytest.approx(
        ramp_end, rel=1e-6
    ), "Last rampup should be at end of rampup period"


class TestAsyncPoissonStrategy:
@pytest.fixture(
Expand Down