Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions docs/cli-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,20 @@ Start offset in milliseconds for fixed schedule replay. Skips all requests befor
End offset in milliseconds for fixed schedule replay. Stops issuing requests after this timestamp, allowing benchmark of specific trace subsets. Requests at exactly the end offset are included. Defaults to last timestamp in dataset. Must be ≥ `--fixed-schedule-start-offset` if both specified.
<br/>_Constraints: ≥ 0_

#### `--fixed-schedule-speedup` `<float>`

Scaling factor for fixed schedule timestamps. A value of 2.0 replays the schedule twice as fast (halving inter-request delays), while 0.5 replays at half speed (doubling delays). Applied at the timing layer to any dataset using `--fixed-schedule`.
<br/>_Constraints: > 0_

#### `--public-dataset` `<str>`

Pre-configured public dataset to download and use for benchmarking (e.g., `sharegpt`). AIPerf automatically downloads and parses these datasets. Mutually exclusive with `--custom-dataset-type`. Run `aiperf plugins public_dataset_loader` to list available datasets.
<br/>_Choices: [`sharegpt`, `aimo`]_

#### `--custom-dataset-type` `<str>`

Format specification for custom dataset provided via `--input-file`. Determines parsing logic and expected file structure. Options: `single_turn` (JSONL with single exchanges), `multi_turn` (JSONL with conversation history), `mooncake_trace`/`bailian_trace` (timestamped trace files), `random_pool` (directory of reusable prompts; when using `random_pool`, `--conversation-num` defaults to 100 if not specified; batch sizes > 1 sample each modality independently from a flat pool and do not preserve per-entry associations — use `single_turn` if paired modalities must stay together). Requires `--input-file`. Mutually exclusive with `--public-dataset`.
<br/>_Choices: [`bailian_trace`, `mooncake_trace`, `multi_turn`, `random_pool`, `single_turn`]_
Format specification for custom dataset provided via `--input-file`. Determines parsing logic and expected file structure. Options: `single_turn` (JSONL with single exchanges), `multi_turn` (JSONL with conversation history), `mooncake_trace`/`bailian_trace` (timestamped trace files), `conflux` (Conflux proxy capture with agent_id grouping and timestamp-based replay), `random_pool` (directory of reusable prompts; when using `random_pool`, `--conversation-num` defaults to 100 if not specified; batch sizes > 1 sample each modality independently from a flat pool and do not preserve per-entry associations — use `single_turn` if paired modalities must stay together). Requires `--input-file`. Mutually exclusive with `--public-dataset`.
<br/>_Choices: [`bailian_trace`, `conflux`, `mooncake_trace`, `multi_turn`, `random_pool`, `single_turn`]_

#### `--dataset-sampling-strategy` `<str>`

Expand All @@ -246,6 +251,11 @@ Random seed for deterministic data generation. When set, makes synthetic prompts

Specify service level objectives (SLOs) for goodput as space-separated 'KEY:VALUE' pairs, where KEY is a metric tag and VALUE is a number in the metric's display unit (falls back to its base unit if no display unit is defined). Examples: 'request_latency:250' (ms), 'inter_token_latency:10' (ms), `output_token_throughput_per_user:600` (tokens/s). Only metrics applicable to the current endpoint/config are considered. For more context on the definition of goodput, refer to DistServe paper: https://arxiv.org/pdf/2401.09670 and the blog: https://hao-ai-lab.github.io/blogs/distserve.

#### `--conflux-include-utility-calls`

Include unattributed utility calls when loading Conflux proxy captures. These are lightweight model calls made by the client for housekeeping tasks (topic detection, title generation) that lack an `agent_id` and may fall outside the main session timeline. Applies when Conflux format is specified via `--custom-dataset-type conflux` or auto-detected from file contents.
<br/>_Flag (no value required)_

### Audio Input

#### `--audio-batch-size`, `--batch-size-audio` `<int>`
Expand Down
45 changes: 44 additions & 1 deletion src/aiperf/common/config/input_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ def validate_fixed_schedule_start_and_end_offset(self) -> Self:
)
return self

@model_validator(mode="after")
def validate_no_double_speedup(self) -> Self:
    """Reject combining synthesis speedup with fixed-schedule speedup.

    Exactly one mechanism may scale replay speed; the timing-layer option
    (``--fixed-schedule-speedup``) is the supported one.
    """
    synthesis_speedup_set = self.synthesis.speedup_ratio != 1.0
    fixed_speedup_set = self.fixed_schedule_speedup is not None
    if synthesis_speedup_set and fixed_speedup_set:
        raise ValueError(
            "--synthesis-speedup-ratio and --fixed-schedule-speedup cannot be used together. "
            "Use --fixed-schedule-speedup to control replay speed at the timing layer."
        )
    return self

