Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/aiperf/common/hash_id_random_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Hash-ID-based random generator for parallel processing with reproducibility.

Enables parallel processing of traces with hash_ids while maintaining
reproducibility. Each (trace_id, hash_id) pair produces a deterministic random
sequence regardless of worker count or processing order.

Architecture:
Global Seed -> Base RNG -> (trace_id, hash_id) -> Deterministic tokens

The trace_id (typically a content hash of the trace file) ensures that different
trace files with overlapping hash_id values produce different content, while the
same trace file always produces identical results.
"""

import hashlib

from aiperf.common.random_generator import RandomGenerator

__all__ = ["HashIdRandomGenerator"]


class _DisabledNumpyRNG:
"""Raises on any attribute access to prevent NumPy RNG usage."""

def __getattr__(self, name):
raise RuntimeError(
"HashIdRandomGenerator does not support NumPy RNG operations. "
"Use Python RNG methods (randrange, choice, etc.) instead."
)


class HashIdRandomGenerator(RandomGenerator):
"""RandomGenerator that re-seeds deterministically per (trace_id, hash_id).

Designed for parallel processing where multiple workers need to generate
identical content for the same hash_id within a trace file.

Thread Safety:
NOT thread-safe. Each worker process must have its own instance.
"""

@classmethod
def from_base_rng(cls, base_rng: "RandomGenerator") -> "HashIdRandomGenerator":
    """Create from a base RandomGenerator (typically from rng.derive()).

    Args:
        base_rng: Generator whose seed becomes this generator's base seed.
            If the base generator has no seed, a fresh 64-bit value is
            drawn from it instead.

    Returns:
        A new HashIdRandomGenerator seeded deterministically from base_rng.
    """
    # Use an explicit None check (not truthiness): 0 is a valid seed and
    # must be preserved for reproducibility.
    base_seed = (
        base_rng.seed
        if base_rng.seed is not None
        else base_rng.randrange(0, 2**64)
    )
    return cls(base_seed, _internal=True)
Comment on lines +46 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Read the file to see the actual code at lines 46-49
cat -n src/aiperf/common/hash_id_random_generator.py | head -60 | tail -20

Repository: ai-dynamo/aiperf

Length of output: 1015


🏁 Script executed:

# Find the RandomGenerator class definition to understand the seed attribute
rg "class RandomGenerator" -A 20

Repository: ai-dynamo/aiperf

Length of output: 1810


🏁 Script executed:

# Search for how seed is initialized and used
rg "seed" src/aiperf/common/hash_id_random_generator.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 1290


🏁 Script executed:

# Check the RandomGenerator class definition fully, including seed property/attribute
cat -n src/aiperf/common/random_generator.py | head -50

Repository: ai-dynamo/aiperf

Length of output: 2245


🏁 Script executed:

# Check if seed is a property or stored attribute
rg "self.seed" src/aiperf/common/random_generator.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Check if there are any tests that show intended behavior with seed=0
rg "seed.*=.*0" src/ -t py --max-count 20

Repository: ai-dynamo/aiperf

Length of output: 262


🏁 Script executed:

# Get full RandomGenerator class definition
cat -n src/aiperf/common/random_generator.py | head -150

Repository: ai-dynamo/aiperf

Length of output: 6423


🏁 Script executed:

# Check how seed attribute is defined and stored in RandomGenerator
rg "self\.seed\s*=" src/aiperf/common/random_generator.py -B 5 -A 5

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Look for any usage of from_base_rng to understand intended behavior
rg "from_base_rng" --max-count 10

Repository: ai-dynamo/aiperf

Length of output: 1499


🏁 Script executed:

# Check the tests to see if any test seed=0 behavior
cat -n tests/unit/common/test_hash_id_random_generator.py | head -100

Repository: ai-dynamo/aiperf

Length of output: 4665


🏁 Script executed:

# Look for how from_base_rng is called in real code
cat -n src/aiperf/dataset/generator/prompt.py | grep -A 5 -B 5 "from_base_rng"

Repository: ai-dynamo/aiperf

Length of output: 680


Fix truthiness check to preserve explicit seed 0.

Line 48 uses or to evaluate seed, which treats 0 as falsy despite being a valid seed documented in the constructor. This breaks reproducibility when seed is intentionally set to 0.

Proposed fix
-        base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
+        base_seed = (
+            base_rng.seed if base_rng.seed is not None else base_rng.randrange(0, 2**64)
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/hash_id_random_generator.py` around lines 46 - 49, The
truthiness check in HashIdRandomGenerator.from_base_rng incorrectly treats seed
0 as falsy; change the logic to preserve an explicit seed of 0 by testing for
None instead of using `or`. Specifically, in from_base_rng use a None check on
base_rng.seed (e.g., base_seed = base_rng.seed if base_rng.seed is not None else
base_rng.randrange(...)) so that a seed value of 0 is kept; this references the
from_base_rng method and the base_rng.seed attribute on RandomGenerator.


def __init__(self, base_seed: int, *, _internal: bool = False):
    """Initialize with a base seed; construct via from_base_rng().

    Args:
        base_seed: Seed combined with trace_id and hash_id to derive
            per-hash-id seeds in reseed_for_hash_id().
        _internal: Guard flag forwarded to RandomGenerator.__init__
            (presumably enforces factory-only construction — confirm
            against the base class).
    """
    super().__init__(base_seed, _internal=_internal)
    # Replace the NumPy RNG with a sentinel that raises on any use, so
    # only the deterministic Python-RNG path is available.
    self._numpy_rng = _DisabledNumpyRNG()
    # Scopes hash_ids to a specific trace file; set via set_trace_id().
    self._trace_id: str = ""

def set_trace_id(self, trace_id: str) -> None:
    """Record the trace identifier that scopes hash_id derivations.

    Args:
        trace_id: Content hash or unique identifier for the trace file.
            Different trace files must use different trace_ids.
    """
    self._trace_id = trace_id

def reseed_for_hash_id(self, hash_id: int) -> None:
    """Deterministically re-seed the Python RNG for one hash_id.

    Every random operation after this call uses the derived seed, until
    the next call to reseed_for_hash_id.

    Args:
        hash_id: KV block hash ID from trace data.
    """
    # Seed is derived from (base seed, trace_id, hash_id) via SHA-256,
    # truncated to the first 8 bytes (big-endian).
    derivation_key = f"{self.seed}:{self._trace_id}:{hash_id}"
    digest = hashlib.sha256(derivation_key.encode()).digest()
    derived_seed = int.from_bytes(digest[:8], "big")
    self._python_rng.seed(derived_seed)
21 changes: 21 additions & 0 deletions src/aiperf/common/tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""HuggingFace tokenizer wrapper with sensible defaults."""

