diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py index 13a748d5..0a035551 100644 --- a/src/guidellm/__main__.py +++ b/src/guidellm/__main__.py @@ -31,6 +31,7 @@ from typing import Annotated, Union import click +from pydantic import ValidationError try: import uvloop @@ -55,6 +56,7 @@ ) from guidellm.benchmark.scenario import ( GenerativeTextScenario, + get_builtin_scenarios, ) from guidellm.mock_server import MockServer, MockServerConfig from guidellm.preprocess.dataset import ShortPromptStrategy, process_dataset @@ -135,6 +137,25 @@ def benchmark(): help="Run a benchmark against a generative model using the specified arguments.", context_settings={"auto_envvar_prefix": "GUIDELLM"}, ) +@click.option( + "--scenario", + type=cli_tools.Union( + click.Path( + exists=True, + readable=True, + file_okay=True, + dir_okay=False, + path_type=Path, + ), + click.Choice(get_builtin_scenarios()), + ), + default=None, + help=( + "The name of a builtin scenario or path to a config file. " + "Missing values from the config will use defaults. " + "Options specified on the commandline will override the scenario." + ), +) @click.option( "--target", type=str, @@ -161,7 +182,7 @@ def benchmark(): ) @click.option( "--rate", - default=None, + default=GenerativeTextScenario.get_default("rate"), help=( "The rates to run the benchmark at. " "Can be a single number or a comma-separated list of numbers. " @@ -183,18 +204,18 @@ def benchmark(): "--backend-type", # legacy alias "backend", type=click.Choice(list(get_literal_vals(BackendType))), + default=GenerativeTextScenario.get_default("backend"), help=( "The type of backend to use to run requests against. Defaults to 'openai_http'." f" Supported types: {', '.join(get_literal_vals(BackendType))}" ), - default="openai_http", ) @click.option( "--backend-kwargs", "--backend-args", # legacy alias "backend_kwargs", callback=cli_tools.parse_json, - default=None, + default=GenerativeTextScenario.get_default("backend_kwargs"), help=( "A JSON string containing any arguments to pass to the backend as a " "dict with **kwargs. Headers can be removed by setting their value to " @@ -204,7 +225,7 @@ def benchmark(): ) @click.option( "--model", - default=None, + default=GenerativeTextScenario.get_default("model"), type=str, help=( "The ID of the model to benchmark within the backend. " @@ -214,7 +235,7 @@ def benchmark(): # Data configuration @click.option( "--processor", - default=None, + default=GenerativeTextScenario.get_default("processor"), type=str, help=( "The processor or tokenizer to use to calculate token counts for statistics " @@ -224,7 +245,7 @@ def benchmark(): ) @click.option( "--processor-args", - default=None, + default=GenerativeTextScenario.get_default("processor_args"), callback=cli_tools.parse_json, help=( "A JSON string containing any arguments to pass to the processor constructor " @@ -233,7 +254,7 @@ def benchmark(): ) @click.option( "--data-args", - default=None, + default=GenerativeTextScenario.get_default("data_args"), callback=cli_tools.parse_json, help=( "A JSON string containing any arguments to pass to the dataset creation " @@ -242,7 +263,7 @@ def benchmark(): ) @click.option( "--data-sampler", - default=None, + default=GenerativeTextScenario.get_default("data_sampler"), type=click.Choice(["random"]), help=( "The data sampler type to use. 'random' will add a random shuffle on the data. 
" @@ -301,7 +322,7 @@ def benchmark(): "--warmup-percent", # legacy alias "warmup", type=float, - default=None, + default=GenerativeTextScenario.get_default("warmup"), help=( "The specification around the number of requests to run before benchmarking. " "If within (0, 1), then the percent of requests/time to use for warmup. " @@ -315,7 +336,7 @@ def benchmark(): "--cooldown-percent", # legacy alias "cooldown", type=float, - default=GenerativeTextScenario.get_default("cooldown_percent"), + default=GenerativeTextScenario.get_default("cooldown"), help=( "The specification around the number of requests to run after benchmarking. " "If within (0, 1), then the percent of requests/time to use for cooldown. " @@ -328,19 +349,19 @@ def benchmark(): "--request-samples", "--output-sampling", # legacy alias "request_samples", + default=GenerativeTextScenario.get_default("request_samples"), type=int, help=( "The number of samples for each request status and each benchmark to save " "in the output file. If None (default), will save all samples. " "Defaults to 20." ), - default=20, ) # Constraints configuration @click.option( "--max-seconds", type=float, - default=None, + default=GenerativeTextScenario.get_default("max_seconds"), help=( "The maximum number of seconds each benchmark can run for. " "If None, will run until max_requests or the data is exhausted." @@ -349,7 +370,7 @@ def benchmark(): @click.option( "--max-requests", type=int, - default=None, + default=GenerativeTextScenario.get_default("max_requests"), help=( "The maximum number of requests each benchmark can run for. " "If None, will run until max_seconds or the data is exhausted." @@ -358,55 +379,22 @@ def benchmark(): @click.option( "--max-errors", type=int, - default=None, + default=GenerativeTextScenario.get_default("max_errors"), help="Maximum number of errors allowed before stopping the benchmark", ) @click.option( "--max-error-rate", type=float, - default=None, + default=GenerativeTextScenario.get_default("max_error_rate"), help="Maximum error rate allowed before stopping the benchmark", ) @click.option( "--max-global-error-rate", type=float, - default=None, + default=GenerativeTextScenario.get_default("max_global_error_rate"), help="Maximum global error rate allowed across all benchmarks", ) -def run( - target, - data, - profile, - rate, - random_seed, - # Backend Configuration - backend, - backend_kwargs, - model, - # Data configuration - processor, - processor_args, - data_args, - data_sampler, - # Output configuration - output_path, - output_formats, - # Updates configuration - disable_console_outputs, - disable_progress, - display_scheduler_stats, - # Aggregators configuration - output_extras, - warmup, - cooldown, - request_samples, - # Constraints configuration - max_seconds, - max_requests, - max_errors, - max_error_rate, - max_global_error_rate, -): +def run(**kwargs): """ Execute a generative text benchmark against a target model backend. @@ -415,53 +403,53 @@ def run( Supports multiple backends, data sources, output formats, and constraint types for flexible benchmark configuration. 
""" + scenario = kwargs.pop("scenario") + click_ctx = click.get_current_context() + overrides = cli_tools.set_if_not_default(click_ctx, **kwargs) + + try: + # If a scenario file was specified read from it + if scenario is None: + _scenario = GenerativeTextScenario.model_validate(overrides) + elif isinstance(scenario, Path): + _scenario = GenerativeTextScenario.from_file(scenario, overrides) + else: # Only builtins can make it here; click will catch anything else + _scenario = GenerativeTextScenario.from_builtin(scenario, overrides) + except ValidationError as e: + # Translate pydantic valdation error to click argument error + errs = e.errors(include_url=False, include_context=True, include_input=True) + param_name = "--" + str(errs[0]["loc"][0]).replace("_", "-") + raise click.BadParameter( + errs[0]["msg"], ctx=click_ctx, param_hint=param_name + ) from e + if HAS_UVLOOP: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.run( benchmark_generative_text( - target=target, - data=data, - profile=profile, - rate=rate, - random_seed=random_seed, - # Backend configuration - backend=backend, - backend_kwargs=backend_kwargs, - model=model, - # Data configuration - processor=processor, - processor_args=processor_args, - data_args=data_args, - data_sampler=data_sampler, + scenario=_scenario, # Output configuration - output_path=output_path, + output_path=kwargs["output_path"], output_formats=[ fmt - for fmt in output_formats - if not disable_console_outputs or fmt != "console" + for fmt in kwargs["output_formats"] + if not kwargs["disable_console_outputs"] or fmt != "console" ], # Updates configuration progress=( [ GenerativeConsoleBenchmarkerProgress( - display_scheduler_stats=display_scheduler_stats + display_scheduler_stats=kwargs["display_scheduler_stats"] ) ] - if not disable_progress + if not kwargs["disable_progress"] else None ), - print_updates=not disable_console_outputs, + print_updates=not kwargs["disable_console_outputs"], # Aggregators configuration - add_aggregators={"extras": InjectExtrasAggregator(extras=output_extras)}, - warmup=warmup, - cooldown=cooldown, - request_samples=request_samples, - # Constraints configuration - max_seconds=max_seconds, - max_requests=max_requests, - max_errors=max_errors, - max_error_rate=max_error_rate, - max_global_error_rate=max_global_error_rate, + add_aggregators={ + "extras": InjectExtrasAggregator(extras=kwargs["output_extras"]) + }, ) ) diff --git a/src/guidellm/benchmark/__init__.py b/src/guidellm/benchmark/__init__.py index 76324a65..9fdb231d 100644 --- a/src/guidellm/benchmark/__init__.py +++ b/src/guidellm/benchmark/__init__.py @@ -40,9 +40,23 @@ BenchmarkerProgressGroup, GenerativeConsoleBenchmarkerProgress, ) +from .scenario import ( + GenerativeTextScenario, + Scenario, + enable_scenarios, + get_builtin_scenarios, +) +from .types import ( + AggregatorInputT, + DataInputT, + OutputFormatT, + ProcessorInputT, + ProgressInputT, +) __all__ = [ "Aggregator", + "AggregatorInputT", "AggregatorState", "AsyncProfile", "Benchmark", @@ -54,6 +68,7 @@ "BenchmarkerProgressGroup", "CompilableAggregator", "ConcurrentProfile", + "DataInputT", "GenerativeBenchmark", "GenerativeBenchmarkerCSV", "GenerativeBenchmarkerConsole", @@ -65,14 +80,21 @@ "GenerativeRequestStats", "GenerativeRequestsAggregator", "GenerativeStatsProgressAggregator", + "GenerativeTextScenario", "InjectExtrasAggregator", + "OutputFormatT", + "ProcessorInputT", "Profile", "ProfileType", + "ProgressInputT", + "Scenario", "SchedulerStatsAggregator", "SerializableAggregator", 
"SweepProfile", "SynchronousProfile", "ThroughputProfile", "benchmark_generative_text", + "enable_scenarios", + "get_builtin_scenarios", "reimport_benchmarks_report", ] diff --git a/src/guidellm/benchmark/benchmarker.py b/src/guidellm/benchmark/benchmarker.py index ae591c23..5f05065a 100644 --- a/src/guidellm/benchmark/benchmarker.py +++ b/src/guidellm/benchmark/benchmarker.py @@ -124,7 +124,7 @@ async def run( backend=backend, strategy=strategy, env=environment, - **constraints, + **constraints or {}, ): aggregators_update = AggregatorState() for key, aggregator in benchmark_aggregators.items(): diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index 828402d8..b926394f 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -1,14 +1,8 @@ from __future__ import annotations -from collections.abc import Iterable from pathlib import Path from typing import Any, Literal -from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict -from transformers import ( # type: ignore[import] - PreTrainedTokenizerBase, -) - from guidellm.backends import ( Backend, BackendType, @@ -16,8 +10,6 @@ GenerationResponse, ) from guidellm.benchmark.aggregator import ( - Aggregator, - CompilableAggregator, GenerativeRequestsAggregator, GenerativeStatsProgressAggregator, SchedulerStatsAggregator, @@ -29,11 +21,15 @@ GenerativeBenchmarkerOutput, ) from guidellm.benchmark.profile import Profile, ProfileType -from guidellm.benchmark.progress import ( - BenchmarkerProgress, - BenchmarkerProgressGroup, +from guidellm.benchmark.progress import BenchmarkerProgressGroup +from guidellm.benchmark.scenario import enable_scenarios +from guidellm.benchmark.types import ( + AggregatorInputT, + DataInputT, + OutputFormatT, + ProcessorInputT, + ProgressInputT, ) -from guidellm.benchmark.scenario import GenerativeTextScenario, Scenario from guidellm.request import GenerativeRequestLoader from guidellm.scheduler import ( ConstraintInitializer, @@ -44,7 +40,6 @@ __all__ = [ "benchmark_generative_text", - "benchmark_with_scenario", "reimport_benchmarks_report", ] @@ -52,29 +47,9 @@ _CURRENT_WORKING_DIR = Path.cwd() -# Data types - -DataType = ( - Iterable[str] - | Iterable[dict[str, Any]] - | Dataset - | DatasetDict - | IterableDataset - | IterableDatasetDict - | str - | Path -) - -OutputFormatType = ( - tuple[str, ...] 
- | list[str] - | dict[str, str | dict[str, Any] | GenerativeBenchmarkerOutput] - | None -) - - # Helper functions + async def initialize_backend( backend: BackendType | Backend, target: str, @@ -82,9 +57,7 @@ async def initialize_backend( backend_kwargs: dict[str, Any] | None, ) -> Backend: backend = ( - Backend.create( - backend, target=target, model=model, **(backend_kwargs or {}) - ) + Backend.create(backend, target=target, model=model, **(backend_kwargs or {})) if not isinstance(backend, Backend) else backend ) @@ -121,18 +94,19 @@ async def resolve_profile( ) return profile + async def resolve_output_formats( - output_formats: OutputFormatType, + output_formats: OutputFormatT, output_path: str | Path | None, ) -> dict[str, GenerativeBenchmarkerOutput]: - output_formats = GenerativeBenchmarkerOutput.resolve( + return GenerativeBenchmarkerOutput.resolve( output_formats=(output_formats or {}), output_path=output_path ) - return output_formats + async def finalize_outputs( report: GenerativeBenchmarksReport, - resolved_output_formats: dict[str, GenerativeBenchmarkerOutput] + resolved_output_formats: dict[str, GenerativeBenchmarkerOutput], ): output_format_results = {} for key, output in resolved_output_formats.items(): @@ -143,43 +117,32 @@ async def finalize_outputs( # Complete entrypoints -async def benchmark_with_scenario(scenario: Scenario, **kwargs): - """ - Run a benchmark using a scenario and specify any extra arguments - """ - - if isinstance(scenario, GenerativeTextScenario): - return await benchmark_generative_text(**vars(scenario), **kwargs) - else: - raise ValueError(f"Unsupported Scenario type {type(scenario)}") - # @validate_call(config={"arbitrary_types_allowed": True}) +@enable_scenarios async def benchmark_generative_text( # noqa: C901 target: str, - data: DataType, + data: DataInputT, profile: StrategyType | ProfileType | Profile, - rate: float | list[float] | None = None, + rate: list[float] | None = None, random_seed: int = 42, # Backend configuration backend: BackendType | Backend = "openai_http", backend_kwargs: dict[str, Any] | None = None, model: str | None = None, # Data configuration - processor: str | Path | PreTrainedTokenizerBase | None = None, + processor: ProcessorInputT | None = None, processor_args: dict[str, Any] | None = None, data_args: dict[str, Any] | None = None, data_sampler: Literal["random"] | None = None, # Output configuration output_path: str | Path | None = _CURRENT_WORKING_DIR, - output_formats: OutputFormatType = ("console", "json", "html", "csv"), + output_formats: OutputFormatT = ("console", "json", "html", "csv"), # Updates configuration - progress: tuple[str, ...] 
| list[str] | list[BenchmarkerProgress] | None = None, + progress: ProgressInputT | None = None, print_updates: bool = False, # Aggregators configuration - add_aggregators: ( - dict[str, str | dict[str, Any] | Aggregator | CompilableAggregator] | None - ) = None, + add_aggregators: AggregatorInputT | None = None, warmup: float | None = None, cooldown: float | None = None, request_samples: int | None = 20, @@ -296,7 +259,9 @@ async def benchmark_generative_text( # noqa: C901 ) with console.print_update_step(title="Resolving output formats") as console_step: - resolved_output_formats = await resolve_output_formats(output_formats, output_path) + resolved_output_formats = await resolve_output_formats( + output_formats, output_path + ) console_step.finish( title="Output formats resolved", details={key: str(val) for key, val in resolved_output_formats.items()}, @@ -351,7 +316,7 @@ async def benchmark_generative_text( # noqa: C901 async def reimport_benchmarks_report( file: Path, output_path: Path | None, - output_formats: OutputFormatType = ("console", "json", "html", "csv"), + output_formats: OutputFormatT = ("console", "json", "html", "csv"), ) -> tuple[GenerativeBenchmarksReport, dict[str, Any]]: """ The command-line entry point for re-importing and displaying an @@ -363,10 +328,15 @@ async def reimport_benchmarks_report( title=f"Loading benchmarks from {file}" ) as console_step: report = GenerativeBenchmarksReport.load_file(file) - console_step.finish(f"Import of old benchmarks complete; loaded {len(report.benchmarks)} benchmark(s)") + console_step.finish( + "Import of old benchmarks complete;" + f" loaded {len(report.benchmarks)} benchmark(s)" + ) with console.print_update_step(title="Resolving output formats") as console_step: - resolved_output_formats = await resolve_output_formats(output_formats, output_path) + resolved_output_formats = await resolve_output_formats( + output_formats, output_path + ) console_step.finish( title="Output formats resolved", details={key: str(val) for key, val in resolved_output_formats.items()}, diff --git a/src/guidellm/benchmark/profile.py b/src/guidellm/benchmark/profile.py index 042179ba..3d4e7287 100644 --- a/src/guidellm/benchmark/profile.py +++ b/src/guidellm/benchmark/profile.py @@ -29,8 +29,17 @@ ) import numpy as np -from pydantic import Field, computed_field, field_serializer, field_validator +from pydantic import ( + Field, + NonNegativeFloat, + PositiveFloat, + PositiveInt, + computed_field, + field_serializer, + field_validator, +) +from guidellm import settings from guidellm.scheduler import ( AsyncConstantStrategy, AsyncPoissonStrategy, @@ -86,7 +95,7 @@ def __pydantic_schema_base_type__(cls) -> type[Profile]: def create( cls, rate_type: str, - rate: float | int | list[float | int] | None, + rate: list[float] | None, random_seed: int = 42, **kwargs: Any, ) -> Profile: @@ -112,7 +121,7 @@ def create( def resolve_args( cls, rate_type: str, - rate: float | int | list[float, int] | None, + rate: list[float] | None, random_seed: int, **kwargs: Any, ) -> dict[str, Any]: @@ -265,7 +274,7 @@ class SynchronousProfile(Profile): def resolve_args( cls, rate_type: str, - rate: float | int | list[float, int] | None, + rate: list[float] | None, random_seed: int, **kwargs: Any, ) -> dict[str, Any]: @@ -316,24 +325,22 @@ class ConcurrentProfile(Profile): """Fixed-concurrency strategy execution profile with configurable stream counts.""" type_: Literal["concurrent"] = "concurrent" # type: ignore[assignment] - streams: int | list[int] = Field( + streams: 
list[PositiveInt] = Field( description="Number of concurrent streams for request scheduling", - gt=0, ) - startup_duration: float = Field( + startup_duration: NonNegativeFloat = Field( default=0.0, description=( "Duration in seconds for distributing startup requests " "before completion-based timing" ), - ge=0, ) @classmethod def resolve_args( cls, rate_type: str, - rate: float | int | list[float, int] | None, + rate: list[float] | None, random_seed: int, **kwargs: Any, ) -> dict[str, Any]: @@ -348,14 +355,13 @@ def resolve_args( :raises ValueError: If rate is None. """ _ = (rate_type, random_seed) # unused - kwargs["streams"] = rate + kwargs["streams"] = [int(r) for r in rate] if rate else None return kwargs @property def strategy_types(self) -> list[StrategyType]: """Get concurrent strategy types for each configured stream count.""" - num_strategies = len(self.streams) if isinstance(self.streams, list) else 1 - return [self.type_] * num_strategies + return [self.type_] * len(self.streams) def next_strategy( self, @@ -370,13 +376,12 @@ def next_strategy( :return: ConcurrentStrategy with next stream count, or None if complete. """ _ = (prev_strategy, prev_benchmark) # unused - streams = self.streams if isinstance(self.streams, list) else [self.streams] - if len(self.completed_strategies) >= len(streams): + if len(self.completed_strategies) >= len(self.streams): return None return ConcurrentStrategy( - streams=streams[len(self.completed_strategies)], + streams=self.streams[len(self.completed_strategies)], startup_duration=self.startup_duration, ) @@ -388,25 +393,23 @@ class ThroughputProfile(Profile): """ type_: Literal["throughput"] = "throughput" # type: ignore[assignment] - max_concurrency: int | None = Field( + max_concurrency: PositiveInt | None = Field( default=None, description="Maximum number of concurrent requests to schedule", - gt=0, ) - startup_duration: float = Field( + startup_duration: NonNegativeFloat = Field( default=0.0, description=( "Duration in seconds for distributing startup requests " "before full throughput scheduling" ), - ge=0, ) @classmethod def resolve_args( cls, rate_type: str, - rate: float | int | list[float, int] | None, + rate: list[float] | None, random_seed: int, **kwargs: Any, ) -> dict[str, Any]: @@ -422,8 +425,8 @@ def resolve_args( _ = (rate_type, random_seed) # unused # Remap rate to max_concurrency, strip out random_seed kwargs.pop("random_seed", None) - if rate is not None: - kwargs["max_concurrency"] = rate + if rate is not None and len(rate) > 0: + kwargs["max_concurrency"] = rate[0] return kwargs @property @@ -463,22 +466,19 @@ class AsyncProfile(Profile): strategy_type: Literal["constant", "poisson"] = Field( description="Type of asynchronous strategy pattern to use", ) - rate: float | list[float] = Field( + rate: list[PositiveFloat] = Field( description="Request scheduling rate in requests per second", - gt=0, ) - startup_duration: float = Field( + startup_duration: NonNegativeFloat = Field( default=0.0, description=( "Duration in seconds for distributing startup requests " "to converge quickly to desired rate" ), - ge=0, ) - max_concurrency: int | None = Field( + max_concurrency: PositiveInt | None = Field( default=None, description="Maximum number of concurrent requests to schedule", - gt=0, ) random_seed: int = Field( default=42, @@ -489,7 +489,7 @@ class AsyncProfile(Profile): def resolve_args( cls, rate_type: str, - rate: float | int | list[float, int] | None, + rate: list[float] | None, random_seed: int, **kwargs: Any, ) -> dict[str, 
Any]: @@ -523,7 +523,7 @@ def resolve_args( @property def strategy_types(self) -> list[StrategyType]: """Get async strategy types for each configured rate.""" - num_strategies = len(self.rate) if isinstance(self.rate, list) else 1 + num_strategies = len(self.rate) return [self.strategy_type] * num_strategies def next_strategy( @@ -541,12 +541,11 @@ def next_strategy( :raises ValueError: If strategy_type is neither 'constant' nor 'poisson'. """ _ = (prev_strategy, prev_benchmark) # unused - rate = self.rate if isinstance(self.rate, list) else [self.rate] - if len(self.completed_strategies) >= len(rate): + if len(self.completed_strategies) >= len(self.rate): return None - current_rate = rate[len(self.completed_strategies)] + current_rate = self.rate[len(self.completed_strategies)] if self.strategy_type == "constant": return AsyncConstantStrategy( @@ -577,18 +576,16 @@ class SweepProfile(Profile): ge=2, ) strategy_type: Literal["constant", "poisson"] = "constant" - startup_duration: float = Field( + startup_duration: NonNegativeFloat = Field( default=0.0, description=( "Duration in seconds for distributing startup requests " "to converge quickly to desired rate" ), - ge=0, ) - max_concurrency: int | None = Field( + max_concurrency: PositiveInt | None = Field( default=None, description="Maximum number of concurrent requests to schedule", - gt=0, ) random_seed: int = Field( default=42, @@ -615,7 +612,7 @@ class SweepProfile(Profile): def resolve_args( cls, rate_type: str, - rate: float | int | list[float, int] | None, + rate: list[float] | None, random_seed: int, **kwargs: Any, ) -> dict[str, Any]: @@ -628,7 +625,8 @@ def resolve_args( :param kwargs: Additional arguments to pass through. :return: Dictionary of resolved arguments. """ - kwargs["sweep_size"] = kwargs.get("sweep_size", rate) + sweep_size_from_rate = int(rate[0]) if rate else settings.default_sweep_number + kwargs["sweep_size"] = kwargs.get("sweep_size", sweep_size_from_rate) kwargs["random_seed"] = random_seed if rate_type in ["constant", "poisson"]: kwargs["strategy_type"] = rate_type diff --git a/src/guidellm/benchmark/scenario.py b/src/guidellm/benchmark/scenario.py index 3f84f868..b53ef424 100644 --- a/src/guidellm/benchmark/scenario.py +++ b/src/guidellm/benchmark/scenario.py @@ -1,22 +1,27 @@ from __future__ import annotations -from collections.abc import Iterable -from functools import cache +import json +from functools import cache, wraps +from inspect import Parameter, signature from pathlib import Path -from typing import Annotated, Any, Literal, TypeVar +from typing import Annotated, Any, Callable, Literal, TypeVar -from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict -from pydantic import BeforeValidator, Field, NonNegativeInt, PositiveFloat, PositiveInt -from transformers.tokenization_utils_base import ( # type: ignore[import] - PreTrainedTokenizerBase, -) +import yaml +from loguru import logger +from pydantic import BeforeValidator, Field, PositiveFloat, PositiveInt, SkipValidation -from guidellm.backends import BackendType -from guidellm.benchmark.profile import ProfileType +from guidellm.backends import Backend, BackendType +from guidellm.benchmark.profile import Profile, ProfileType +from guidellm.benchmark.types import AggregatorInputT, DataInputT, ProcessorInputT from guidellm.scheduler import StrategyType from guidellm.utils import StandardBaseModel -__ALL__ = ["Scenario", "GenerativeTextScenario", "get_builtin_scenarios"] +__all__ = [ + "GenerativeTextScenario", + "Scenario", + 
"enable_scenarios", + "get_builtin_scenarios", +] SCENARIO_DIR = Path(__file__).parent / "scenarios/" @@ -58,6 +63,30 @@ class Scenario(StandardBaseModel): target: str + @classmethod + def get_default(cls: type[T], field: str) -> Any: + """Get default values for model fields""" + return cls.model_fields[field].default + + @classmethod + def from_file(cls: type[T], filename: Path, overrides: dict | None = None) -> T: + """ + Attempt to create a new instance of the model using + data loaded from json or yaml file. + """ + try: + with filename.open() as f: + if str(filename).endswith(".json"): + data = json.load(f) + else: # Assume everything else is yaml + data = yaml.safe_load(f) + except (json.JSONDecodeError, yaml.YAMLError) as e: + logger.error(f"Failed to parse {filename} as type {cls.__name__}") + raise ValueError(f"Error when parsing file: {filename}") from e + + data.update(overrides or {}) + return cls.model_validate(data) + @classmethod def from_builtin(cls: type[T], name: str, overrides: dict | None = None) -> T: filename = SCENARIO_DIR / f"{name}.json" @@ -78,29 +107,67 @@ class Config: # types like PreTrainedTokenizerBase arbitrary_types_allowed = True - backend_type: BackendType = "openai_http" - backend_args: dict[str, Any] | None = None + data: Annotated[ + DataInputT, + # BUG: See https://github.com/pydantic/pydantic/issues/9541 + SkipValidation, + ] + profile: StrategyType | ProfileType | Profile + rate: Annotated[list[PositiveFloat] | None, BeforeValidator(parse_float_list)] = ( + None + ) + random_seed: int = 42 + # Backend configuration + backend: BackendType | Backend = "openai_http" + backend_kwargs: dict[str, Any] | None = None model: str | None = None - processor: str | Path | PreTrainedTokenizerBase | None = None + # Data configuration + processor: ProcessorInputT | None = None processor_args: dict[str, Any] | None = None - data: ( - str - | Path - | Iterable[str | dict[str, Any]] - | Dataset - | DatasetDict - | IterableDataset - | IterableDatasetDict - ) data_args: dict[str, Any] | None = None data_sampler: Literal["random"] | None = None - rate_type: StrategyType | ProfileType - rate: Annotated[list[PositiveFloat] | None, BeforeValidator(parse_float_list)] = ( - None - ) - max_seconds: PositiveFloat | None = None + # Aggregators configuration + add_aggregators: AggregatorInputT | None = None + warmup: Annotated[float | None, Field(gt=0, le=1)] = None + cooldown: Annotated[float | None, Field(gt=0, le=1)] = None + request_samples: PositiveInt | None = 20 + # Constraints configuration + max_seconds: PositiveFloat | PositiveInt | None = None max_requests: PositiveInt | None = None - warmup_percent: Annotated[float | None, Field(gt=0, le=1)] = None - cooldown_percent: Annotated[float | None, Field(gt=0, le=1)] = None - output_sampling: NonNegativeInt | None = None - random_seed: int = 42 + max_errors: PositiveInt | None = None + max_error_rate: PositiveFloat | None = None + max_global_error_rate: PositiveFloat | None = None + + +# Decorator function to apply scenario to a function +def enable_scenarios(func: Callable) -> Any: + @wraps(func) + async def decorator(*args, scenario: Scenario | None = None, **kwargs) -> Any: + if scenario is not None: + kwargs.update(scenario.model_dump()) + return await func(*args, **kwargs) + + # Modify the signature of the decorator to include the `scenario` argument + sig = signature(func) + params = list(sig.parameters.values()) + # Place `scenario` before `**kwargs` or any parameter with a default value + loc = next( + ( + i + for i, 
p in enumerate(params) + if p.kind is Parameter.VAR_KEYWORD or p.default is not Parameter.empty + ), + len(params), + ) + params.insert( + loc, + Parameter( + "scenario", + Parameter.POSITIONAL_OR_KEYWORD, + default=None, + annotation=Scenario | None, + ), + ) + decorator.__signature__ = sig.replace(parameters=params) # type: ignore [attr-defined] + + return decorator diff --git a/src/guidellm/benchmark/scenarios/chat.json b/src/guidellm/benchmark/scenarios/chat.json index 024438c5..7ed4ce16 100644 --- a/src/guidellm/benchmark/scenarios/chat.json +++ b/src/guidellm/benchmark/scenarios/chat.json @@ -1,13 +1,4 @@ { - "rate_type": "sweep", - "data": { - "prompt_tokens": 512, - "prompt_tokens_stdev": 128, - "prompt_tokens_min": 1, - "prompt_tokens_max": 1024, - "output_tokens": 256, - "output_tokens_stdev": 64, - "output_tokens_min": 1, - "output_tokens_max": 1024 - } + "profile": "sweep", + "data": "prompt_tokens=512,prompt_tokens_stdev=128,prompt_tokens_min=1,prompt_tokens_max=1024,output_tokens=256,output_tokens_stdev=64,output_tokens_min=1,output_tokens_max=1024" } diff --git a/src/guidellm/benchmark/scenarios/rag.json b/src/guidellm/benchmark/scenarios/rag.json index c7ee2f27..d790ce60 100644 --- a/src/guidellm/benchmark/scenarios/rag.json +++ b/src/guidellm/benchmark/scenarios/rag.json @@ -1,13 +1,4 @@ { - "rate_type": "sweep", - "data": { - "prompt_tokens": 4096, - "prompt_tokens_stdev": 512, - "prompt_tokens_min": 2048, - "prompt_tokens_max": 6144, - "output_tokens": 512, - "output_tokens_stdev": 128, - "output_tokens_min": 1, - "output_tokens_max": 1024 - } + "profile": "sweep", + "data": "prompt_tokens=4096,prompt_tokens_stdev=512,prompt_tokens_min=2048,prompt_tokens_max=6144,output_tokens=512,output_tokens_stdev=128,output_tokens_min=1,output_tokens_max=1024" } diff --git a/src/guidellm/benchmark/types.py b/src/guidellm/benchmark/types.py new file mode 100644 index 00000000..04ad4061 --- /dev/null +++ b/src/guidellm/benchmark/types.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from collections.abc import Iterable +from pathlib import Path +from typing import Any, TypeAliasType + +from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict +from transformers import ( # type: ignore[import] + PreTrainedTokenizerBase, +) + +from guidellm.benchmark.aggregator import ( + Aggregator, + CompilableAggregator, +) +from guidellm.benchmark.output import ( + GenerativeBenchmarkerOutput, +) +from guidellm.benchmark.progress import BenchmarkerProgress + +__all__ = [ + "AggregatorInputT", + "DataInputT", + "OutputFormatT", + "ProcessorInputT", + "ProgressInputT", +] + + +DataInputT = TypeAliasType( + "DataInputT", + Iterable[str] + | Iterable[dict[str, Any]] + | Dataset + | DatasetDict + | IterableDataset + | IterableDatasetDict + | str + | Path, +) + +OutputFormatT = TypeAliasType( + "OutputFormatT", + tuple[str, ...] + | list[str] + | dict[str, str | dict[str, Any] | GenerativeBenchmarkerOutput] + | None, +) + +ProcessorInputT = TypeAliasType("ProcessorInputT", str | Path | PreTrainedTokenizerBase) + +ProgressInputT = TypeAliasType( + "ProgressInputT", tuple[str, ...] | list[str] | list[BenchmarkerProgress] +) + +AggregatorInputT = TypeAliasType( + "AggregatorInputT", + dict[str, str | dict[str, Any] | Aggregator | CompilableAggregator], +)
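Usage sketch for the scenario plumbing introduced above; the file name `my_scenario.json` and the 60-second override are hypothetical, and the file is assumed to define at least `target`, `data`, and `profile`. Overrides passed to `from_file` take precedence over values read from the file, mirroring how CLI options override a `--scenario`.

import asyncio
from pathlib import Path

from guidellm.benchmark import GenerativeTextScenario, benchmark_generative_text

# Load a scenario from a JSON/YAML config file; keys in `overrides` win
# over values from the file (here, a hypothetical 60-second cap).
scenario = GenerativeTextScenario.from_file(
    Path("my_scenario.json"),
    overrides={"max_seconds": 60},
)

# @enable_scenarios adds the `scenario` keyword to benchmark_generative_text
# and expands its fields (target, data, profile, constraints, ...) into kwargs.
asyncio.run(benchmark_generative_text(scenario=scenario))

The equivalent CLI invocation would be along the lines of `guidellm benchmark run --scenario my_scenario.json --max-seconds 60`; `--scenario` also accepts the builtin names returned by get_builtin_scenarios() (e.g. chat and rag).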