Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/cli_options.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding multi-turn: does anyone switch service tiers mid-conversation?

I would think not, in which case sampling per-conversation seems to make more sense than per-turn.

Thoughts?

Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ Random seed for deterministic data generation. When set, makes synthetic prompts

Specify service level objectives (SLOs) for goodput as space-separated `KEY:VALUE` pairs, where KEY is a metric tag and VALUE is a number in the metric's display unit (falls back to its base unit if no display unit is defined). Examples: `request_latency:250` (ms), `inter_token_latency:10` (ms), `output_token_throughput_per_user:600` (tokens/s). Only metrics applicable to the current endpoint/config are considered. For more context on the definition of goodput, refer to the DistServe paper: https://arxiv.org/pdf/2401.09670 and the blog: https://hao-ai-lab.github.io/blogs/distserve.

#### `--service-tier-dist` `<str>`

Distribution of service_tier values for OpenAI API requests. Format: `tier:prob;tier:prob` (percentages 0-100 that must sum to 100). Example: `default:50;flex:30;priority:20`. Common tiers: auto, default, flex, scale, priority.

### Audio Input

#### `--audio-batch-size`, `--batch-size-audio` `<int>`
Expand Down
44 changes: 43 additions & 1 deletion src/aiperf/common/config/input_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from typing import Annotated, Any
from typing import TYPE_CHECKING, Annotated, Any

if TYPE_CHECKING:
from aiperf.common.models.service_tier_distribution import ServiceTierDistribution

from pydantic import BeforeValidator, Field, model_validator
from typing_extensions import Self
Expand Down Expand Up @@ -128,6 +131,20 @@ def validate_synthesis_requires_mooncake_trace(self) -> Self:
)
return self

@model_validator(mode="after")
def validate_service_tier_mutual_exclusivity(self) -> Self:
    """Reject configs that set service_tier both via distribution and extra inputs.

    --service-tier-dist samples a tier per turn, while an ``extra``
    ``service_tier`` entry pins a single static tier; allowing both would
    make the effective payload ambiguous.
    """
    # Nothing to check unless both knobs are in play.
    if self.service_tier_distribution is None or not self.extra:
        return self

    # ``extra`` may arrive as a list of key/value pairs or as a mapping;
    # normalize to a dict before probing for the conflicting key.
    extra_dict = (
        dict(self.extra) if isinstance(self.extra, list) else self.extra
    )
    if isinstance(extra_dict, dict) and "service_tier" in extra_dict:
        raise ValueError(
            "Cannot use both --service-tier-dist and --extra-inputs service_tier. "
            "Use one or the other."
        )
    return self