import contextlib
import inspect
import io
import logging
import os
Expand Down Expand Up @@ -47,6 +48,14 @@ def __init__(self, name: str, suggestions: list[tuple[str, int]]) -> None:
)


def _supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
"""Check if a method on an object accepts a specific keyword argument."""
method = getattr(obj, method_name, None)
if method is None:
return False
return kwarg in inspect.signature(method).parameters
Comment on lines +51 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cd /root && find . -name "tokenizer.py" -path "*/aiperf/*" | head -5

Repository: ai-dynamo/aiperf

Length of output: 115


🏁 Script executed:

cat -n src/aiperf/common/tokenizer.py | head -70

Repository: ai-dynamo/aiperf

Length of output: 2839


🏁 Script executed:

cat -n src/aiperf/common/tokenizer.py | sed -n '51,70p'

Repository: ai-dynamo/aiperf

Length of output: 984


🏁 Script executed:

# Check usage of _supports_kwarg
rg "_supports_kwarg" --type py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 5955


🏁 Script executed:

# Check usage of _apply_kwarg_overrides
rg "_apply_kwarg_overrides" --type py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 3710


🏁 Script executed:

# Look for tokenizer instantiation and from_pretrained usage
rg "from_pretrained" src/aiperf/common/tokenizer.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 1313