@model_validator(mode="after")
def validate_dataset_type(self) -> Self:
"""Validate the different dataset type configuration."""
Expand Down Expand Up @@ -267,6 +280,20 @@ def validate_goodput(self) -> Self:
),
] = InputDefaults.FIXED_SCHEDULE_END_OFFSET

fixed_schedule_speedup: Annotated[
float | None,
Field(
gt=0,
description="Scaling factor for fixed schedule timestamps. A value of 2.0 replays the schedule twice as fast "
"(halving inter-request delays), while 0.5 replays at half speed (doubling delays). "
"Applied at the timing layer to any dataset using `--fixed-schedule`.",
),
CLIParameter(
name=("--fixed-schedule-speedup",),
group=_CLI_GROUP,
),
] = None

public_dataset: Annotated[
PublicDatasetType | None,
Field(
Expand All @@ -285,7 +312,8 @@ def validate_goodput(self) -> Self:
Field(
description="Format specification for custom dataset provided via `--input-file`. Determines parsing logic and expected file structure. "
"Options: `single_turn` (JSONL with single exchanges), `multi_turn` (JSONL with conversation history), "
"`mooncake_trace`/`bailian_trace` (timestamped trace files), `random_pool` (directory of reusable prompts; "
"`mooncake_trace`/`bailian_trace` (timestamped trace files), `conflux` (Conflux proxy capture with "
"agent_id grouping and timestamp-based replay), `random_pool` (directory of reusable prompts; "
"when using `random_pool`, `--conversation-num` defaults to 100 if not specified; "
"batch sizes > 1 sample each modality independently from a flat pool and do not preserve "
"per-entry associations — use `single_turn` if paired modalities must stay together). "
Expand Down Expand Up @@ -348,6 +376,21 @@ def validate_goodput(self) -> Self:
),
] = InputDefaults.GOODPUT

conflux_include_utility_calls: Annotated[
bool,
Field(
description="Include unattributed utility calls when loading Conflux proxy captures. "
"These are lightweight model calls made by the client for housekeeping tasks "
"(topic detection, title generation) that lack an agent_id and may fall outside "
"the main session timeline. Applies when Conflux format is specified via "
"--custom-dataset-type conflux or auto-detected from file contents.",
),
CLIParameter(
name=("--conflux-include-utility-calls",),
group=_CLI_GROUP,
),
] = False

audio: AudioConfig = AudioConfig()
image: ImageConfig = ImageConfig()
video: VideoConfig = VideoConfig()
Expand Down
11 changes: 11 additions & 0 deletions src/aiperf/common/models/dataset_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class Turn(AIPerfBaseModel):
videos: list[Video] = Field(
default=[], description="Collection of video data in each turn."
)
input_tokens: int | None = Field(
default=None,
description="Expected input token count for this turn (from trace data).",
)
extra_params: dict[str, Any] | None = Field(
default=None,
description="Per-turn hyperparameter overrides merged into the API payload at the top level. "
"Populated from dataset capture metadata.",
)

def metadata(self) -> TurnMetadata:
"""Get the metadata of the turn."""
Expand Down Expand Up @@ -209,6 +218,8 @@ def copy_with_stripped_media(self) -> "Turn":
)
for vid in self.videos
],
input_tokens=self.input_tokens,
extra_params=dict(self.extra_params) if self.extra_params else None,
)


Expand Down
15 changes: 12 additions & 3 deletions src/aiperf/dataset/composer/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
from pathlib import Path
from typing import Any

import orjson

from aiperf.common.config import UserConfig
from aiperf.common.enums import ConversationContextMode
from aiperf.common.models import Conversation
from aiperf.common.tokenizer import Tokenizer
from aiperf.common.utils import load_json_str
from aiperf.dataset.composer.base import BaseDatasetComposer
from aiperf.dataset.loader.base_loader import BaseLoader
from aiperf.dataset.utils import check_file_exists
Expand Down Expand Up @@ -83,12 +84,20 @@ def _infer_dataset_type(self, file_path: str) -> CustomDatasetType:
if path.is_dir():
return self._infer_type(data=None, filename=file_path)

# For files, read first non-empty line and use both content and path detection
# For files, read first non-empty line and use both content and path detection.
# If the first line isn't valid JSON (e.g. pretty-printed JSON arrays start
# with "["), fall through to filename-only detection so file-probing loaders
# like ConfluxLoader can inspect the file directly.
with open(file_path) as f:
for line in f:
if not (line := line.strip()):
continue
data = load_json_str(line)
try:
data = orjson.loads(line)
except orjson.JSONDecodeError:
return self._infer_type(data=None, filename=file_path)
if not isinstance(data, dict):
return self._infer_type(data=None, filename=file_path)
return self._infer_type(data=data, filename=file_path)