@model_validator(mode="after")
def validate_goodput(self) -> Self:
"""
Expand Down Expand Up @@ -337,10 +354,35 @@ def validate_goodput(self) -> Self:
),
] = InputDefaults.GOODPUT

# Raw --service-tier-dist specification string. Kept as the unparsed CLI
# value here; it is converted into a ServiceTierDistribution on demand by
# get_service_tier_distribution().
service_tier_distribution: Annotated[
    str | None,
    Field(
        default=None,
        description="Distribution of service_tier values for OpenAI API requests. "
        "Format: `tier:prob;tier:prob` (percentages 0-100 that must sum to 100). "
        "Example: `default:50;flex:30;priority:20`. "
        "Common tiers: auto, default, flex, scale, priority.",
    ),
    CLIParameter(
        name=("--service-tier-dist",),
        group=_CLI_GROUP,
    ),
] = None

# Nested sub-configuration sections, one per input modality / feature area.
audio: AudioConfig = AudioConfig()
image: ImageConfig = ImageConfig()
video: VideoConfig = VideoConfig()
prompt: PromptConfig = PromptConfig()
rankings: RankingsConfig = RankingsConfig()
synthesis: SynthesisConfig = SynthesisConfig()
conversation: ConversationConfig = ConversationConfig()

def get_service_tier_distribution(self) -> "ServiceTierDistribution | None":
    """Parse the --service-tier-dist string into a distribution object.

    Returns:
        The parsed ServiceTierDistribution, or None when the option is unset.
    """
    if self.service_tier_distribution is None:
        return None

    # Local import — presumably to keep the models package off the config
    # import path (mirrors the TYPE_CHECKING-only import at the top of the
    # file); confirm before hoisting.
    from aiperf.common.models.service_tier_distribution import (
        ServiceTierDistributionParser,
    )

    return ServiceTierDistributionParser.parse(self.service_tier_distribution)
12 changes: 12 additions & 0 deletions src/aiperf/common/config/user_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,18 @@ def validate_rankings_token_options(self) -> Self:
)
return self

@model_validator(mode="after")
def validate_service_tier_endpoint_type(self) -> Self:
    """Reject --service-tier-dist for endpoints that cannot carry a service_tier.

    Only the OpenAI chat and completions payload formatters inject a
    ``service_tier`` field, so the option is meaningless elsewhere.
    """
    if self.input.service_tier_distribution is None:
        return self

    if self.endpoint.type not in (EndpointType.CHAT, EndpointType.COMPLETIONS):
        raise ValueError(
            f"--service-tier-dist is only supported with chat and completions endpoints, "
            f"got endpoint type '{self.endpoint.type}'"
        )
    return self

@model_validator(mode="after")
def validate_must_have_stop_condition(self) -> Self:
"""Validate that at least one stop condition is set (requests, sessions, or duration)"""
Expand Down
5 changes: 5 additions & 0 deletions src/aiperf/common/models/dataset_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ class Turn(AIPerfBaseModel):
max_tokens: int | None = Field(
default=None, description="Maximum number of tokens to generate for this turn."
)
service_tier: str | None = Field(
default=None,
description="The service_tier to use for this turn's API request.",
)
texts: list[Text] = Field(
default=[], description="Collection of text data in each turn."
)
Expand Down Expand Up @@ -159,6 +163,7 @@ def copy_with_stripped_media(self) -> "Turn":
timestamp=self.timestamp,
delay=self.delay,
max_tokens=self.max_tokens,
service_tier=self.service_tier,
texts=[Text(name=t.name, contents=list(t.contents)) for t in self.texts],
images=[
Image(
Expand Down
4 changes: 4 additions & 0 deletions src/aiperf/common/models/record_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class MetricRecordMetadata(AIPerfBaseModel):
description="The wall clock timestamp of the request cancellation time measured as time.time_ns(), if applicable. "
"This is only applicable to requests that were cancelled.",
)
service_tier: str | None = Field(
default=None,
description="The service_tier returned by the API for this request.",
)


class ProfileResults(AIPerfBaseModel):
Expand Down
170 changes: 170 additions & 0 deletions src/aiperf/common/models/service_tier_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Service tier distribution for OpenAI API requests.

Allows distributing requests across different service tiers (e.g., default, flex, priority)
with configurable probabilities. Format: ``tier:prob;tier:prob`` where probabilities are
percentages 0-100 that must sum to 100.

Example:
>>> from aiperf.common.models.service_tier_distribution import ServiceTierDistributionParser
>>> dist = ServiceTierDistributionParser.parse("default:50;flex:30;priority:20")
>>> tier = dist.sample()
"""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np

from aiperf.common import random_generator as rng
from aiperf.common.aiperf_logger import AIPerfLogger

logger = AIPerfLogger(__name__)


def _validate_probability_sum(entries: list[ServiceTierEntry]) -> None:
"""Validate that probabilities sum to approximately 100.0.

Args:
entries: List of ServiceTierEntry objects to validate

Raises:
ValueError: If probabilities don't sum to 100.0 (within floating-point tolerance)
"""
total_prob = sum(entry.probability for entry in entries)
if not np.isclose(total_prob, 100.0, rtol=1e-6, atol=1e-6):
raise ValueError(
f"Probabilities must sum to 100.0, got {total_prob:.6f}. "
f"Entries: {[str(e) for e in entries]}"
)