Add error handling in _supports_kwarg for non-introspectable tokenizer methods.

Line 56 assumes inspect.signature(method) always succeeds. For some tokenizer backends (custom implementations, C extensions), signature introspection can raise ValueError or TypeError, causing tokenizer loading to fail. Wrap the call in a try-except to gracefully return False on introspection failure, matching the semantics of missing methods.

Proposed fix
 def _supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
     """Check if a method on an object accepts a specific keyword argument."""
     method = getattr(obj, method_name, None)
     if method is None:
         return False
-    return kwarg in inspect.signature(method).parameters
+    try:
+        return kwarg in inspect.signature(method).parameters
+    except (TypeError, ValueError):
+        return False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/tokenizer.py` around lines 51 - 56, The helper
_supports_kwarg currently calls inspect.signature(method) without guarding for
introspection failures; update _supports_kwarg to catch exceptions like
ValueError and TypeError raised by inspect.signature and return False in those
cases so non-introspectable tokenizer methods are treated the same as missing
methods (i.e., no kwarg support). Locate the _supports_kwarg function and wrap
the inspect.signature(method).parameters access in a try/except that returns
False on ValueError/TypeError, keeping existing behavior for missing methods.



def _is_offline_mode() -> bool:
"""Check if HuggingFace offline mode is enabled via environment variables."""
return bool(os.environ.get("HF_HUB_OFFLINE", "")) or bool(
Expand Down Expand Up @@ -147,6 +156,16 @@ def _require_init(self) -> None:
if self._tokenizer is None:
raise NotInitializedError("Tokenizer is not initialized.")

def _apply_kwarg_overrides(self) -> None:
    """Override default args for tokenizers that use non-standard kwargs (e.g. Kimi)."""
    tokenizer = self._tokenizer
    if tokenizer is None:
        return
    # Backends exposing `allow_special_tokens` get it disabled by default.
    if _supports_kwarg(tokenizer, "encode", "allow_special_tokens"):
        self._call_args = {"allow_special_tokens": False}
        self._encode_args = {"allow_special_tokens": False}
    # Backends lacking `skip_special_tokens` on decode get no decode args.
    if not _supports_kwarg(tokenizer, "decode", "skip_special_tokens"):
        self._decode_args = {}

@staticmethod
def resolve_alias(name: str) -> AliasResolutionResult:
"""Resolve a tokenizer name alias to its canonical repository ID."""
Expand Down Expand Up @@ -208,6 +227,7 @@ def from_pretrained(
revision=revision,
)
tokenizer_cls._resolved_name = resolved_name
tokenizer_cls._apply_kwarg_overrides()
except AmbiguousTokenizerNameError:
raise
except Exception as e:
Expand Down Expand Up @@ -285,6 +305,7 @@ class _OfflineModelInfo:
revision=revision,
local_files_only=True,
)
tokenizer_cls._apply_kwarg_overrides()
return tokenizer_cls
finally:
huggingface_hub.model_info = _original_model_info
Expand Down
83 changes: 29 additions & 54 deletions src/aiperf/dataset/composer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator

from aiperf.common import random_generator as rng
from aiperf.common.config import UserConfig
Expand Down Expand Up @@ -41,12 +42,11 @@ def __init__(self, config: UserConfig, tokenizer: Tokenizer | None, **kwargs):
self._turn_sequence_cache: dict[int, tuple[int, int]] = {}

@abstractmethod
def create_dataset(self) -> list[Conversation]:
"""
Create a set of conversation objects from the given configuration.
def create_dataset(self) -> Iterator[Conversation]:
"""Create conversation objects from the given configuration.

Returns:
list[Conversation]: A list of conversation objects.
Yields Conversation objects one at a time so callers can stream
them directly to the backing store without materializing the full list.
"""
...

Expand Down Expand Up @@ -151,63 +151,38 @@ def prefix_prompt_enabled(self) -> bool:
and self.config.input.prompt.prefix_prompt.length > 0
)

def _finalize_conversations(self, conversations: list[Conversation]) -> None:
"""Finalize conversations by adding conversation-level context prompts.

Injects shared system prompts and per-conversation user context prompts.
Note: Turn-level finalization (_finalize_turn) is handled by each composer
according to its needs (eager in synthetic, lazy in custom).

Args:
conversations: List of conversations to finalize
"""
self._inject_context_prompts(conversations)
def _finalize_conversation(
self, conversation: Conversation, session_index: int
) -> None:
"""Inject context prompts into a single conversation.

def _inject_context_prompts(self, conversations: list[Conversation]) -> None:
"""Inject shared system and user context prompts into conversations.

Sets the system_message and context_message fields on Conversation objects,
which endpoint formatters will prepend to the first turn when creating payloads.
Sets the system_message and user_context_message fields, which
endpoint formatters prepend to the first turn when creating payloads.

Args:
conversations: List of conversations to inject prompts into
conversation: Conversation to finalize.
session_index: Position of this conversation in the dataset
(used for per-session user context prompt generation).
"""
if self.prompt_generator is None:
return

config = self.config.input.prompt.prefix_prompt
has_shared_system = config.shared_system_prompt_length is not None
has_user_context = config.user_context_prompt_length is not None

if not (has_shared_system or has_user_context):
return
if config.shared_system_prompt_length is not None:
prompt = self._get_shared_system_prompt()
if prompt:
conversation.system_message = prompt

self.debug(
lambda: f"Injecting context prompts into {len(conversations)} conversations"
)

# Get shared system prompt once (same for all sessions)
shared_system_prompt = None
if has_shared_system:
shared_system_prompt = self.prompt_generator.get_shared_system_prompt()

# Iterate through conversations and set conversation-level fields
for session_index, conversation in enumerate(conversations):
# Set shared system prompt
if shared_system_prompt:
conversation.system_message = shared_system_prompt
self.trace(
lambda conv=conversation: f"Set system_message on conversation {conv.session_id}"
)
if config.user_context_prompt_length is not None:
conversation.user_context_message = (
self.prompt_generator.generate_user_context_prompt(session_index)
)

# Set user context prompt (unique per session)
if has_user_context:
user_context = self.prompt_generator.generate_user_context_prompt(
session_index
)
conversation.user_context_message = user_context
self.trace(
lambda idx=session_index,
conv=conversation: f"Set user_context_message for session {idx} "
f"(conversation {conv.session_id})"
)
def _get_shared_system_prompt(self) -> str | None:
"""Return the shared system prompt, computing and caching on first call."""
if not hasattr(self, "_shared_system_prompt_cache"):
self._shared_system_prompt_cache: str | None = (
self.prompt_generator.get_shared_system_prompt()
)
return self._shared_system_prompt_cache
19 changes: 10 additions & 9 deletions src/aiperf/dataset/composer/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from collections.abc import Iterator
from pathlib import Path
from typing import Any

Expand All @@ -19,11 +20,12 @@ class CustomDatasetComposer(BaseDatasetComposer):
def __init__(self, config: UserConfig, tokenizer: Tokenizer | None):
super().__init__(config, tokenizer)

def create_dataset(self) -> list[Conversation]:
def create_dataset(self) -> Iterator[Conversation]:
"""Create conversations from a file or directory.

Returns:
list[Conversation]: A list of conversation objects.
Yields conversations one at a time, finalizing each inline so
the caller can stream them directly to the backing store without
materializing the full list.
"""
# TODO: (future) for K8s, we need to transfer file data from SC (across node)
check_file_exists(self.config.input.file)
Expand All @@ -44,14 +46,13 @@ def create_dataset(self) -> list[Conversation]:
dataset = self.loader.load_dataset()
conversations = self.loader.convert_to_conversations(dataset)

# Finalize all turns with metadata (custom datasets need this)
for conversation in conversations:
for session_index, conversation in enumerate(conversations):
# Finalize all turns with metadata (custom datasets need this)
for turn in conversation.turns:
self._finalize_turn(turn)

# Finalize conversation-level context prompts
self._finalize_conversations(conversations)
return conversations
# Finalize conversation-level context prompts
self._finalize_conversation(conversation, session_index)
yield conversation

def _infer_dataset_type(self, file_path: str) -> CustomDatasetType:
"""Infer the custom dataset type from the input file.
Expand Down
21 changes: 9 additions & 12 deletions src/aiperf/dataset/composer/synthetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from collections.abc import Iterator

from aiperf.common import random_generator as rng
from aiperf.common.config import UserConfig
from aiperf.common.config.config_defaults import InputDefaults
Expand Down Expand Up @@ -39,17 +41,13 @@ def __init__(self, config: UserConfig, tokenizer: Tokenizer | None):
"setting the mean to a positive value."
)

def create_dataset(self) -> list[Conversation]:
def create_dataset(self) -> Iterator[Conversation]:
"""Create a synthetic conversation dataset from the given configuration.

It generates a set of conversations with a varying number of turns,
where each turn contains synthetic text, image, and audio payloads.

Returns:
list[Conversation]: A list of conversation objects.
Yields conversations one at a time, where each conversation contains
a varying number of turns with synthetic text, image, and audio payloads.
"""
conversations = []
for _ in range(self.config.input.conversation.num_dataset_entries):
for session_index in range(self.config.input.conversation.num_dataset_entries):
conversation = Conversation(session_id=self.session_id_generator.next())

num_turns = self._turn_sampler_rng.sample_positive_normal_integer(
Expand All @@ -61,11 +59,10 @@ def create_dataset(self) -> list[Conversation]:
for turn_idx in range(num_turns):
turn = self._create_turn(is_first=(turn_idx == 0))
conversation.turns.append(turn)
conversations.append(conversation)

# Finalize all conversations (turn metadata + context prompts)
self._finalize_conversations(conversations)
return conversations
# Finalize conversation-level context prompts
self._finalize_conversation(conversation, session_index)
yield conversation

def _create_turn(self, is_first: bool) -> Turn:
"""Create a turn object that contains synthetic payloads to send.
Expand Down
13 changes: 7 additions & 6 deletions src/aiperf/dataset/composer/synthetic_rankings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from collections.abc import Iterator

from aiperf.common import random_generator as rng
from aiperf.common.config import InputDefaults, UserConfig
from aiperf.common.models import Conversation, Text, Turn
Expand Down Expand Up @@ -33,26 +35,25 @@ def __init__(self, config: UserConfig, tokenizer: Tokenizer | None):
f"Using default sampling strategy for synthetic rankings dataset: {InputDefaults.DATASET_SAMPLING_STRATEGY}"
)

def create_dataset(self) -> list[Conversation]:
def create_dataset(self) -> Iterator[Conversation]:
"""Generate synthetic dataset for the rankings endpoint.

Each conversation contains one turn with one query and multiple passages.
"""
conversations: list[Conversation] = []
num_entries = self.config.input.conversation.num_dataset_entries
num_passages_mean = self.config.input.rankings.passages.mean
num_passages_std = self.config.input.rankings.passages.stddev

for _ in range(num_entries):
for session_index in range(num_entries):
num_passages = self._passages_rng.sample_positive_normal_integer(
num_passages_mean, num_passages_std
)
conversation = Conversation(session_id=self.session_id_generator.next())
turn = self._create_turn(num_passages=num_passages)
conversation.turns.append(turn)
conversations.append(conversation)

return conversations
# Finalize conversation-level context prompts
self._finalize_conversation(conversation, session_index)
yield conversation

def _create_turn(self, num_passages: int) -> Turn:
"""Create a single ranking turn with one synthetic query and multiple synthetic passages.
Expand Down
Loading
Loading