
Commit 4217758

Author: Dmytro Parfeniuk (committed)

--rate-type concurrent CLI is implemented
Use the ``--rate`` CLI parameter to specify the number of concurrent workers.

1 parent ecf2984 · commit 4217758
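In practice the new mode is selected on the command line, e.g. something like ``guidellm ... --rate-type concurrent --rate 8`` (only ``--rate-type`` and ``--rate`` are touched by this commit; any other flags are assumptions). With ``--rate-type concurrent``, the ``--rate`` value is interpreted as the number of concurrent streams/workers rather than a requests-per-second rate, as the diffs below show.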

File tree

8 files changed: +115 / −36 lines


src/guidellm/executor/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class Executor:
5353
:type backend: Backend
5454
:param request_generator: The generator that creates requests for execution.
5555
:type request_generator: RequestGenerator
56-
:param mode: The mode for profile generation (e.g., sweep, synchronous).
56+
:param mode: The mode for profile generation (e.g., sweep, synchronous, concurrent).
5757
:type mode: ProfileGenerationMode
5858
:param rate: The list of rates for load generation, or None.
5959
:type rate: Optional[List[float]]

src/guidellm/executor/profile_generator.py

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
]
1818

1919
ProfileGenerationMode = Literal[
20-
"sweep", "synchronous", "throughput", "constant", "poisson"
20+
"sweep", "synchronous", "throughput", "constant", "poisson", "concurrent"
2121
]
2222

2323

@@ -34,7 +34,7 @@ class Profile(Serializable):
3434
"""
3535

3636
load_gen_mode: LoadGenerationMode
37-
load_gen_rate: Optional[float] = None
37+
load_gen_rate: Optional[Union[float, int]] = None
3838
args: Dict[str, Any] = Field(default_factory=dict)
3939

4040

@@ -45,6 +45,7 @@ class ProfileGenerator:
4545
:param mode: The mode for profile generation (e.g., sweep, synchronous).
4646
:type mode: ProfileGenerationMode
4747
:param rate: The rate(s) for load generation; could be a float or list of floats.
48+
In case ``mode`` is concurrent - integer which is the number of streams.
4849
:type rate: Optional[Union[float, Sequence[float]]]
4950
"""
5051

@@ -61,7 +62,7 @@ def __init__(
6162
logger.error(err)
6263
raise err
6364

64-
self._mode = mode
65+
self._mode: ProfileGenerationMode = mode
6566

6667
if self._mode in ("sweep", "throughput", "synchronous"):
6768
if rate is not None:
@@ -74,6 +75,7 @@ def __init__(
7475
err = ValueError(f"Rates are required for {self._mode} mode")
7576
logger.error(err)
7677
raise err
78+
7779
self._rates = rate if isinstance(rate, Sequence) else [rate]
7880

7981
for rt in self._rates:
@@ -96,13 +98,13 @@ def __len__(self) -> int:
9698
if self._mode == "sweep":
9799
return settings.num_sweep_profiles + 2
98100

99-
if self._mode in ("throughput", "synchronous"):
101+
if self._mode in ("throughput", "synchronous", "concurrent"):
100102
return 1
101103

102-
if not self._rates:
104+
if not self.rates:
103105
raise ValueError(f"Rates are required for {self._mode} mode")
104106

105-
return len(self._rates)
107+
return len(self.rates)
106108

107109
@property
108110
def mode(self) -> ProfileGenerationMode:
@@ -147,7 +149,7 @@ def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]:
147149
settings.num_sweep_profiles
148150
)
149151

150-
if self._mode in ["throughput", "synchronous"]:
152+
if self._mode in ["throughput", "synchronous", "concurrent"]:
151153
return [self._mode]
152154

153155
if self._rates is None:
@@ -188,6 +190,19 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil
188190
profile = self.create_synchronous_profile(self.generated_count)
189191
elif self.mode == "throughput":
190192
profile = self.create_throughput_profile(self.generated_count)
193+
elif self.mode == "concurrent":
194+
err = ValueError(
195+
f"Can not create concurrent profile with rate {self.rates}"
196+
)
197+
try:
198+
if not self.rates:
199+
raise err
200+
201+
_rate: int = int(self.rates[0])
202+
except IndexError as error:
203+
logger.error(err)
204+
raise err from error
205+
profile = self.create_concurrent_profile(self.generated_count, _rate)
191206
elif self.mode == "sweep":
192207
profile = self.create_sweep_profile(
193208
self.generated_count,
@@ -211,6 +226,7 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil
211226
profile,
212227
self._generated_count,
213228
)
229+
214230
return profile
215231

216232
@staticmethod
@@ -229,9 +245,11 @@ def create_fixed_rate_profile(
229245
:return: The generated profile or None if index is out of range.
230246
:rtype: Optional[Profile]
231247
"""
248+
232249
modes_map: Dict[str, LoadGenerationMode] = {
233250
"constant": "constant",
234251
"poisson": "poisson",
252+
"concurrent": "concurrent",
235253
}
236254

237255
if mode not in modes_map:
@@ -348,3 +366,26 @@ def create_sweep_profile(
348366
else 1.0 # the fallback value
349367
),
350368
)
369+
370+
@staticmethod
371+
def create_concurrent_profile(index: int, rate: int) -> Optional[Profile]:
372+
"""
373+
Creates a profile with concurrent constant mode.
374+
375+
:param index: The index of the profile to create.
376+
:type index: int
377+
:return: The generated profile or None if index is out of range.
378+
:rtype: Optional[Profile]
379+
"""
380+
381+
profile = (
382+
Profile(
383+
load_gen_mode="concurrent",
384+
load_gen_rate=rate,
385+
)
386+
if index < 1
387+
else None
388+
)
389+
logger.debug("Created concurrent profile: {}", profile)
390+
391+
return profile
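A minimal usage sketch of the new mode, mirroring the unit test added further down; the import paths are assumptions and may differ from the actual package layout:

# Sketch only: in "concurrent" mode the generator yields exactly one profile
# whose load_gen_rate carries the number of concurrent streams.
from guidellm.core import TextGenerationBenchmarkReport  # assumed import path
from guidellm.executor import ProfileGenerator            # assumed import path

generator = ProfileGenerator(mode="concurrent", rate=2.0)
report = TextGenerationBenchmarkReport()

profile = generator.next(report)
assert profile is not None
assert profile.load_gen_mode == "concurrent"
assert profile.load_gen_rate == 2

assert generator.next(report) is None  # only a single concurrent profile is produced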

src/guidellm/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ def generate_benchmark_report(
264264
backend=backend_inst,
265265
request_generator=request_generator,
266266
mode=rate_type,
267-
rate=rate if rate_type in ("constant", "poisson") else None,
267+
rate=rate if rate_type in ("constant", "poisson", "concurrent") else None,
268268
max_number=(
269269
len(request_generator) if max_requests == "dataset" else max_requests
270270
),

src/guidellm/scheduler/base.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import math
33
import time
44
from dataclasses import dataclass
5-
from typing import AsyncGenerator, Literal, Optional, Union, get_args
5+
from typing import AsyncGenerator, Literal, Optional, Union, get_args, Generator
66

77
from loguru import logger
88

@@ -109,17 +109,17 @@ def __init__(
109109
logger.error(err)
110110
raise err
111111

112-
if mode in ["constant", "poisson"] and not rate:
112+
if mode in ["constant", "poisson", "concurrent"] and not rate:
113113
err = ValueError(f"Rate must be > 0 for mode: {mode}. Given: {rate}")
114114
logger.error(err)
115115
raise err
116116

117117
self._generator = generator
118118
self._worker = worker
119-
self._mode = mode
120-
self._rate = rate
121-
self._max_number = max_number
122-
self._max_duration = max_duration
119+
self._mode: LoadGenerationMode = mode
120+
self._rate: Optional[float] = rate
121+
self._max_number: Optional[int] = max_number
122+
self._max_duration: Optional[float] = max_duration
123123

124124
self._load_generator = LoadGenerator(mode, rate)
125125

@@ -227,9 +227,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
227227
count_total = (
228228
self.max_number
229229
if self.max_number
230-
else round(self.max_duration)
231-
if self.max_duration
232-
else 0
230+
else round(self.max_duration) if self.max_duration else 0
233231
)
234232

235233
# yield initial result for progress tracking
@@ -246,9 +244,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
246244
count_completed = (
247245
min(run_count, self.max_number)
248246
if self.max_number
249-
else round(time.time() - start_time)
250-
if self.max_duration
251-
else 0
247+
else round(time.time() - start_time) if self.max_duration else 0
252248
)
253249

254250
yield SchedulerResult(
@@ -267,9 +263,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
267263
count_completed=(
268264
benchmark.request_count + benchmark.error_count
269265
if self.max_number
270-
else round(time.time() - start_time)
271-
if self.max_duration
272-
else 0
266+
else round(time.time() - start_time) if self.max_duration else 0
273267
),
274268
benchmark=benchmark,
275269
)
@@ -311,9 +305,7 @@ async def _run_async(
311305
break
312306

313307
logger.debug(
314-
"Running asynchronous request={} at submit_at={}",
315-
request,
316-
submit_at,
308+
"Running asynchronous request={} at submit_at={}", request, submit_at
317309
)
318310

319311
def _completed(_task: asyncio.Task) -> None:
@@ -326,14 +318,32 @@ def _completed(_task: asyncio.Task) -> None:
326318
logger.debug("Request completed: {}", _res)
327319

328320
benchmark.request_started()
329-
task = asyncio.create_task(
330-
self._submit_task_coroutine(request, submit_at, end_time)
331-
)
332-
task.add_done_callback(_completed)
333-
tasks.append(task)
321+
322+
if self.mode == "concurrent":
323+
if self.rate is None:
324+
raise ValueError(
325+
"Can not use concurrent mode with no rate specified"
326+
)
327+
328+
_tasks: Generator[asyncio.Task, None, None] = (
329+
asyncio.create_task(
330+
self._submit_task_coroutine(request, submit_at, end_time)
331+
)
332+
for _ in range(int(self.rate))
333+
)
334+
335+
for task in _tasks:
336+
task.add_done_callback(_completed)
337+
tasks.append(task)
338+
else:
339+
task = asyncio.create_task(
340+
self._submit_task_coroutine(request, submit_at, end_time)
341+
)
342+
task.add_done_callback(_completed)
343+
tasks.append(task)
334344

335345
# release control to the event loop for other tasks
336-
await asyncio.sleep(0.001)
346+
await asyncio.sleep(0)
337347

338348
for compl_task in asyncio.as_completed(tasks):
339349
task_res = await compl_task
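The key change here is that in "concurrent" mode each generated request is fanned out into ``rate`` parallel asyncio tasks instead of a single task; the switch from ``asyncio.sleep(0.001)`` to ``asyncio.sleep(0)`` still yields control to the event loop, just without an artificial delay. A self-contained sketch of the same fan-out pattern (not the scheduler's actual code; names are illustrative):

# Sketch: one request submitted as `rate` parallel asyncio tasks.
import asyncio


async def submit(request: str, worker_id: int) -> str:
    await asyncio.sleep(0.01)  # stand-in for the real backend submission
    return f"{request}-{worker_id}"


async def main(rate: int = 3) -> None:
    # Create `rate` tasks up front, then consume them as they finish.
    tasks = [asyncio.create_task(submit("req", i)) for i in range(rate)]
    for done in asyncio.as_completed(tasks):
        print(await done)


asyncio.run(main())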

src/guidellm/scheduler/load_generator.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
__all__ = ["LoadGenerationMode", "LoadGenerator"]
88

9-
LoadGenerationMode = Literal["synchronous", "constant", "poisson", "throughput"]
9+
LoadGenerationMode = Literal[
10+
"synchronous", "constant", "poisson", "throughput", "concurrent"
11+
]
1012

1113

1214
class LoadGenerator:
@@ -52,8 +54,9 @@ def __init__(self, mode: LoadGenerationMode, rate: Optional[float] = None):
5254
logger.error(error)
5355
raise error
5456

55-
self._mode = mode
56-
self._rate = rate
57+
self._mode: LoadGenerationMode = mode
58+
self._rate: Optional[float] = rate
59+
5760
logger.debug(
5861
"Initialized LoadGenerator with mode: {mode}, rate: {rate}",
5962
mode=mode,
@@ -100,6 +103,8 @@ def times(self) -> Generator[float, None, None]:
100103
yield from self.poisson_times()
101104
elif self._mode == "synchronous":
102105
yield from self.synchronous_times()
106+
elif self._mode == "concurrent":
107+
yield from self.throughput_times()
103108
else:
104109
logger.error(f"Invalid mode encountered: {self._mode}")
105110
raise ValueError(f"Invalid mode: {self._mode}")

tests/unit/executor/test_profile_generator.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def test_profile_generator_mode():
1919
"throughput",
2020
"constant",
2121
"poisson",
22+
"concurrent",
2223
}
2324

2425

@@ -41,6 +42,7 @@ def test_profile_instantiation():
4142
("constant", [10, 20, 30]),
4243
("poisson", 10),
4344
("poisson", [10, 20, 30]),
45+
("concurrent", 2),
4446
],
4547
)
4648
def test_profile_generator_instantiation(mode, rate):
@@ -93,6 +95,8 @@ def test_profile_generator_instantiation(mode, rate):
9395
("poisson", None),
9496
("poisson", -1),
9597
("poisson", 0),
98+
("concurrent", 0),
99+
("concurrent", -1),
96100
],
97101
)
98102
def test_profile_generator_invalid_instantiation(mode, rate):
@@ -158,6 +162,20 @@ def test_profile_generator_next_throughput():
158162
assert generator.next(current_report) is None
159163

160164

165+
@pytest.mark.sanity()
166+
def test_profile_generator_next_concurrent():
167+
generator = ProfileGenerator(mode="concurrent", rate=2.0)
168+
current_report = TextGenerationBenchmarkReport()
169+
170+
profile: Profile = generator.next(current_report) # type: ignore
171+
assert profile.load_gen_mode == "concurrent"
172+
assert profile.load_gen_rate == 2
173+
assert generator.generated_count == 1
174+
175+
for _ in range(3):
176+
assert generator.next(current_report) is None
177+
178+
161179
@pytest.mark.sanity()
162180
@pytest.mark.parametrize(
163181
"rate",

tests/unit/scheduler/test_load_generator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def test_load_generator_mode():
1414
"constant",
1515
"poisson",
1616
"throughput",
17+
"concurrent",
1718
}
1819

1920

@@ -25,6 +26,7 @@ def test_load_generator_mode():
2526
("poisson", 5),
2627
("throughput", None),
2728
("synchronous", None),
29+
("concurrent", 3),
2830
],
2931
)
3032
def test_load_generator_instantiation(mode, rate):
@@ -40,6 +42,8 @@ def test_load_generator_instantiation(mode, rate):
4042
("invalid_mode", None, ValueError),
4143
("constant", 0, ValueError),
4244
("poisson", -1, ValueError),
45+
("concurrent", -1, ValueError),
46+
("concurrent", 0, ValueError),
4347
],
4448
)
4549
def test_load_generator_invalid_instantiation(mode, rate, expected_error):

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ env_list = py38,py39,py310,py311,py312
55

66
[testenv]
77
description = Run all tests
8+
usedevelop = true
89
deps =
910
.[dev]
1011
commands =
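Note: ``usedevelop = true`` tells tox to install the project into each test environment in editable mode (equivalent to ``pip install -e .``), so local source changes are picked up without rebuilding the package between runs.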