@dataclass(frozen=True)
class ServiceTierEntry:
    """A single service tier paired with its probability weight (immutable).

    Attributes:
        tier: Service tier name (e.g. "default", "flex", "priority").
        probability: Weight as a percentage in [0, 100].
    """

    tier: str
    probability: float

    def __post_init__(self) -> None:
        """Reject blank tier names and out-of-range probabilities."""
        if not self.tier or not self.tier.strip():
            raise ValueError("Tier name must be non-empty")
        if self.probability < 0.0 or self.probability > 100.0:
            raise ValueError(f"Probability must be in [0,100], got {self.probability}")

    def __str__(self) -> str:
        return f"{self.tier}:{self.probability}%"


class ServiceTierDistribution:
    """Probability distribution over service tiers, sampled per request.

    Each sample is an O(log n) binary search over the cumulative
    probability vector.
    """

    def __init__(self, entries: list[ServiceTierEntry]) -> None:
        """Build the distribution from tier entries.

        Args:
            entries: Service tier entries; probabilities must sum to 100.

        Raises:
            ValueError: If no entries are given or the probabilities do not
                sum to 100.
        """
        if not entries:
            raise ValueError(
                "Distribution must contain at least one service tier entry"
            )

        # Named RNG stream from the shared generator module — presumably
        # deterministically seeded for reproducible runs; confirm in
        # aiperf.common.random_generator.
        self._rng = rng.derive("models.service_tier.distribution")
        self._entries = tuple(entries)
        _validate_probability_sum(list(self._entries))
        self._cumulative_probs = self._compute_cumulative_probabilities()

        logger.debug(
            lambda: f"Created service tier distribution with {len(self._entries)} entries: {self}"
        )

    def _compute_cumulative_probabilities(self) -> np.ndarray:
        """Return the running sum of entry probabilities normalized to [0, 1]."""
        fractions = [entry.probability / 100.0 for entry in self._entries]
        return np.cumsum(fractions, dtype=np.float64)

    def sample(self) -> str:
        """Draw one service tier name according to the configured weights."""
        draw = self._rng.random()
        # side="right" gives each tier the half-open interval
        # [cum[i-1], cum[i]) of the uniform draw.
        bucket = np.searchsorted(self._cumulative_probs, draw, side="right")
        # Clamp in case float round-off left the final cumulative value
        # fractionally below the draw.
        bucket = min(bucket, len(self._entries) - 1)
        return self._entries[bucket].tier

    @property
    def entries(self) -> tuple[ServiceTierEntry, ...]:
        """The service tier entries as an immutable tuple."""
        return self._entries

    def __str__(self) -> str:
        joined = ";".join(str(entry) for entry in self._entries)
        return f"ServiceTierDistribution[{joined}]"

    def __repr__(self) -> str:
        return f"ServiceTierDistribution({list(self._entries)})"


class ServiceTierDistributionParser:
    """Turns ``tier:prob;tier:prob`` strings into ServiceTierDistribution objects."""

    @staticmethod
    def _parse_pair(pair_str: str) -> ServiceTierEntry:
        """Convert one already-stripped ``tier:probability`` fragment to an entry.

        Raises:
            ValueError: If the fragment has no colon or a non-numeric probability.
        """
        # rpartition on the LAST colon so tier names containing ":" survive.
        tier_part, sep, prob_part = pair_str.rpartition(":")
        if not sep:
            raise ValueError(
                f"Invalid pair format: '{pair_str}'. Expected 'tier:probability'"
            )
        try:
            probability = float(prob_part.strip())
        except ValueError as e:
            raise ValueError(
                f"Invalid probability value in '{pair_str}': {prob_part.strip()}"
            ) from e
        return ServiceTierEntry(tier=tier_part.strip(), probability=probability)

    @classmethod
    def parse(cls, dist_str: str) -> ServiceTierDistribution:
        """Parse a service tier distribution specification string.

        Format: ``tier:prob;tier:prob`` where probabilities are percentages
        0-100 (e.g. ``default:50;flex:30;priority:20``).

        Args:
            dist_str: Distribution specification string.

        Returns:
            The parsed ServiceTierDistribution.

        Raises:
            ValueError: If the string is empty, malformed, or yields no entries.
        """
        if not isinstance(dist_str, str) or not dist_str.strip():
            raise ValueError("Distribution string cannot be empty")

        # Empty fragments (e.g. trailing ";") are silently skipped.
        entries = [
            cls._parse_pair(fragment.strip())
            for fragment in dist_str.strip().split(";")
            if fragment.strip()
        ]
        if not entries:
            raise ValueError("No valid entries found in distribution string")

        return ServiceTierDistribution(entries)