except ValueError as e:
Expand Down
4 changes: 4 additions & 0 deletions src/aiperf/dataset/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
from aiperf.dataset.loader.base_loader import BaseFileLoader, BaseLoader
from aiperf.dataset.loader.base_public_dataset import BasePublicDatasetLoader
from aiperf.dataset.loader.base_trace_loader import BaseTraceDatasetLoader
from aiperf.dataset.loader.conflux import ConfluxLoader
from aiperf.dataset.loader.mixins import MediaConversionMixin
from aiperf.dataset.loader.models import (
BailianTrace,
ConfluxRecord,
MooncakeTrace,
MultiTurn,
RandomPool,
Expand All @@ -27,6 +29,8 @@
"BaseLoader",
"BasePublicDatasetLoader",
"BaseTraceDatasetLoader",
"ConfluxLoader",
"ConfluxRecord",
"MediaConversionMixin",
"MooncakeTrace",
"MooncakeTraceDatasetLoader",
Expand Down
195 changes: 195 additions & 0 deletions src/aiperf/dataset/loader/conflux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Conflux dataset loader for timestamp-based replay of proxy captures.

Loads JSON files containing arrays of API request records. Groups records
by agent_id into independent Conversations with timestamp-based inter-turn
delays for fixed-schedule replay.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

import orjson
from pydantic import ValidationError

from aiperf.common.aiperf_logger import AIPerfLogger
from aiperf.common.enums import ConversationContextMode
from aiperf.common.models import Conversation, Turn
from aiperf.dataset.loader.base_loader import BaseFileLoader
from aiperf.dataset.loader.models import ConfluxRecord
from aiperf.plugin.enums import DatasetSamplingStrategy

_EXTRA_PARAMS_SKIP = frozenset(
{
"max_tokens",
"max_completion_tokens",
"max_output_tokens",
}
)


_logger = AIPerfLogger(__name__)


class ConfluxLoader(BaseFileLoader):
    """Dataset loader for Conflux proxy capture JSON files.

    Each agent_id group becomes an independent Conversation with
    zero-aligned timestamps for fixed-schedule replay.
    """

    @classmethod
    def get_default_context_mode(cls) -> ConversationContextMode:
        """Return the default context mode for Conflux captures.

        Captures already carry the full message array per request, so the
        message array (with responses) is replayed rather than re-derived.
        """
        return ConversationContextMode.MESSAGE_ARRAY_WITH_RESPONSES

    @classmethod
    def get_preferred_sampling_strategy(cls) -> DatasetSamplingStrategy:
        """Return the sampling strategy best suited to this loader.

        Sequential sampling preserves the capture ordering needed for
        timestamp-based replay.
        """
        return DatasetSamplingStrategy.SEQUENTIAL

    @classmethod
    def can_load(
        cls, data: dict[str, Any] | None = None, filename: str | Path | None = None
    ) -> bool:
        """Return True if filename is a Conflux JSON file or directory."""
        if filename is None:
            return False
        path = Path(filename)
        if path.is_dir():
            # Probe the first file in *sorted* order so auto-detection is
            # deterministic and consistent with _load_directory(), which
            # also iterates files in sorted order. (glob order alone is
            # filesystem-dependent.)
            json_files = sorted(path.glob("*.json"))
            return bool(json_files) and cls._probe_file(json_files[0])
        return cls._probe_file(path)

    @classmethod
    def _probe_file(cls, path: Path) -> bool:
        """Return True if the file loads as a valid Conflux JSON array.

        Only the first record is validated; a well-formed capture is
        assumed to be homogeneous.
        """
        if not path.is_file() or path.suffix != ".json":
            return False
        try:
            raw_records: list[dict[str, Any]] = orjson.loads(path.read_bytes())
            if not raw_records or not isinstance(raw_records, list):
                return False
            ConfluxRecord.model_validate(raw_records[0])
            return True
        except (orjson.JSONDecodeError, ValidationError) as e:
            _logger.debug(f"Failed to validate Conflux JSON array: {e!r}")
            return False

    def load_dataset(self) -> dict[str, list[ConfluxRecord]]:
        """Load and group Conflux records by agent_id.

        Returns:
            Mapping of group key (agent thread or utility call) to its
            timestamp-sorted records.
        """
        path = Path(self.filename)
        if path.is_dir():
            return self._load_directory(path)
        return self._load_single_file(self.filename)

    def _load_directory(self, path: Path) -> dict[str, list[ConfluxRecord]]:
        """Load all JSON files in a directory as independent sessions.

        Raises:
            FileNotFoundError: If the directory contains no .json files.
        """
        json_files = sorted(path.glob("*.json"))
        if not json_files:
            raise FileNotFoundError(
                f"No .json files found in directory: {self.filename}"
            )

        all_groups: dict[str, list[ConfluxRecord]] = {}
        for file_idx, json_file in enumerate(json_files):
            # Prefix keys with the file index so identical agent_ids coming
            # from different capture files never collide.
            file_groups = self._load_single_file(str(json_file), prefix=f"f{file_idx}_")
            all_groups.update(file_groups)

        total_records = sum(len(recs) for recs in all_groups.values())
        self.info(
            f"Loaded {len(all_groups)} agent threads from "
            f"{len(json_files)} files ({total_records} total records) in {path.name}/"
        )
        return all_groups

    def _load_single_file(
        self, filename: str, prefix: str = ""
    ) -> dict[str, list[ConfluxRecord]]:
        """Load and group records from a single JSON file.

        Raises:
            ValueError: If the file's top-level JSON value is not an array.
        """
        raw_records = orjson.loads(Path(filename).read_bytes())
        if not isinstance(raw_records, list):
            # Fail fast with a clear message instead of the confusing
            # per-record validation error that iterating a non-list
            # top-level value would produce below.
            raise ValueError(
                f"Expected a JSON array of Conflux records in {filename}, "
                f"got {type(raw_records).__name__}"
            )

        include_utility = self.user_config.input.conflux_include_utility_calls

        groups: dict[str, list[ConfluxRecord]] = {}
        utility_count = 0

        for raw in raw_records:
            record = ConfluxRecord.model_validate(raw)
            if record.agent_id is not None:
                key = f"{prefix}{record.agent_id}"
                groups.setdefault(key, []).append(record)
            else:
                # Utility calls lack an agent_id; each becomes its own
                # single-record group when included. Count them either way
                # so the summary log can report how many were skipped.
                if include_utility:
                    groups[f"{prefix}_utility_{utility_count}"] = [record]
                utility_count += 1

        # Sort within each group so inter-turn delays follow capture time.
        for records in groups.values():
            records.sort(key=lambda r: r.timestamp)

        # Directory loads (non-empty prefix) emit one combined summary in
        # _load_directory() instead of a per-file line.
        if not prefix:
            total_records = sum(len(recs) for recs in groups.values())
            action = "included" if include_utility else "skipped"
            utility_label = f"{utility_count} utility calls {action}"
            self.info(
                f"Loaded {len(groups)} agent threads + {utility_label} "
                f"({total_records} total records)"
            )

        return groups

    def convert_to_conversations(
        self, data: dict[str, list[ConfluxRecord]]
    ) -> list[Conversation]:
        """Convert grouped Conflux records to Conversation objects."""
        conversations = [
            self._build_conversation(agent_id, records)
            for agent_id, records in data.items()
        ]

        total_turns = sum(len(c.turns) for c in conversations)
        self.info(
            f"Converted {len(conversations)} conversations ({total_turns} total turns)"
        )
        return conversations

    def _build_conversation(
        self,
        agent_id: str,
        records: list[ConfluxRecord],
    ) -> Conversation:
        """Build a Conversation from a list of ConfluxRecords for one agent."""
        conversation = Conversation(session_id=f"conflux_{agent_id}")

        for record in records:
            input_tokens = record.tokens.input if record.tokens else None

            # Derive max_tokens from the captured output (+ reasoning) token
            # counts — presumably used to bound generation to the original
            # response size (TODO confirm downstream use). A captured total
            # of 0 maps to None (no limit).
            max_tokens = None
            if record.tokens is not None:
                total_output = record.tokens.output + record.tokens.output_reasoning
                max_tokens = total_output or None

            turn = Turn(
                timestamp=record.timestamp,
                max_tokens=max_tokens,
                input_tokens=input_tokens,
                raw_messages=record.messages,
                raw_tools=record.tools or None,
                extra_params=self._extract_extra_params(record),
            )
            conversation.turns.append(turn)

        return conversation

    @staticmethod
    def _extract_extra_params(record: ConfluxRecord) -> dict[str, Any] | None:
        """Extract per-turn hyperparameter overrides from a ConfluxRecord.

        Token-limit keys are excluded because max_tokens is derived from
        the captured token counts in _build_conversation(); None values
        are dropped. Returns None when nothing remains.
        """
        if not record.hyperparameters:
            return None
        params = {
            k: v
            for k, v in record.hyperparameters.items()
            if k not in _EXTRA_PARAMS_SKIP and v is not None
        }
        return params or None
Loading
Loading