
feat: 2-3x faster trace dataset loading via HashIdRandomGenerator #724

Open
ajcasagrande wants to merge 10 commits into main from
ajc/even-faster-trace-datasets

Conversation


@ajcasagrande ajcasagrande commented Feb 27, 2026

Tip

Full 23k mooncake_trace.jsonl:
  • Linux i9-14900K: from 16s down to ~6-7s
  • MacBook Pro M4: from 20s down to ~10s

Add HashIdRandomGenerator that deterministically seeds per (trace_id, hash_id) pair, enabling parallel token generation across workers without lock contention or cache coordination. Extract parallel_convert module to leverage multiprocessing with shared-memory token corpus. Stream conversations through composers to the backing store instead of materializing the full dataset in memory.
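As a rough illustration of the idea (the real class lives in src/aiperf/common/hash_id_random_generator.py; the class name, hashing scheme, and method names below are assumptions for the sketch, not the PR's actual implementation):

```python
import hashlib
import random


class HashIdRngSketch:
    """Sketch of deterministic per-(trace_id, hash_id) seeding: every worker
    that reseeds with the same pair produces an identical token stream, so no
    locks or shared caches are needed across processes."""

    def __init__(self, base_seed: int) -> None:
        self.base_seed = base_seed
        self.rng = random.Random()

    def reseed_for_hash_id(self, trace_id: int, hash_id: int) -> None:
        # Mix the base seed with the pair via a stable hash so the derived
        # seed is reproducible across processes (unlike Python's salted hash()).
        digest = hashlib.blake2b(
            f"{self.base_seed}:{trace_id}:{hash_id}".encode(), digest_size=8
        ).digest()
        self.rng.seed(int.from_bytes(digest, "little"))

    def sample(self, corpus: list[int], k: int) -> list[int]:
        # Draw k tokens from the shared corpus using the pair-specific stream.
        return self.rng.choices(corpus, k=k)


# Two independent "workers" produce identical blocks for the same pair:
w1, w2 = HashIdRngSketch(42), HashIdRngSketch(42)
w1.reseed_for_hash_id(1, 7)
w2.reseed_for_hash_id(1, 7)
assert w1.sample(list(range(1000)), 16) == w2.sample(list(range(1000)), 16)
```

Because each (trace_id, hash_id) pair maps to its own seed, workers can generate token blocks in any order, on any process, and still agree on the output.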

Summary by CodeRabbit

Release Notes

  • New Features

    • Added deterministic parallel dataset processing for reproducible results across multiple workers
    • Implemented streaming dataset composition to reduce memory footprint
  • Improvements

    • Enhanced tokenizer compatibility with automatic keyword argument handling
    • Optimized dataset loading using memory-backed storage instead of in-memory structures
  • Removals

    • Removed legacy parallel decoding utility; replaced with improved parallel trace conversion

Add support for Alibaba Bailian trace format (bailian_trace) with multi-turn
conversation linking via chat_id/parent_chat_id and 16-token SipHash blocks.
Extract common trace loading logic into BaseTraceDatasetLoader to share
infrastructure between Mooncake and Bailian loaders. Make block_size
configurable per-loader via plugin metadata default_block_size, and generalize
mooncake-specific validation to work with any trace dataset type.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
- Remove .value on enum in error message (use string-based enum directly)
- Validate mean is not None before cached prompt generation
- Add cycle detection in Bailian find_root to prevent infinite loops
- Reset filtering counters per load_dataset() call to avoid over-reporting
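The cycle-detection fix for Bailian's find_root can be sketched as follows (a hypothetical helper mirroring the chat_id/parent_chat_id linking described above, not the loader's actual code):

```python
def find_root(chat_id: str, parent_of: dict[str, str]) -> str:
    """Walk parent_chat_id links up to the conversation root, raising on
    cycles instead of looping forever."""
    seen: set[str] = set()
    current = chat_id
    while current in parent_of:
        if current in seen:
            raise ValueError(f"cycle detected in parent chain at {current!r}")
        seen.add(current)
        current = parent_of[current]
    return current


# Multi-turn chain c3 -> c2 -> c1 resolves to the root conversation:
assert find_root("c3", {"c2": "c1", "c3": "c2"}) == "c1"
```

A malformed trace where two chats point at each other raises instead of hanging the loader.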

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>

github-actions bot commented Feb 27, 2026

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@766253cc79dadf4535b6e294ff176feac8dc437a

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@766253cc79dadf4535b6e294ff176feac8dc437a

Last updated for commit: 766253c

@github-actions github-actions bot added the feat label Feb 27, 2026
Tokenizers like Kimi use `allow_special_tokens` instead of the standard
`add_special_tokens` for encode, and their `decode()` doesn't accept
`skip_special_tokens`. Passing unsupported kwargs triggers the slow
`PreTrainedTokenizer.super()` fallback path, causing ~5000x slower
decode (~204ms vs 0.04ms per 4500 tokens).

After loading, inspect the tokenizer's method signatures and override
the default call/encode/decode args to match.
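A minimal sketch of that signature-inspection approach (the tokenizer class and helper below are illustrative stand-ins, not the project's actual code):

```python
import inspect


def supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
    """Return True if obj.<method_name> accepts the given keyword argument."""
    method = getattr(obj, method_name, None)
    if method is None:
        return False
    return kwarg in inspect.signature(method).parameters


class KimiLikeTokenizer:
    """Stand-in for a tokenizer with non-standard signatures."""

    def encode(self, text: str, allow_special_tokens: bool = False) -> list[int]:
        return [ord(c) for c in text]

    def decode(self, ids: list[int]) -> str:  # no skip_special_tokens param
        return "".join(chr(i) for i in ids)


tok = KimiLikeTokenizer()

# Probe once after loading, then pass only kwargs the backend accepts,
# avoiding the slow fallback path triggered by unsupported kwargs.
encode_kwargs: dict[str, bool] = {}
if supports_kwarg(tok, "encode", "add_special_tokens"):
    encode_kwargs["add_special_tokens"] = False
elif supports_kwarg(tok, "encode", "allow_special_tokens"):
    encode_kwargs["allow_special_tokens"] = False

decode_kwargs: dict[str, bool] = {}
if supports_kwarg(tok, "decode", "skip_special_tokens"):
    decode_kwargs["skip_special_tokens"] = True

text = tok.decode(tok.encode("hi", **encode_kwargs), **decode_kwargs)
```

The probe runs once per tokenizer load, so the per-call overhead of `inspect.signature` is paid only at startup.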

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Gracefully handle unrecognized 'type' field values during dataset type
inference by skipping the explicit type shortcut and falling through to
structural detection. This lets Bailian traces (which use "type" for the
request type, not the dataset type) auto-detect correctly. Also updates
CLI descriptions to reference both trace formats and adds tests for the
type field fallback behavior.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
@ajcasagrande ajcasagrande force-pushed the ajc/even-faster-trace-datasets branch from eb9491c to 3617fdb on February 28, 2026 06:44
Add HashIdRandomGenerator that deterministically seeds per (trace_id,
hash_id) pair, enabling parallel token generation across workers without
lock contention or cache coordination. Extract parallel_convert module
to leverage multiprocessing with shared-memory token corpus. Stream
conversations through composers to the backing store instead of
materializing the full dataset in memory.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
@ajcasagrande ajcasagrande reopened this Feb 28, 2026
@ajcasagrande
Contributor Author

@coderabbitai review


coderabbitai bot commented Feb 28, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


coderabbitai bot commented Feb 28, 2026

Walkthrough

This pull request introduces a deterministic hash-based random generator, converts dataset composition from batch-based to streaming iterators, replaces parallel token decoding with parallel trace-to-conversation conversion, and adds tokenizer keyword argument override detection. Over thirty files are modified with comprehensive test coverage.

Changes

  • Deterministic RNG Infrastructure — src/aiperf/common/hash_id_random_generator.py, tests/unit/common/test_hash_id_random_generator.py: New HashIdRandomGenerator class enabling per-hash_id reseeding for reproducible parallel processing, with comprehensive unit tests validating determinism, isolation, and edge cases.
  • Tokenizer Enhancements — src/aiperf/common/tokenizer.py, tests/unit/common/test_tokenizer_kwarg_overrides.py: Added kwarg override detection (_supports_kwarg) and application (_apply_kwarg_overrides) to handle non-standard tokenizer kwargs like allow_special_tokens and skip_special_tokens across different backends, with extensive test coverage.
  • Dataset Composition Streaming — src/aiperf/dataset/composer/base.py, src/aiperf/dataset/composer/custom.py, src/aiperf/dataset/composer/synthetic.py, src/aiperf/dataset/composer/synthetic_rankings.py: Converted create_dataset from returning list[Conversation] to Iterator[Conversation]; replaced batch finalization with per-conversation finalization and context prompt injection.
  • Dataset Composition Tests — tests/unit/dataset/composer/test_*.py: Updated tests to accommodate iterator-based dataset generation; refactored finalization test names and implementations to reflect per-conversation processing.
  • Prompt Generation with Deterministic Sampling — src/aiperf/dataset/generator/prompt.py: Introduced the sample_tokens_from_corpus function and HashIdRandomGenerator integration for deterministic per-hash_id token sampling; refactored caching from string-level to token-block-level with a configurable block_size parameter.
  • Parallel Processing Infrastructure — src/aiperf/dataset/loader/parallel_convert.py, tests/unit/dataset/loader/test_parallel_convert.py: New parallel_convert module replaces parallel_decode, enabling parallel trace-to-conversation conversion with a shared-memory corpus, per-worker HashIdRandomGenerator seeding, and a comprehensive test suite with 1000+ lines of test code.
  • Removed Parallel Decode — src/aiperf/dataset/generator/parallel_decode.py, tests/unit/dataset/generator/test_parallel_decode.py: Removed the multiprocessing-based token sequence decoding module (139 lines) and associated tests (269 lines) in favor of HashIdRandomGenerator-driven deterministic sampling.
  • Trace Loader Refactoring — src/aiperf/dataset/loader/base_trace_loader.py, tests/unit/dataset/loader/test_base_trace_loader.py, src/aiperf/dataset/loader/parallel_convert.py: Updated BaseTraceDatasetLoader to compute a file-based trace_id, integrate HashIdRandomGenerator, support parallel conversion with per-worker RNG seeding, and dispatch between single-threaded and parallel paths based on dataset size.
  • Dataset Manager Streaming — src/aiperf/dataset/dataset_manager.py, tests/unit/dataset/test_dataset_manager.py, tests/unit/dataset/test_dataset_manager_inputs_json.py, tests/unit/dataset/conftest.py: Refactored dataset storage from an in-memory dict to streaming writes to mmap-backed storage; made _generate_input_payloads asynchronous; updated fixture setup to use a mock async dataset client with DatasetMetadata.
  • Loader Protocol Updates — src/aiperf/dataset/protocols.py: Widened the convert_to_conversations return type from list[Conversation] to Iterable[Conversation].
  • Other Trace Loaders — tests/unit/dataset/loader/test_bailian_trace.py, tests/unit/dataset/loader/test_trace.py, tests/unit/dataset/test_prompt_generator.py: Updated existing loader tests to remove parallel_decode mocking, wrap iterator results with list(), and adjust mock expectations for the block_size parameter; refactored prompt generator tests to validate HashIdRandomGenerator integration.
  • Integration Updates — src/aiperf/dataset/synthesis/rolling_hasher.py: Minor change to forward the block_size parameter to prompt_generator.generate when hash_ids are provided.
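The list-to-iterator conversion in the composer cohort can be sketched like this (Conversation and the store API are simplified stand-ins, not the project's real types):

```python
from collections.abc import Iterator

Conversation = dict  # stand-in for the project's Conversation model


def create_dataset(num_conversations: int) -> Iterator[Conversation]:
    """Yield conversations one at a time (the Iterator[Conversation]
    contract) so each can be finalized and written out immediately."""
    for i in range(num_conversations):
        conv: Conversation = {"session_id": f"conv-{i}", "turns": []}
        # per-conversation finalization would happen here
        yield conv


def stream_to_store(conversations: Iterator[Conversation], store: list) -> int:
    """Consume the iterator, writing each conversation to the backing store;
    peak memory is one conversation, not the whole dataset."""
    count = 0
    for conv in conversations:
        store.append(conv)  # stand-in for an mmap-backed store write
        count += 1
    return count


store: list = []
assert stream_to_store(create_dataset(3), store) == 3
```

Callers that previously relied on `len()` or indexing must now either iterate once or materialize with `list()`, which is why the protocol's return type was widened to `Iterable[Conversation]`.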

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Poem

🐰 Hops through fields of streaming code,
Where hash-ids pave the random road,
No more batches, just a flow—
Parallel workers, watch them go!
Tokenizers tamed with kwargs bright,
Deterministic seeds alight!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning — Docstring coverage is 74.19%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)
  • Description Check ✅ Passed — Check skipped: CodeRabbit’s high-level summary is enabled.
  • Title Check ✅ Passed — The title clearly and concisely summarizes the main change: introducing HashIdRandomGenerator to significantly improve trace dataset loading performance by 2-3x.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Base automatically changed from ajc/bailian to main March 3, 2026 00:56
parallel_decode and _init_worker were not forwarding tokenizer config
args, causing failures with tokenizers like Kimi that require
trust_remote_code=True.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/aiperf/dataset/protocols.py (1)

20-20: ⚠️ Potential issue | 🟡 Minor

Align protocol documentation with the new iterable return contract.

At Line 20, the protocol docstring still says “list of Conversation objects,” but the method now returns Iterable[Conversation]. That mismatch can cause incorrect caller assumptions (e.g., len()/indexing always available).

Proposed doc fix
 class CustomDatasetLoaderProtocol(Protocol):
-    """Protocol for custom dataset loaders that load dataset from a file and convert it to a list of Conversation objects."""
+    """Protocol for custom dataset loaders that load data from a file and convert it to an iterable of Conversation objects.
+
+    Implementations may return either a materialized list or a single-pass iterator.
+    """

Also applies to: 49-51

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/protocols.py` at line 20, Update the protocol docstrings
to reflect the new iterable return contract: change any wording that says “list
of Conversation objects” to “Iterable[Conversation]” and explicitly note that
callers should not assume list ops like len() or indexing are available; update
the main protocol docstring (the one starting “Protocol for custom dataset
loaders...”) and the related docblock referenced around lines 49–51 to match
this wording and mention that callers should consume the returned Iterable via
iteration or by materializing it into a list if random access is required.
🧹 Nitpick comments (4)
tests/unit/dataset/loader/test_parallel_convert.py (1)

481-522: Remove unused tmp_path fixture parameter.

The tmp_path parameter is not used in this test. The test creates its own shared memory and doesn't need a temporary directory.

🧹 Remove unused parameter
-    def test_init_worker_sets_up_state(self, sample_corpus_array, tmp_path):
+    def test_init_worker_sets_up_state(self, sample_corpus_array):
         """_init_worker should populate _worker dict with all required fields."""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_parallel_convert.py` around lines 481 - 522,
The test test_init_worker_sets_up_state contains an unused fixture parameter
tmp_path; remove tmp_path from the test signature so the test becomes def
test_init_worker_sets_up_state(self, sample_corpus_array): and update any
references if present, leaving the body that creates shared_memory, constructs
_WorkerInitArgs, calls _init_worker, and asserts on
parallel_convert_mod._worker_state unchanged; ensure the test imports/mocks
(e.g., Tokenizer.from_pretrained) and the cleanup resetting
parallel_convert_mod._worker_state and closing/unlinking shm remain intact.
tests/unit/dataset/test_dataset_manager.py (1)

465-466: Consider completing the verification comments.

The comments "Verify in-memory dataset was not materialized" on lines 465-466 and 490-491 appear to be placeholders without actual assertions. If verification is no longer needed (since the in-memory dataset no longer exists), consider removing these comments or adding actual assertions if verification is intended.

🧹 Remove incomplete verification comments
-        # Verify in-memory dataset was not materialized
-
         # Request should still work via dataset client

Also applies to: 490-491

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/test_dataset_manager.py` around lines 465 - 466, Locate
the two placeholder comments "Verify in-memory dataset was not materialized" in
tests/unit/dataset/test_dataset_manager.py and either remove them or replace
them with concrete assertions: call the test's DatasetManager instance (e.g.,
dataset_manager) and assert the dataset is not marked materialized (e.g., assert
not dataset_manager.is_materialized(dataset_id)) or assert no materialized
file/path exists on disk (e.g., assert not os.path.exists(materialized_path));
if the project uses an internal map (e.g., dataset_manager._materialized or
dataset_manager.materialized_datasets), assert the dataset_id is not present
there instead. Ensure the chosen assertion uses existing public
methods/attributes in the test context (materialize_dataset, is_materialized,
materialized_datasets, or filesystem path helpers) so the verification is real
and not just a comment.
tests/unit/common/test_hash_id_random_generator.py (1)

15-22: Optional cleanup: de-duplicate RNG setup fixture.

Both test classes use the same autouse RNG reset/init fixture; extracting one module-level fixture would keep this file a bit leaner.

Also applies to: 215-222

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/common/test_hash_id_random_generator.py` around lines 15 - 22, The
RNG setup/reset fixture is duplicated across test classes; keep a single
module-level autouse fixture instead: define the existing setup_rng fixture once
at the top of the test module (using @pytest.fixture(autouse=True)) that calls
rng.reset(), rng.init(42), yield, then rng.reset(), and remove the duplicate
fixture definitions (e.g., the other occurrence around lines 215-222); keep
references to the same symbols (setup_rng, rng.reset, rng.init) so all tests get
the shared setup.
tests/unit/dataset/loader/test_base_trace_loader.py (1)

243-348: Refactor threshold dispatch tests into one parameterized test.

These tests are data-driven variants of the same assertion pattern. Consolidating with @pytest.mark.parametrize will reduce duplication and keep boundary coverage clearer.

Possible consolidation
+    @pytest.mark.parametrize(
+        ("trace_count", "expect_parallel"),
+        [
+            (_MIN_TRACES_FOR_PARALLEL + 1, True),
+            (_MIN_TRACES_FOR_PARALLEL, True),
+            (_MIN_TRACES_FOR_PARALLEL - 1, False),
+        ],
+    )
+    def test_threshold_dispatch_behavior(
+        self,
+        create_jsonl_file,
+        mock_prompt_generator,
+        default_user_config,
+        trace_count,
+        expect_parallel,
+    ):
+        content = [
+            f'{{"input_length": 100, "hash_ids": [{i}], "timestamp": {i * 1000}}}'
+            for i in range(trace_count)
+        ]
+        filename = create_jsonl_file(content)
+        loader = MooncakeTraceDatasetLoader(
+            filename=filename,
+            user_config=default_user_config,
+            prompt_generator=mock_prompt_generator,
+        )
+        data = loader.load_dataset()
+        with patch("aiperf.dataset.loader.base_trace_loader.parallel_convert", return_value=[]) as mock_parallel:
+            list(loader.convert_to_conversations(data))
+            if expect_parallel:
+                mock_parallel.assert_called_once()
+            else:
+                mock_parallel.assert_not_called()

As per coding guidelines, "Use @pytest.mark.parametrize for data-driven tests".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_base_trace_loader.py` around lines 243 - 348,
Tests test_large_dataset_uses_parallel, test_exactly_threshold_uses_parallel,
and test_below_threshold_uses_single_threaded are duplicate threshold variations
and should be consolidated into one pytest.mark.parametrize test; create a
single test (e.g., test_threshold_dispatch_parametrized) that takes a
trace_count and expected_parallel boolean, build content using
_MIN_TRACES_FOR_PARALLEL +/- values, instantiate MooncakeTraceDatasetLoader as
before, call loader.load_dataset() and then patch
aiperf.dataset.loader.base_trace_loader.parallel_convert and call
list(loader.convert_to_conversations(data)), finally assert
mock_parallel.assert_called_once() when expected_parallel is True or
mock_parallel.assert_not_called() when False; keep _MIN_TRACES_FOR_PARALLEL,
MooncakeTraceDatasetLoader, convert_to_conversations, and parallel_convert
references to locate code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/common/hash_id_random_generator.py`:
- Around line 46-49: The truthiness check in HashIdRandomGenerator.from_base_rng
incorrectly treats seed 0 as falsy; change the logic to preserve an explicit
seed of 0 by testing for None instead of using `or`. Specifically, in
from_base_rng use a None check on base_rng.seed (e.g., base_seed = base_rng.seed
if base_rng.seed is not None else base_rng.randrange(...)) so that a seed value
of 0 is kept; this references the from_base_rng method and the base_rng.seed
attribute on RandomGenerator.

In `@src/aiperf/common/tokenizer.py`:
- Around line 51-56: The helper _supports_kwarg currently calls
inspect.signature(method) without guarding for introspection failures; update
_supports_kwarg to catch exceptions like ValueError and TypeError raised by
inspect.signature and return False in those cases so non-introspectable
tokenizer methods are treated the same as missing methods (i.e., no kwarg
support). Locate the _supports_kwarg function and wrap the
inspect.signature(method).parameters access in a try/except that returns False
on ValueError/TypeError, keeping existing behavior for missing methods.

In `@src/aiperf/dataset/dataset_manager.py`:
- Around line 159-160: The Kubernetes-mode log message in dataset_manager.py
builds an f-string containing self._conversation_count and currently ends with
an extra closing parenthesis in the literal "backing store)"; update the string
used in the logging call (the f"...Kubernetes mode: skipped local client,
compressed {self._conversation_count} conversations into backing store)") to
remove the stray ')' so the message reads without the extra parenthesis.

In `@src/aiperf/dataset/generator/prompt.py`:
- Around line 289-299: The cache key uses only hash_id which breaks when
current_block_size varies; change the caching to include current_block_size
(e.g., use a tuple key like (hash_id, current_block_size)) so that self._cache
stores size-specific samples; update any annotations for self._cache
accordingly, ensure you still call
self._hash_id_corpus_rng.reseed_for_hash_id(hash_id) and populate the cache
using sample_tokens_from_corpus(self._tokenized_corpus, current_block_size,
self._hash_id_corpus_rng, self.tokenizer.block_separation_token_id), and
continue to extend final_prompt with the cached value.

In `@src/aiperf/dataset/loader/parallel_convert.py`:
- Around line 138-146: Validate that final_block_size is positive before using
it: compute final_block_size as currently done (in the block handling hash_ids
inside parallel_convert.py), then check that input_length >= (len(hash_ids) - 1)
* block_size (or equivalently final_block_size > 0) and raise a clear ValueError
(or return/skip the trace) if the check fails; reference the variables hash_ids,
input_length, final_block_size and the functions get_block_tokens/decode so
maintainers can locate where to add the validation and the error message
describing the invalid sizes (include values for input_length, len(hash_ids),
and block_size in the error).

In `@tests/unit/dataset/loader/test_base_trace_loader.py`:
- Around line 38-72: Add explicit type annotations to all untyped fixtures and
test functions in this file: annotate create_jsonl_file with its
return/generator type (e.g., Iterator[Callable[[List[str]], str]]), mark
default_user_config() -> UserConfig, mock_prompt_generator() -> Mock (or
PromptGenerator protocol), and add -> None return annotations to all test_*
functions and typed parameters where applicable; update any local variables'
types if needed to satisfy type checkers (references: create_jsonl_file,
default_user_config, mock_prompt_generator, and all test_* functions).
- Around line 422-442: The test test_turn_fields_populated is missing an
assertion for the turn.delay field; update the test to assert that turn.delay
equals the expected value produced by MooncakeTraceDatasetLoader (i.e., the
delay mapping implemented in MooncakeTraceDatasetLoader.convert_to_conversations
or loader.load_dataset), for example add a line like assert turn.delay ==
<expected_delay> where <expected_delay> is the loader's computed/default delay
for the provided record (commonly 0 if no delay field is present).

ℹ️ Review info
Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: f5b177a5-4d79-4a6e-ae16-e09df52a497b

📥 Commits

Reviewing files that changed from the base of the PR and between 3601096 and 766253c.

📒 Files selected for processing (28)
  • src/aiperf/common/hash_id_random_generator.py
  • src/aiperf/common/tokenizer.py
  • src/aiperf/dataset/composer/base.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/composer/synthetic.py
  • src/aiperf/dataset/composer/synthetic_rankings.py
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/dataset/generator/parallel_decode.py
  • src/aiperf/dataset/generator/prompt.py
  • src/aiperf/dataset/loader/base_trace_loader.py
  • src/aiperf/dataset/loader/parallel_convert.py
  • src/aiperf/dataset/protocols.py
  • src/aiperf/dataset/synthesis/rolling_hasher.py
  • tests/unit/common/test_hash_id_random_generator.py
  • tests/unit/common/test_tokenizer_kwarg_overrides.py
  • tests/unit/dataset/composer/test_base_composer.py
  • tests/unit/dataset/composer/test_custom_composer.py
  • tests/unit/dataset/composer/test_synthetic_composer.py
  • tests/unit/dataset/composer/test_synthetic_rankings_composer.py
  • tests/unit/dataset/conftest.py
  • tests/unit/dataset/generator/test_parallel_decode.py
  • tests/unit/dataset/generator/test_prompt_generator.py
  • tests/unit/dataset/loader/test_bailian_trace.py
  • tests/unit/dataset/loader/test_base_trace_loader.py
  • tests/unit/dataset/loader/test_parallel_convert.py
  • tests/unit/dataset/loader/test_trace.py
  • tests/unit/dataset/test_dataset_manager.py
  • tests/unit/dataset/test_dataset_manager_inputs_json.py
💤 Files with no reviewable changes (2)
  • src/aiperf/dataset/generator/parallel_decode.py
  • tests/unit/dataset/generator/test_parallel_decode.py

Comment on lines +46 to +49
    def from_base_rng(cls, base_rng: RandomGenerator) -> "HashIdRandomGenerator":
        """Create from a base RandomGenerator (typically from rng.derive())."""
        base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
        return cls(base_seed, _internal=True)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Read the file to see the actual code at lines 46-49
cat -n src/aiperf/common/hash_id_random_generator.py | head -60 | tail -20

Repository: ai-dynamo/aiperf

Length of output: 1015


🏁 Script executed:

# Find the RandomGenerator class definition to understand the seed attribute
rg "class RandomGenerator" -A 20

Repository: ai-dynamo/aiperf

Length of output: 1810


🏁 Script executed:

# Search for how seed is initialized and used
rg "seed" src/aiperf/common/hash_id_random_generator.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 1290


🏁 Script executed:

# Check the RandomGenerator class definition fully, including seed property/attribute
cat -n src/aiperf/common/random_generator.py | head -50

Repository: ai-dynamo/aiperf

Length of output: 2245


🏁 Script executed:

# Check if seed is a property or stored attribute
rg "self.seed" src/aiperf/common/random_generator.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Check if there are any tests that show intended behavior with seed=0
rg "seed.*=.*0" src/ -t py --max-count 20

Repository: ai-dynamo/aiperf

Length of output: 262


🏁 Script executed:

# Get full RandomGenerator class definition
cat -n src/aiperf/common/random_generator.py | head -150

Repository: ai-dynamo/aiperf

Length of output: 6423


🏁 Script executed:

# Check how seed attribute is defined and stored in RandomGenerator
rg "self\.seed\s*=" src/aiperf/common/random_generator.py -B 5 -A 5

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Look for any usage of from_base_rng to understand intended behavior
rg "from_base_rng" --max-count 10

Repository: ai-dynamo/aiperf

Length of output: 1499


🏁 Script executed:

# Check the tests to see if any test seed=0 behavior
cat -n tests/unit/common/test_hash_id_random_generator.py | head -100

Repository: ai-dynamo/aiperf

Length of output: 4665


🏁 Script executed:

# Look for how from_base_rng is called in real code
cat -n src/aiperf/dataset/generator/prompt.py | grep -A 5 -B 5 "from_base_rng"

Repository: ai-dynamo/aiperf

Length of output: 680


Fix truthiness check to preserve explicit seed 0.

Line 48 uses or to evaluate seed, which treats 0 as falsy despite being a valid seed documented in the constructor. This breaks reproducibility when seed is intentionally set to 0.

Proposed fix
-        base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
+        base_seed = (
+            base_rng.seed if base_rng.seed is not None else base_rng.randrange(0, 2**64)
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/hash_id_random_generator.py` around lines 46 - 49, The
truthiness check in HashIdRandomGenerator.from_base_rng incorrectly treats seed
0 as falsy; change the logic to preserve an explicit seed of 0 by testing for
None instead of using `or`. Specifically, in from_base_rng use a None check on
base_rng.seed (e.g., base_seed = base_rng.seed if base_rng.seed is not None else
base_rng.randrange(...)) so that a seed value of 0 is kept; this references the
from_base_rng method and the base_rng.seed attribute on RandomGenerator.

Comment on lines +51 to +56
def _supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
    """Check if a method on an object accepts a specific keyword argument."""
    method = getattr(obj, method_name, None)
    if method is None:
        return False
    return kwarg in inspect.signature(method).parameters

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cd /root && find . -name "tokenizer.py" -path "*/aiperf/*" | head -5

Repository: ai-dynamo/aiperf

Length of output: 115


🏁 Script executed:

cat -n src/aiperf/common/tokenizer.py | head -70

Repository: ai-dynamo/aiperf

Length of output: 2839


🏁 Script executed:

cat -n src/aiperf/common/tokenizer.py | sed -n '51,70p'

Repository: ai-dynamo/aiperf

Length of output: 984


🏁 Script executed:

# Check usage of _supports_kwarg
rg "_supports_kwarg" --type py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 5955


🏁 Script executed:

# Check usage of _apply_kwarg_overrides
rg "_apply_kwarg_overrides" --type py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 3710


🏁 Script executed:

# Look for tokenizer instantiation and from_pretrained usage
rg "from_pretrained" src/aiperf/common/tokenizer.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 1313


Add error handling in _supports_kwarg for non-introspectable tokenizer methods.

Line 56 assumes inspect.signature(method) always succeeds. For some tokenizer backends (custom implementations, C extensions), signature introspection can raise ValueError or TypeError, causing tokenizer loading to fail. Wrap the call in a try-except to gracefully return False on introspection failure, matching the semantics of missing methods.

Proposed fix
 def _supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
     """Check if a method on an object accepts a specific keyword argument."""
     method = getattr(obj, method_name, None)
     if method is None:
         return False
-    return kwarg in inspect.signature(method).parameters
+    try:
+        return kwarg in inspect.signature(method).parameters
+    except (TypeError, ValueError):
+        return False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/tokenizer.py` around lines 51 - 56, The helper
_supports_kwarg currently calls inspect.signature(method) without guarding for
introspection failures; update _supports_kwarg to catch exceptions like
ValueError and TypeError raised by inspect.signature and return False in those
cases so non-introspectable tokenizer methods are treated the same as missing
methods (i.e., no kwarg support). Locate the _supports_kwarg function and wrap
the inspect.signature(method).parameters access in a try/except that returns
False on ValueError/TypeError, keeping existing behavior for missing methods.
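A standalone sketch of the hardened helper (assumed shape; the real one lives in `src/aiperf/common/tokenizer.py`). `inspect.signature` can raise `ValueError` for some C builtins and custom callables, so the try/except treats those the same as a missing method:

```python
import inspect


def supports_kwarg(obj: object, method_name: str, kwarg: str) -> bool:
    """Return True only when `method_name` exists on `obj` and accepts `kwarg`.

    Non-introspectable callables (where inspect.signature raises
    TypeError/ValueError) are treated the same as missing methods.
    """
    method = getattr(obj, method_name, None)
    if method is None:
        return False
    try:
        return kwarg in inspect.signature(method).parameters
    except (TypeError, ValueError):
        return False


class Toy:
    """Hypothetical tokenizer-like object for demonstration."""

    def encode(self, text: str, add_special_tokens: bool = True) -> str:
        return text
```

`supports_kwarg(Toy(), "encode", "add_special_tokens")` is `True`; a missing kwarg or missing method yields `False` rather than an exception during tokenizer loading.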

Comment on lines +159 to +160
f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
"conversations into backing store)"

⚠️ Potential issue | 🟡 Minor

Fix the log message typo.

Line 160 has an extra closing parenthesis in the message ("backing store)"), which makes logs look sloppy.

Proposed fix
-                f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
-                "conversations into backing store)"
+                f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
+                "conversations into backing store"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/dataset_manager.py` around lines 159 - 160, The
Kubernetes-mode log message in dataset_manager.py builds an f-string containing
self._conversation_count and currently ends with an extra closing parenthesis in
the literal "backing store)"; update the string used in the logging call (the
f"...Kubernetes mode: skipped local client, compressed
{self._conversation_count} conversations into backing store)") to remove the
stray ')' so the message reads without the extra parenthesis.

Comment on lines 289 to 299
            if hash_id not in self._cache:
-                # To ensure that the prompt doesn't merge chunks, we insert a BOS or EOS token
-                # at the beginning. Length is maintained and the prompt generates the expected
-                # number of tokens. If no BOS or EOS token is available, we don't insert one.
-                prompt_tokens: list[int] = []
-                if self.tokenizer.block_separation_token_id is not None:
-                    prompt_tokens += [self.tokenizer.block_separation_token_id]
-                    prompt_tokens += self._sample_tokens(current_block_size - 1)
-                else:
-                    prompt_tokens += self._sample_tokens(current_block_size)
-
-                self._cache[hash_id] = prompt_tokens  # store to cache
+                self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
+                self._cache[hash_id] = sample_tokens_from_corpus(
+                    self._tokenized_corpus,
+                    current_block_size,
+                    self._hash_id_corpus_rng,
+                    self.tokenizer.block_separation_token_id,
+                )

            final_prompt.extend(self._cache[hash_id])


⚠️ Potential issue | 🔴 Critical

Cache key is underspecified for variable block sizes.

Line 289 caches by hash_id only, but current_block_size can differ (final partial block, or different block_size across calls). This can reuse a block with the wrong length and corrupt prompt sizing/content.

Proposed fix
-            if hash_id not in self._cache:
+            cache_key = (hash_id, current_block_size)
+            if cache_key not in self._cache:
                 self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
-                self._cache[hash_id] = sample_tokens_from_corpus(
+                self._cache[cache_key] = sample_tokens_from_corpus(
                     self._tokenized_corpus,
                     current_block_size,
                     self._hash_id_corpus_rng,
                     self.tokenizer.block_separation_token_id,
                 )
-
-            final_prompt.extend(self._cache[hash_id])
+            final_prompt.extend(self._cache[cache_key])

Also update the cache annotation accordingly:

-        self._cache: dict[int, list[int]] = {}
+        self._cache: dict[tuple[int, int], list[int]] = {}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/generator/prompt.py` around lines 289 - 299, The cache key
uses only hash_id which breaks when current_block_size varies; change the
caching to include current_block_size (e.g., use a tuple key like (hash_id,
current_block_size)) so that self._cache stores size-specific samples; update
any annotations for self._cache accordingly, ensure you still call
self._hash_id_corpus_rng.reseed_for_hash_id(hash_id) and populate the cache
using sample_tokens_from_corpus(self._tokenized_corpus, current_block_size,
self._hash_id_corpus_rng, self.tokenizer.block_separation_token_id), and
continue to extend final_prompt with the cached value.
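The proposed `(hash_id, block_size)` keying can be sketched standalone. Here a toy deterministic sampler stands in for `sample_tokens_from_corpus`, and seeding by `hash_id` mirrors what `reseed_for_hash_id()` is for — the same hash_id yields the same tokens in any worker:

```python
import random


def sample_block(corpus: list[int], size: int, seed: int) -> list[int]:
    """Toy stand-in for sample_tokens_from_corpus: deterministic per seed."""
    rng = random.Random(seed)
    return [rng.choice(corpus) for _ in range(size)]


class BlockCache:
    """Caches sampled blocks keyed by (hash_id, size), not hash_id alone,
    so a final partial block never reuses a full-size block's tokens."""

    def __init__(self, corpus: list[int]) -> None:
        self._corpus = corpus
        self._cache: dict[tuple[int, int], list[int]] = {}

    def get(self, hash_id: int, size: int) -> list[int]:
        key = (hash_id, size)
        if key not in self._cache:
            # Per-hash_id seeding keeps generation deterministic across workers
            self._cache[key] = sample_block(self._corpus, size, seed=hash_id)
        return self._cache[key]
```

Asking for the same hash_id at two different sizes now produces two correctly sized entries instead of silently reusing the first one cached.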

Comment on lines +138 to +146
hash_ids = trace["hash_ids"]
input_length = trace["input_length"]
final_block_size = input_length - (len(hash_ids) - 1) * block_size

tokens: list[int] = []
for i, hid in enumerate(hash_ids):
    size = final_block_size if i == len(hash_ids) - 1 else block_size
    tokens.extend(get_block_tokens(hid, size))
prompt = decode(tokens, skip_special_tokens=False)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find and examine the MooncakeTrace validation logic
find . -name "mooncake_trace.py" -type f

Repository: ai-dynamo/aiperf

Length of output: 105


🏁 Script executed:

# Examine the parallel_convert.py file to understand context around the flagged code
wc -l src/aiperf/dataset/loader/parallel_convert.py

Repository: ai-dynamo/aiperf

Length of output: 109


🏁 Script executed:

# Find the sample_tokens_from_corpus function
rg "def sample_tokens_from_corpus" -A 10

Repository: ai-dynamo/aiperf

Length of output: 828


🏁 Script executed:

# Examine the MooncakeTrace validation logic
cat -n src/aiperf/dataset/loader/mooncake_trace.py | head -100

Repository: ai-dynamo/aiperf

Length of output: 4373


🏁 Script executed:

# Get the full sample_tokens_from_corpus implementation
rg "def sample_tokens_from_corpus" -A 30 src/aiperf/dataset/generator/prompt.py

Repository: ai-dynamo/aiperf

Length of output: 922


🏁 Script executed:

# Check the context around lines 138-146 in parallel_convert.py (lines 103-160)
sed -n '103,160p' src/aiperf/dataset/loader/parallel_convert.py

Repository: ai-dynamo/aiperf

Length of output: 2295


🏁 Script executed:

# Find the MooncakeTrace model definition
rg "class MooncakeTrace" -A 50 src/aiperf/dataset/loader/models.py

Repository: ai-dynamo/aiperf

Length of output: 2406


🏁 Script executed:

# Also check if there are any validators on MooncakeTrace
rg "class MooncakeTrace" -B 5 -A 80 src/aiperf/dataset/loader/models.py | head -120

Repository: ai-dynamo/aiperf

Length of output: 3636


Add validation to ensure final_block_size remains positive.

The calculation final_block_size = input_length - (len(hash_ids) - 1) * block_size can produce a non-positive value if input_length is smaller than the space required by preceding blocks. For example, with input_length=100, 5 hash_ids, and block_size=30, final_block_size becomes −20. This breaks the token sampling logic.

Add a validation check when hash_ids is present to ensure input_length >= (len(hash_ids) - 1) * block_size, or document this as a required invariant for callers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/parallel_convert.py` around lines 138 - 146,
Validate that final_block_size is positive before using it: compute
final_block_size as currently done (in the block handling hash_ids inside
parallel_convert.py), then check that input_length >= (len(hash_ids) - 1) *
block_size (or equivalently final_block_size > 0) and raise a clear ValueError
(or return/skip the trace) if the check fails; reference the variables hash_ids,
input_length, final_block_size and the functions get_block_tokens/decode so
maintainers can locate where to add the validation and the error message
describing the invalid sizes (include values for input_length, len(hash_ids),
and block_size in the error).
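A hypothetical guard for the invariant the review asks for (names taken from the snippet above; this is a sketch, not the actual patch):

```python
def compute_final_block_size(
    input_length: int, hash_ids: list[int], block_size: int
) -> int:
    """Length of the last block after all full-size blocks are accounted for.

    Must be positive, otherwise the trace is malformed and sampling would
    receive a negative block size.
    """
    final_block_size = input_length - (len(hash_ids) - 1) * block_size
    if final_block_size <= 0:
        raise ValueError(
            f"input_length={input_length} cannot cover {len(hash_ids)} "
            f"hash_ids at block_size={block_size} "
            f"(final block would be {final_block_size} tokens)"
        )
    return final_block_size
```

The review's example (`input_length=100`, 5 hash_ids, `block_size=30`) then raises instead of propagating a −20 block size into token sampling.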

Comment on lines +38 to +72
def create_jsonl_file():
    """Create a temporary JSONL file with custom content."""
    filenames = []

    def _create_file(content_lines):
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            for line in content_lines:
                f.write(line + "\n")
        filenames.append(f.name)
        return f.name

    yield _create_file

    for fn in filenames:
        Path(fn).unlink(missing_ok=True)


@pytest.fixture
def default_user_config() -> UserConfig:
    return UserConfig(endpoint=EndpointConfig(model_names=["test-model"]))


@pytest.fixture
def mock_prompt_generator():
    """Mock PromptGenerator with required attributes for BaseTraceDatasetLoader."""
    generator = Mock()
    generator.generate.return_value = "Generated prompt text"
    generator._cache = {}
    generator._tokenized_corpus = list(range(100, 200))
    generator._hash_id_corpus_rng = Mock()
    generator._hash_id_corpus_rng.seed = 42
    generator.tokenizer = Mock()
    generator.tokenizer.resolved_name = "test-model"
    generator.tokenizer.block_separation_token_id = None
    return generator

🛠️ Refactor suggestion | 🟠 Major

Add explicit type hints to fixtures and test methods.

Several function signatures in this file are untyped (fixtures and test_* methods). Please add parameter/return annotations (-> None for tests) to match repository standards.

Proposed pattern (apply consistently across this file)
+from collections.abc import Callable, Sequence
@@
 @pytest.fixture
-def create_jsonl_file():
+def create_jsonl_file() -> Callable[[Sequence[str]], str]:
@@
-    def _create_file(content_lines):
+    def _create_file(content_lines: Sequence[str]) -> str:
@@
 @pytest.fixture
-def mock_prompt_generator():
+def mock_prompt_generator() -> Mock:
@@
-    def test_hash_is_16_hex_chars(self, create_jsonl_file):
+    def test_hash_is_16_hex_chars(
+        self, create_jsonl_file: Callable[[Sequence[str]], str]
+    ) -> None:

As per coding guidelines, "Add type hints on ALL functions (params and return)".

Also applies to: 83-605

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_base_trace_loader.py` around lines 38 - 72,
Add explicit type annotations to all untyped fixtures and test functions in this
file: annotate create_jsonl_file with its return/generator type (e.g.,
Iterator[Callable[[List[str]], str]]), mark default_user_config() -> UserConfig,
mock_prompt_generator() -> Mock (or PromptGenerator protocol), and add -> None
return annotations to all test_* functions and typed parameters where
applicable; update any local variables' types if needed to satisfy type checkers
(references: create_jsonl_file, default_user_config, mock_prompt_generator, and
all test_* functions).

Comment on lines +422 to +442
    def test_turn_fields_populated(
        self, create_jsonl_file, mock_prompt_generator, default_user_config
    ):
        """Turn objects should have correct timestamp, delay, max_tokens."""
        content = [
            '{"input_length": 100, "hash_ids": [1], "timestamp": 5000, "output_length": 42}'
        ]
        filename = create_jsonl_file(content)

        loader = MooncakeTraceDatasetLoader(
            filename=filename,
            user_config=default_user_config,
            prompt_generator=mock_prompt_generator,
        )
        data = loader.load_dataset()
        conversations = list(loader.convert_to_conversations(data))

        turn = conversations[0].turns[0]
        assert turn.timestamp == 5000
        assert turn.max_tokens == 42


⚠️ Potential issue | 🟡 Minor

`delay` is documented but not actually validated.

The test docstring says `delay` is verified, but there is no assertion on `turn.delay`. This leaves a regression gap in turn field mapping.

Suggested test fix
-        content = [
-            '{"input_length": 100, "hash_ids": [1], "timestamp": 5000, "output_length": 42}'
-        ]
+        content = [
+            '{"input_length": 100, "hash_ids": [1], "timestamp": 5000, "delay": 250, "output_length": 42}'
+        ]
@@
         turn = conversations[0].turns[0]
         assert turn.timestamp == 5000
+        assert turn.delay == 250
         assert turn.max_tokens == 42
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_base_trace_loader.py` around lines 422 - 442,
The test test_turn_fields_populated is missing an assertion for the turn.delay
field; update the test to assert that turn.delay equals the expected value
produced by MooncakeTraceDatasetLoader (i.e., the delay mapping implemented in
MooncakeTraceDatasetLoader.convert_to_conversations or loader.load_dataset), for
example add a line like assert turn.delay == <expected_delay> where
<expected_delay> is the loader's computed/default delay for the provided record
(commonly 0 if no delay field is present).