6 changes: 6 additions & 0 deletions src/aiperf/dataset/composer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def __init__(self, config: UserConfig, tokenizer: Tokenizer, **kwargs):
# Initialize sequence distribution
self._seq_distribution = config.input.prompt.get_sequence_distribution()

# Initialize service tier distribution
self._service_tier_distribution = config.input.get_service_tier_distribution()

# Cache for turn-level sequence lengths to ensure ISL/OSL pairing consistency
self._turn_sequence_cache: dict[int, tuple[int, int]] = {}

Expand Down Expand Up @@ -140,6 +143,9 @@ def _finalize_turn(self, turn: Turn) -> None:
turn.model = self._select_model_name()
self._set_max_tokens(turn)

if self._service_tier_distribution is not None:
turn.service_tier = self._service_tier_distribution.sample()

# Clear cached sequence lengths for this turn to free memory
turn_id = id(turn)
self._clear_turn_cache(turn_id)
Expand Down
10 changes: 9 additions & 1 deletion src/aiperf/endpoints/openai_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def format_payload(self, request_info: RequestInfo) -> dict[str, Any]:
)
payload[token_field] = turns[-1].max_tokens

if turns[-1].service_tier is not None:
payload["service_tier"] = turns[-1].service_tier

if model_endpoint.endpoint.extra:
payload.update(model_endpoint.endpoint.extra)

Expand Down Expand Up @@ -199,7 +202,12 @@ def parse_response(
usage = json_obj.get("usage") or None

if data or usage:
return ParsedResponse(perf_ns=response.perf_ns, data=data, usage=usage)
metadata = {}
if (service_tier := json_obj.get("service_tier")) is not None:
metadata["service_tier"] = service_tier
return ParsedResponse(
perf_ns=response.perf_ns, data=data, usage=usage, metadata=metadata
)

return None

Expand Down
10 changes: 9 additions & 1 deletion src/aiperf/endpoints/openai_completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def format_payload(self, request_info: RequestInfo) -> RequestOutputT:
if turn.max_tokens:
payload["max_tokens"] = turn.max_tokens

if turn.service_tier is not None:
payload["service_tier"] = turn.service_tier

if extra:
payload.update(extra)

Expand Down Expand Up @@ -87,7 +90,12 @@ def parse_response(
usage = json_obj.get("usage") or None

if data or usage:
return ParsedResponse(perf_ns=response.perf_ns, data=data, usage=usage)
metadata = {}
if (service_tier := json_obj.get("service_tier")) is not None:
metadata["service_tier"] = service_tier
return ParsedResponse(
perf_ns=response.perf_ns, data=data, usage=usage, metadata=metadata
)

return None

Expand Down
7 changes: 7 additions & 0 deletions src/aiperf/records/record_processor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ async def _on_inference_results(self, message: InferenceResultsMessage) -> None:
metadata = self._create_metric_record_metadata(
message.record, message.service_id
)

# Extract response service_tier (check reversed - streaming puts metadata on last chunk)
for resp in reversed(parsed_record.responses):
if st := resp.metadata.get("service_tier"):
metadata.service_tier = st
break

raw_results = await self._process_record(parsed_record, metadata)
results = []
for result in raw_results:
Expand Down
Loading