From 4217758026c9d1070e2d5aba8b87cfe9f62d058f Mon Sep 17 00:00:00 2001 From: Dmytro Parfeniuk Date: Thu, 27 Feb 2025 13:50:41 +0200 Subject: [PATCH] ``--rate-type concurrent`` CLI is implemented. Use the ``--rate`` CLI parameter to specify the number of concurrent workers. --- src/guidellm/executor/base.py | 2 +- src/guidellm/executor/profile_generator.py | 55 +++++++++++++++--- src/guidellm/main.py | 2 +- src/guidellm/scheduler/base.py | 58 +++++++++++-------- src/guidellm/scheduler/load_generator.py | 11 +++- tests/unit/executor/test_profile_generator.py | 18 ++++++ tests/unit/scheduler/test_load_generator.py | 4 ++ tox.ini | 1 + 8 files changed, 115 insertions(+), 36 deletions(-) diff --git a/src/guidellm/executor/base.py b/src/guidellm/executor/base.py index 865ab30d..a1224a81 100644 --- a/src/guidellm/executor/base.py +++ b/src/guidellm/executor/base.py @@ -53,7 +53,7 @@ class Executor: :type backend: Backend :param request_generator: The generator that creates requests for execution. :type request_generator: RequestGenerator - :param mode: The mode for profile generation (e.g., sweep, synchronous). + :param mode: The mode for profile generation (e.g., sweep, synchronous, concurrent). :type mode: ProfileGenerationMode :param rate: The list of rates for load generation, or None. 
:type rate: Optional[List[float]] diff --git a/src/guidellm/executor/profile_generator.py b/src/guidellm/executor/profile_generator.py index 757646cf..5d43588a 100644 --- a/src/guidellm/executor/profile_generator.py +++ b/src/guidellm/executor/profile_generator.py @@ -17,7 +17,7 @@ ] ProfileGenerationMode = Literal[ - "sweep", "synchronous", "throughput", "constant", "poisson" + "sweep", "synchronous", "throughput", "constant", "poisson", "concurrent" ] @@ -34,7 +34,7 @@ class Profile(Serializable): """ load_gen_mode: LoadGenerationMode - load_gen_rate: Optional[float] = None + load_gen_rate: Optional[Union[float, int]] = None args: Dict[str, Any] = Field(default_factory=dict) @@ -45,6 +45,7 @@ class ProfileGenerator: :param mode: The mode for profile generation (e.g., sweep, synchronous). :type mode: ProfileGenerationMode :param rate: The rate(s) for load generation; could be a float or list of floats. + In case ``mode`` is concurrent - integer which is the number of streams. :type rate: Optional[Union[float, Sequence[float]]] """ @@ -61,7 +62,7 @@ def __init__( logger.error(err) raise err - self._mode = mode + self._mode: ProfileGenerationMode = mode if self._mode in ("sweep", "throughput", "synchronous"): if rate is not None: @@ -74,6 +75,7 @@ def __init__( err = ValueError(f"Rates are required for {self._mode} mode") logger.error(err) raise err + self._rates = rate if isinstance(rate, Sequence) else [rate] for rt in self._rates: @@ -96,13 +98,13 @@ def __len__(self) -> int: if self._mode == "sweep": return settings.num_sweep_profiles + 2 - if self._mode in ("throughput", "synchronous"): + if self._mode in ("throughput", "synchronous", "concurrent"): return 1 - if not self._rates: + if not self.rates: raise ValueError(f"Rates are required for {self._mode} mode") - return len(self._rates) + return len(self.rates) @property def mode(self) -> ProfileGenerationMode: @@ -147,7 +149,7 @@ def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]: 
settings.num_sweep_profiles ) - if self._mode in ["throughput", "synchronous"]: + if self._mode in ["throughput", "synchronous", "concurrent"]: return [self._mode] if self._rates is None: @@ -188,6 +190,19 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil profile = self.create_synchronous_profile(self.generated_count) elif self.mode == "throughput": profile = self.create_throughput_profile(self.generated_count) + elif self.mode == "concurrent": + err = ValueError( + f"Can not create concurrent profile with rate {self.rates}" + ) + try: + if not self.rates: + raise err + + _rate: int = int(self.rates[0]) + except IndexError as error: + logger.error(err) + raise err from error + profile = self.create_concurrent_profile(self.generated_count, _rate) elif self.mode == "sweep": profile = self.create_sweep_profile( self.generated_count, @@ -211,6 +226,7 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil profile, self._generated_count, ) + return profile @staticmethod @@ -229,9 +245,11 @@ def create_fixed_rate_profile( :return: The generated profile or None if index is out of range. :rtype: Optional[Profile] """ + modes_map: Dict[str, LoadGenerationMode] = { "constant": "constant", "poisson": "poisson", + "concurrent": "concurrent", } if mode not in modes_map: @@ -348,3 +366,26 @@ def create_sweep_profile( else 1.0 # the fallback value ), ) + + @staticmethod + def create_concurrent_profile(index: int, rate: int) -> Optional[Profile]: + """ + Creates a profile with concurrent constant mode. + + :param index: The index of the profile to create. + :type index: int + :return: The generated profile or None if index is out of range. 
+ :rtype: Optional[Profile] + """ + + profile = ( + Profile( + load_gen_mode="concurrent", + load_gen_rate=rate, + ) + if index < 1 + else None + ) + logger.debug("Created concurrent profile: {}", profile) + + return profile diff --git a/src/guidellm/main.py b/src/guidellm/main.py index 4016ecec..ae8231b4 100644 --- a/src/guidellm/main.py +++ b/src/guidellm/main.py @@ -264,7 +264,7 @@ def generate_benchmark_report( backend=backend_inst, request_generator=request_generator, mode=rate_type, - rate=rate if rate_type in ("constant", "poisson") else None, + rate=rate if rate_type in ("constant", "poisson", "concurrent") else None, max_number=( len(request_generator) if max_requests == "dataset" else max_requests ), diff --git a/src/guidellm/scheduler/base.py b/src/guidellm/scheduler/base.py index 602166b0..ac14784b 100644 --- a/src/guidellm/scheduler/base.py +++ b/src/guidellm/scheduler/base.py @@ -2,7 +2,7 @@ import math import time from dataclasses import dataclass -from typing import AsyncGenerator, Literal, Optional, Union, get_args +from typing import AsyncGenerator, Literal, Optional, Union, get_args, Generator from loguru import logger @@ -109,17 +109,17 @@ def __init__( logger.error(err) raise err - if mode in ["constant", "poisson"] and not rate: + if mode in ["constant", "poisson", "concurrent"] and not rate: err = ValueError(f"Rate must be > 0 for mode: {mode}. 
Given: {rate}") logger.error(err) raise err self._generator = generator self._worker = worker - self._mode = mode - self._rate = rate - self._max_number = max_number - self._max_duration = max_duration + self._mode: LoadGenerationMode = mode + self._rate: Optional[float] = rate + self._max_number: Optional[int] = max_number + self._max_duration: Optional[float] = max_duration self._load_generator = LoadGenerator(mode, rate) @@ -227,9 +227,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]: count_total = ( self.max_number if self.max_number - else round(self.max_duration) - if self.max_duration - else 0 + else round(self.max_duration) if self.max_duration else 0 ) # yield initial result for progress tracking @@ -246,9 +244,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]: count_completed = ( min(run_count, self.max_number) if self.max_number - else round(time.time() - start_time) - if self.max_duration - else 0 + else round(time.time() - start_time) if self.max_duration else 0 ) yield SchedulerResult( @@ -267,9 +263,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]: count_completed=( benchmark.request_count + benchmark.error_count if self.max_number - else round(time.time() - start_time) - if self.max_duration - else 0 + else round(time.time() - start_time) if self.max_duration else 0 ), benchmark=benchmark, ) @@ -311,9 +305,7 @@ async def _run_async( break logger.debug( - "Running asynchronous request={} at submit_at={}", - request, - submit_at, + "Running asynchronous request={} at submit_at={}", request, submit_at ) def _completed(_task: asyncio.Task) -> None: @@ -326,14 +318,32 @@ def _completed(_task: asyncio.Task) -> None: logger.debug("Request completed: {}", _res) benchmark.request_started() - task = asyncio.create_task( - self._submit_task_coroutine(request, submit_at, end_time) - ) - task.add_done_callback(_completed) - tasks.append(task) + + if self.mode == "concurrent": + if self.rate is None: + raise 
ValueError( + "Can not use concurrent mode with no rate specified" + ) + + _tasks: Generator[asyncio.Task, None, None] = ( + asyncio.create_task( + self._submit_task_coroutine(request, submit_at, end_time) + ) + for _ in range(int(self.rate)) + ) + + for task in _tasks: + task.add_done_callback(_completed) + tasks.append(task) + else: + task = asyncio.create_task( + self._submit_task_coroutine(request, submit_at, end_time) + ) + task.add_done_callback(_completed) + tasks.append(task) # release control to the event loop for other tasks - await asyncio.sleep(0.001) + await asyncio.sleep(0) for compl_task in asyncio.as_completed(tasks): task_res = await compl_task diff --git a/src/guidellm/scheduler/load_generator.py b/src/guidellm/scheduler/load_generator.py index f629752a..e7577c00 100644 --- a/src/guidellm/scheduler/load_generator.py +++ b/src/guidellm/scheduler/load_generator.py @@ -6,7 +6,9 @@ __all__ = ["LoadGenerationMode", "LoadGenerator"] -LoadGenerationMode = Literal["synchronous", "constant", "poisson", "throughput"] +LoadGenerationMode = Literal[ + "synchronous", "constant", "poisson", "throughput", "concurrent" +] class LoadGenerator: @@ -52,8 +54,9 @@ def __init__(self, mode: LoadGenerationMode, rate: Optional[float] = None): logger.error(error) raise error - self._mode = mode - self._rate = rate + self._mode: LoadGenerationMode = mode + self._rate: Optional[float] = rate + logger.debug( "Initialized LoadGenerator with mode: {mode}, rate: {rate}", mode=mode, @@ -100,6 +103,8 @@ def times(self) -> Generator[float, None, None]: yield from self.poisson_times() elif self._mode == "synchronous": yield from self.synchronous_times() + elif self._mode == "concurrent": + yield from self.throughput_times() else: logger.error(f"Invalid mode encountered: {self._mode}") raise ValueError(f"Invalid mode: {self._mode}") diff --git a/tests/unit/executor/test_profile_generator.py b/tests/unit/executor/test_profile_generator.py index 9c91d574..40c2b2be 100644 --- 
a/tests/unit/executor/test_profile_generator.py +++ b/tests/unit/executor/test_profile_generator.py @@ -19,6 +19,7 @@ def test_profile_generator_mode(): "throughput", "constant", "poisson", + "concurrent", } @@ -41,6 +42,7 @@ def test_profile_instantiation(): ("constant", [10, 20, 30]), ("poisson", 10), ("poisson", [10, 20, 30]), + ("concurrent", 2), ], ) def test_profile_generator_instantiation(mode, rate): @@ -93,6 +95,8 @@ def test_profile_generator_instantiation(mode, rate): ("poisson", None), ("poisson", -1), ("poisson", 0), + ("concurrent", 0), + ("concurrent", -1), ], ) def test_profile_generator_invalid_instantiation(mode, rate): @@ -158,6 +162,20 @@ def test_profile_generator_next_throughput(): assert generator.next(current_report) is None +@pytest.mark.sanity() +def test_profile_generator_next_concurrent(): + generator = ProfileGenerator(mode="concurrent", rate=2.0) + current_report = TextGenerationBenchmarkReport() + + profile: Profile = generator.next(current_report) # type: ignore + assert profile.load_gen_mode == "concurrent" + assert profile.load_gen_rate == 2 + assert generator.generated_count == 1 + + for _ in range(3): + assert generator.next(current_report) is None + + @pytest.mark.sanity() @pytest.mark.parametrize( "rate", diff --git a/tests/unit/scheduler/test_load_generator.py b/tests/unit/scheduler/test_load_generator.py index 6b84ee01..4a04bdc9 100644 --- a/tests/unit/scheduler/test_load_generator.py +++ b/tests/unit/scheduler/test_load_generator.py @@ -14,6 +14,7 @@ def test_load_generator_mode(): "constant", "poisson", "throughput", + "concurrent", } @@ -25,6 +26,7 @@ def test_load_generator_mode(): ("poisson", 5), ("throughput", None), ("synchronous", None), + ("concurrent", 3), ], ) def test_load_generator_instantiation(mode, rate): @@ -40,6 +42,8 @@ def test_load_generator_instantiation(mode, rate): ("invalid_mode", None, ValueError), ("constant", 0, ValueError), ("poisson", -1, ValueError), + ("concurrent", -1, ValueError), + 
("concurrent", 0, ValueError), ], ) def test_load_generator_invalid_instantiation(mode, rate, expected_error): diff --git a/tox.ini b/tox.ini index 36e28090..4b0f53c0 100644 --- a/tox.ini +++ b/tox.ini @@ -5,6 +5,7 @@ env_list = py38,py39,py310,py311,py312 [testenv] description = Run all tests +usedevelop = true deps = .[dev] commands =