
[WIP] Proof of concept mooncake coding#734

Draft
ajcasagrande wants to merge 18 commits into main from ajc/mooncake-coder

Conversation

@ajcasagrande
Contributor

@ajcasagrande ajcasagrande commented Mar 4, 2026

aiperf profile --prompt-corpus coding

Summary by CodeRabbit

  • New Features

    • New CLI option --prompt-corpus (sonnet | coding) to choose synthetic prompt style.
    • Demo CLI to generate coding-style sample prompts and a coding-content generator.
  • Performance

    • Dataset generation now streams entries to reduce peak memory use.
    • Parallel conversion updated with deterministic per-hash seeding for reproducible prompt generation.
  • Documentation

    • CLI docs updated with choices table for prompt corpus.

Add support for Alibaba Bailian trace format (bailian_trace) with multi-turn
conversation linking via chat_id/parent_chat_id and 16-token SipHash blocks.
Extract common trace loading logic into BaseTraceDatasetLoader to share
infrastructure between Mooncake and Bailian loaders. Make block_size
configurable per-loader via plugin metadata default_block_size, and generalize
mooncake-specific validation to work with any trace dataset type.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
- Remove .value on enum in error message (use string-based enum directly)
- Validate mean is not None before cached prompt generation
- Add cycle detection in Bailian find_root to prevent infinite loops
- Reset filtering counters per load_dataset() call to avoid over-reporting

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
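The cycle-detection fix mentioned above can be sketched roughly as follows. This is a minimal illustration, not the actual loader code: the `parent_of` mapping and the `find_root` signature are assumptions based on the chat_id/parent_chat_id linking described in this PR.

```python
def find_root(chat_id: str, parent_of: dict[str, str]) -> str:
    """Follow parent_chat_id links up to the conversation root,
    raising instead of looping forever if the links form a cycle."""
    seen: set[str] = set()
    current = chat_id
    while current in parent_of:
        if current in seen:
            raise ValueError(f"cycle detected while resolving root of {chat_id!r}")
        seen.add(current)
        current = parent_of[current]
    return current
```

A malformed trace where two entries point at each other would previously spin forever; with the `seen` set it fails fast with a clear error.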
Tokenizers like Kimi use `allow_special_tokens` instead of the standard
`add_special_tokens` for encode, and their `decode()` doesn't accept
`skip_special_tokens`. Passing unsupported kwargs triggers the slow
`PreTrainedTokenizer.super()` fallback path, causing ~5000x slower
decode (~204ms vs 0.04ms per 4500 tokens).

After loading, inspect the tokenizer's method signatures and override
the default call/encode/decode args to match.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
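The signature-probing idea above might look something like this sketch. Function names (`supported_decode_kwargs`, `safe_decode`) are illustrative, not the real API in src/aiperf/common/tokenizer.py:

```python
import inspect


def supported_decode_kwargs(tokenizer) -> set[str]:
    """Inspect the tokenizer's decode() signature so unsupported kwargs
    (e.g. skip_special_tokens on Kimi-style tokenizers) can be dropped
    before the call, avoiding the slow fallback path."""
    try:
        sig = inspect.signature(tokenizer.decode)
    except (ValueError, TypeError):
        # Some builtins and C-implemented callables expose no signature.
        return set()
    return {
        name
        for name, p in sig.parameters.items()
        if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
    }


def safe_decode(tokenizer, token_ids, **kwargs):
    """Forward only the kwargs the tokenizer actually accepts."""
    allowed = supported_decode_kwargs(tokenizer)
    return tokenizer.decode(
        token_ids, **{k: v for k, v in kwargs.items() if k in allowed}
    )
```

Probing once at load time (rather than per call) keeps the per-decode overhead negligible.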
Gracefully handle unrecognized 'type' field values during dataset type
inference by skipping the explicit type shortcut and falling through to
structural detection. This fixes Bailian traces (which use "type" for
request type, not dataset type) auto-detecting correctly. Also updates
CLI descriptions to reference both trace formats and adds tests for the
type field fallback behavior.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
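The fall-through described above can be sketched like this. The structural markers (`chat_id`/`parent_chat_id` for Bailian, `hash_ids` for Mooncake) come from this PR's description; the function name and return values are hypothetical:

```python
def infer_dataset_type(record: dict, known_types: set[str]) -> str:
    """Infer a dataset type, trusting an explicit 'type' field only when
    it names a known dataset type, otherwise falling through to
    structural detection."""
    declared = record.get("type")
    if declared in known_types:
        return declared
    # Bailian traces use "type" for the request type, not the dataset
    # type, so an unrecognized value is not an error: detect by shape.
    if "chat_id" in record or "parent_chat_id" in record:
        return "bailian_trace"
    if "hash_ids" in record:
        return "mooncake_trace"
    raise ValueError(f"cannot infer dataset type from record keys: {sorted(record)}")
```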
Add HashIdRandomGenerator that deterministically seeds per (trace_id,
hash_id) pair, enabling parallel token generation across workers without
lock contention or cache coordination. Extract parallel_convert module
to leverage multiprocessing with shared-memory token corpus. Stream
conversations through composers to the backing store instead of
materializing the full dataset in memory.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
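The per-pair seeding scheme can be sketched as below. This is an assumption-laden illustration of the idea, not the real HashIdRandomGenerator: the mixing function and method names are invented for the example.

```python
import hashlib
import random


class HashIdRandomGenerator:
    """Deterministic RNG stream per (trace_id, hash_id) pair, so
    parallel workers can regenerate identical token blocks with no
    shared cache or lock contention."""

    def __init__(self, base_seed: int, trace_id: str = "") -> None:
        self._base_seed = base_seed
        self._trace_id = trace_id
        self._rng = random.Random(base_seed)

    def reseed_for_hash_id(self, hash_id: int) -> None:
        # Mix base seed, trace id, and hash id into a stable 64-bit seed;
        # any worker given the same triple produces the same stream.
        material = f"{self._base_seed}:{self._trace_id}:{hash_id}".encode()
        digest = hashlib.blake2b(material, digest_size=8).digest()
        self._rng.seed(int.from_bytes(digest, "big"))

    def random(self) -> float:
        return self._rng.random()
```

Because the seed is a pure function of (base_seed, trace_id, hash_id), two workers that encounter the same hash_id independently sample identical blocks, which is what makes the lock-free parallel conversion reproducible.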
parallel_decode and _init_worker were not forwarding tokenizer config
args, causing failures with tokenizers like Kimi that require
trust_remote_code=True.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
…net or coding)

Introduces PromptCorpus enum and CLI option to choose between Shakespeare sonnets
and realistic coding content for synthetic prompt generation. BaseTraceDatasetLoader
now accepts BaseGenerator to support pluggable content generators.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
… domains

Add 5 new bridge text tuples (security, distributed systems, observability,
data architecture, architecture tradeoffs) and 6 new conversation patterns
targeting blind spots in MoE expert activation. The deep reasoning patterns
(architecture_review, incident_response) produce longer multi-domain assistant
turns that cross expert boundaries. Rebalance conversation pool weight from
60 to 90 blocks to account for the wider variety (18 patterns total).

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
@github-actions

github-actions bot commented Mar 4, 2026

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@7bdfac561b5917bb99cc7d15ecbd9c4f6729de94

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@7bdfac561b5917bb99cc7d15ecbd9c4f6729de94

Last updated for commit: 7bdfac5

@ajcasagrande ajcasagrande marked this pull request as draft March 4, 2026 15:33
@coderabbitai

coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a CLI option to select prompt corpus (sonnet|coding), a HashIdRandomGenerator for deterministic per-hash_id RNG, converts dataset composers/loaders to streaming iterators, implements multiprocessing parallel conversion with a shared-memory token corpus, and introduces a coding-content generator plus tests and a demo.

Changes

Cohort / File(s) / Summary:

  • CLI & Config (docs/cli_options.md, pyproject.toml, src/aiperf/common/config/prompt_config.py, src/aiperf/common/enums/enums.py, src/aiperf/common/enums/__init__.py): Added --prompt-corpus option and PromptCorpus enum; prompt_corpus fields added to prompt configs; tiktoken dependency added; codespell ignore list extended.
  • Deterministic RNG (src/aiperf/common/hash_id_random_generator.py, tests/unit/common/test_hash_id_random_generator.py): New HashIdRandomGenerator enabling per-trace and per-hash_id deterministic reseeding; comprehensive unit tests added.
  • Tokenizer behavior (src/aiperf/common/tokenizer.py, tests/unit/common/test_tokenizer_kwarg_overrides.py): Detects supported decode kwargs and filters caller-supplied kwargs for decode; tests added to validate the behavior.
  • Prompt generation (src/aiperf/dataset/generator/prompt.py, tests/unit/dataset/generator/test_prompt_generator.py): Introduced sample_tokens_from_corpus, integrated per-hash_id RNG, changed generate/_generate_cached_prompt to produce decoded prompts and accept block_size; removed the decoded cache; tests updated for HashId RNG integration.
  • Coding content generator & demo (src/aiperf/dataset/generator/__init__.py, src/aiperf/dataset/generator/coding_content.py (new), tests/unit/dataset/generator/test_coding_content_generator.py, demo_coding_content.py): Added CodingContentGenerator, exported it, and added extensive unit tests plus a demo CLI script for sampling coding-themed prompts.
  • Parallel conversion (src/aiperf/dataset/generator/parallel_decode.py (removed), src/aiperf/dataset/loader/parallel_convert.py (new), tests/unit/dataset/generator/test_parallel_decode.py (removed), tests/unit/dataset/loader/test_parallel_convert.py): Removed legacy parallel_decode; added parallel_convert implementing multiprocessing Pool conversion with a shared-memory corpus and per-worker HashIdRandomGenerator; tests added/updated accordingly.
  • Trace loader & conversion (src/aiperf/dataset/loader/base_trace_loader.py, src/aiperf/dataset/protocols.py, tests/unit/dataset/loader/test_base_trace_loader.py, tests/unit/dataset/loader/test_bailian_trace.py, tests/unit/dataset/loader/test_trace.py): Switched to file-hash-based trace_id, replaced PromptGenerator with BaseGenerator, and changed convert_to_conversations to return Iterator[Conversation] with num_workers/batch_size and single/parallel dispatch; tests adapted.
  • Dataset composers (streaming) (src/aiperf/dataset/composer/base.py, src/aiperf/dataset/composer/custom.py, src/aiperf/dataset/composer/synthetic.py, src/aiperf/dataset/composer/synthetic_rankings.py, tests/unit/dataset/composer/*): Changed create_dataset to yield Iterator[Conversation]; replaced batch finalization with _finalize_conversation(conversation, session_index); the custom composer uses CodingContentGenerator when prompt_corpus is CODING; tests updated to materialize iterators.
  • Dataset manager & metadata (src/aiperf/dataset/dataset_manager.py, src/aiperf/common/models.py, tests/unit/dataset/test_dataset_manager.py, tests/unit/dataset/test_dataset_manager_inputs_json.py, tests/unit/dataset/conftest.py): Moved from in-memory conversations to mmap-backed/streaming storage via the dataset client; DatasetMetadata.conversations now holds ConversationMetadata; payload generation made async; tests/fixtures updated to mock the dataset client and use _conversation_count.
  • Synthesis & rolling hasher (src/aiperf/dataset/synthesis/rolling_hasher.py): Now forwards an explicit block_size to the prompt generator when hash_ids are provided.
  • Worker history clipping (src/aiperf/workers/worker.py, tests/unit/workers/test_worker.py): Added per-model Tokenizer caching and response truncation to implement multi-turn history clipping; tests added to validate OSL-based truncation.
  • Various tests adjusted/added (tests/unit/... (many files)): Updated many tests to handle iterator returns (often wrapping list(...)), removed parallel_decode patches/tests, renamed finalization tests, replaced _decoded_cache usages with _cache, and added many new tests for parallel conversion, the coding generator, and HashId RNG.
  • Demos & tooling (demo_coding_content.py): Added demo script to exercise CodingContentGenerator and sample outputs.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐇 I nibble tokens from sonnets and code,
Seed each hash_id so paths will not goad,
Streams whisper out one conversation at a time,
Workers share corpus — deterministic rhyme,
Hopping delighted: prompts now sing in mode! 🌿✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 71.23%, below the required threshold of 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check — ❓ Inconclusive: The PR title "[WIP] Proof of concept mooncake coding" refers to a real aspect of the changeset (the coding corpus feature) but is vague and does not clearly summarize the main changes. Consider a more specific title that captures the primary scope, e.g. "[WIP] Add coding prompt corpus and parallel dataset streaming".
✅ Passed checks (1 passed)

  • Description Check — ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Tip

CodeRabbit can generate a title for your PR based on the changes with custom instructions.

Set the reviews.auto_title_instructions setting to generate a title for your PR based on the changes in the PR with custom instructions.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/aiperf/dataset/dataset_manager.py (1)

232-253: ⚠️ Potential issue | 🟠 Major

Move filesystem operations off the event loop in _generate_inputs_json_file.

Lines 242, 247, 253, and 272-273 perform blocking filesystem I/O inside an async def, which can stall other service tasks during large input generation.

Proposed non-blocking async-safe patch
     async def _generate_inputs_json_file(self) -> None:
@@
         try:
             start_time = time.perf_counter()
-            file_path.parent.mkdir(parents=True, exist_ok=True)
+            await asyncio.to_thread(
+                file_path.parent.mkdir, parents=True, exist_ok=True
+            )
@@
-            temp_file_path.write_bytes(
-                orjson.dumps(
-                    inputs.model_dump(exclude_none=True, mode="json"),
-                    option=orjson.OPT_INDENT_2,
-                )
+            payload = orjson.dumps(
+                inputs.model_dump(exclude_none=True, mode="json"),
+                option=orjson.OPT_INDENT_2,
             )
-            temp_file_path.replace(file_path)
+            await asyncio.to_thread(temp_file_path.write_bytes, payload)
+            await asyncio.to_thread(temp_file_path.replace, file_path)
@@
         finally:
-            if temp_file_path.exists():
-                temp_file_path.unlink()
+            if await asyncio.to_thread(temp_file_path.exists):
+                await asyncio.to_thread(temp_file_path.unlink)
As per coding guidelines "Use async/await for ALL I/O operations - no `time.sleep`, no blocking calls".

Also applies to: 271-273

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/dataset_manager.py` around lines 232 - 253, The async
method _generate_inputs_json_file performs blocking filesystem calls
(file_path.parent.mkdir, temp_file_path.write_bytes, temp_file_path.replace and
potentially file_path.resolve()) on the event loop; refactor these to run off
the loop (e.g., use asyncio.get_running_loop().run_in_executor or an async file
library) so blocking mkdir/write/replace/resolve calls execute in a threadpool.
Concretely: wrap directory creation, writing the orjson bytes for
inputs.model_dump(...) and atomic replace in executor calls (or use aiofiles to
write the temp file), keep ModelEndpointInfo.from_user_config and
_generate_input_payloads as awaitable work on the loop, and ensure errors/time
measurements (start_time) remain consistent around the offloaded operations.
src/aiperf/dataset/generator/prompt.py (1)

284-300: ⚠️ Potential issue | 🔴 Critical

Cached hash block reuse can return incorrect prompt lengths.

When the last hash block is shorter than block_size, cached blocks are appended unsliced. This can over-generate tokens; the inverse path can under-generate if a short block was cached first.

Proposed fix: cache full blocks, slice per position
-        for index, hash_id in enumerate(hash_ids):
-            # For the last hash ID, use the remaining tokens as the block size
-            if index == len(hash_ids) - 1:
-                current_block_size = final_block_size
-
-            if hash_id not in self._cache:
-                self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
-                self._cache[hash_id] = sample_tokens_from_corpus(
-                    self._tokenized_corpus,
-                    current_block_size,
-                    self._hash_id_corpus_rng,
-                    self.tokenizer.block_separation_token_id,
-                )
-
-            final_prompt.extend(self._cache[hash_id])
+        for index, hash_id in enumerate(hash_ids):
+            current_block_size = (
+                final_block_size if index == len(hash_ids) - 1 else block_size
+            )
+
+            if hash_id not in self._cache:
+                self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
+                # Cache canonical full-size block per hash_id.
+                self._cache[hash_id] = sample_tokens_from_corpus(
+                    self._tokenized_corpus,
+                    block_size,
+                    self._hash_id_corpus_rng,
+                    self.tokenizer.block_separation_token_id,
+                )
+
+            final_prompt.extend(self._cache[hash_id][:current_block_size])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/generator/prompt.py` around lines 284 - 300, The loop that
populates final_prompt caches possibly-short sampled blocks into self._cache
causing incorrect lengths when later reused; change it so
sample_tokens_from_corpus is always called with the full canonical block size
(not current_block_size) and the full block is cached under hash_id, then append
a slice of that cached full block of length current_block_size (use
final_block_size for the last index) to final_prompt; ensure reseeding with
self._hash_id_corpus_rng.reseed_for_hash_id(hash_id) remains before sampling and
keep tokenization/decoding via self.tokenizer.decode(final_prompt,
skip_special_tokens=False) unchanged.
🧹 Nitpick comments (4)
src/aiperf/dataset/protocols.py (1)

49-51: Update protocol docstring to match the new iterable contract.

convert_to_conversations now returns Iterable[Conversation], but the class docstring still says “list of Conversation objects.”

♻️ Suggested doc update
 class CustomDatasetLoaderProtocol(Protocol):
-    """Protocol for custom dataset loaders that load dataset from a file and convert it to a list of Conversation objects."""
+    """Protocol for custom dataset loaders that load dataset from a file and convert it to an iterable of Conversation objects."""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/protocols.py` around lines 49 - 51, Update the
class/protocol docstring to reflect that convert_to_conversations returns an
Iterable[Conversation] instead of a list; locate the protocol/class containing
the convert_to_conversations method and change any wording like "list of
Conversation objects" to "iterable of Conversation objects
(Iterable[Conversation])" and, if present, update any examples or type
descriptions referencing CustomDatasetT or convert_to_conversations to match the
new iterable contract.
tests/unit/dataset/test_dataset_manager.py (2)

407-408: Tighten this assertion to prevent false positives.

At Line 408, > 0 can pass even if dataset sizing regresses. Prefer an exact match to expected count (or metadata length) to make this test meaningful.

Suggested assertion hardening
-        assert dataset_manager._conversation_count > 0
+        assert (
+            dataset_manager._conversation_count
+            == len(dataset_manager.dataset_metadata.conversations)
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/test_dataset_manager.py` around lines 407 - 408, The
assertion `assert dataset_manager._conversation_count > 0` is too loose; replace
it with an exact equality check against the source of truth for the dataset size
(e.g., compute an expected_count from the loaded metadata or dataset used
earlier in the test and assert equality). Specifically, update the test to
compute expected_count (for example via len(dataset_manager._metadata) or the
variable that holds the test dataset entries) and assert
dataset_manager._conversation_count == expected_count so the test fails on
regressions; reference dataset_manager._conversation_count and the
metadata/dataset variable you already use in the test to derive expected_count.

465-466: These “verify” comments currently don’t verify anything.

Consider adding an explicit assertion for non-materialization, or remove/reword the comments to avoid implying a check that does not exist.

Also applies to: 490-491

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/test_dataset_manager.py` around lines 465 - 466, The
comments saying "Verify in-memory dataset was not materialized" are not backed
by any assertion; update the tests in tests/unit/dataset/test_dataset_manager.py
to either remove/reword those comments or add an explicit assertion that the
dataset was not materialized (for example call
DatasetManager.is_materialized(dataset_id) and assert False, or assert
dataset.materialized is False, or assert the expected on-disk path does not
exist via Path(...).exists() ). Locate the two comment occurrences (the literal
comment text around line 465 and the similar one around 490–491) and add one of
these clear assertions referencing the test's dataset/manager variables (e.g.,
dataset, dataset_id, manager) so the test actually verifies non-materialization.
tests/unit/dataset/loader/test_parallel_convert.py (1)

809-843: Make shared-memory cleanup validation explicit.

This test currently relies on indirect detection of leaks. Consider asserting cleanup directly (e.g., that close()/unlink() are called) so failures are immediate and localized to this test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_parallel_convert.py` around lines 809 - 843,
The test test_shared_memory_cleanup relies on indirect detection of leaked
shared memory; explicitly patch the shared-memory class used by parallel_convert
(e.g., multiprocessing.shared_memory.SharedMemory or the module-level wrapper
used in parallel_convert) and have the patched constructor return a mock with
close() and unlink() methods; trigger the RuntimeError via the mocked Pool.imap
as already done and then assert that the returned mock's close() and unlink()
were called (or that close()/unlink() were called for every created mock if
multiple are created), ensuring cleanup is validated directly after
parallel_convert raises.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/common/hash_id_random_generator.py`:
- Around line 48-49: The code uses "base_rng.seed or base_rng.randrange(...)"
which treats seed=0 as falsy and loses reproducibility; change the logic to
explicitly check for None (e.g., use "base_seed = base_rng.seed if base_rng.seed
is not None else base_rng.randrange(0, 2**64)") when deriving base_seed before
returning via "return cls(base_seed, _internal=True)"; ensure you reference the
RandomGenerator's allowed seed range (0..2**64-1) and add a regression test that
constructs the base RNG with seed=0 and verifies the resulting
HashIdRandomGenerator (or the classmethod that wraps this logic) is reproducible
and uses seed 0 rather than being randomized.

In `@src/aiperf/common/tokenizer.py`:
- Around line 51-56: Wrap the inspect.signature call in _supports_kwarg with a
try/except and first ensure the retrieved attribute is callable (i.e., replace
the current getattr check with a callable check), returning False on exceptions
(ValueError, TypeError) so probing builtins won't raise during init; and update
_apply_kwarg_overrides so it tests __call__ for setting _call_args and tests
encode for setting _encode_args (do not use encode's support to gate
_call_args), ensuring you reference and adjust the logic around _supports_kwarg,
_apply_kwarg_overrides, __call__, encode, _call_args and _encode_args
accordingly.

In `@src/aiperf/dataset/dataset_manager.py`:
- Around line 157-161: The log string inside the self.info call in
dataset_manager (where self._compress_only is checked) has an extra closing
parenthesis at the end; update the message passed to self.info (the f-string
using self._conversation_count and the following literal) to remove the trailing
')' so the message ends "...conversations into backing store" (preferably
combine into a single f-string for clarity) in the method that contains the
self._compress_only check and the self.info(...) call.

In `@src/aiperf/dataset/generator/prompt.py`:
- Around line 213-218: The current fallback using "or" for computing
effective_block_size ignores explicit falsy values like block_size=0; update the
logic in the method that computes effective_block_size (the code that currently
falls back to self.config.input_tokens.block_size and
InputTokensDefaults.BLOCK_SIZE) to treat None as "use fallback" but validate any
provided block_size: if block_size is None, use
self.config.input_tokens.block_size or InputTokensDefaults.BLOCK_SIZE; if
block_size is not None, ensure it is a positive integer (>0) and raise a
ValueError (with a clear message) for invalid values (e.g., 0 or negative); then
pass the validated effective_block_size into _generate_cached_prompt(mean,
hash_ids, effective_block_size).

In `@src/aiperf/dataset/loader/base_trace_loader.py`:
- Around line 306-313: Do not silently default a missing input_length to 0;
instead fail fast: replace the current getattr(..., 0) usage with a direct
retrieval (e.g., input_length = getattr(trace, "input_length")) and add an
explicit check that raises a clear exception (ValueError or TypeError) if
input_length is None or missing before calling prompt_generator.generate; keep
using hash_ids as before but avoid masking malformed traces in
base_trace_loader.py/PromptGenerator.generate by ensuring trace.input_length is
validated first.

In `@src/aiperf/dataset/loader/parallel_convert.py`:
- Line 57: The block_cache currently uses only hash_id as a key (field
block_cache: dict[int, list[int]]) which causes incorrect reuse when the same
hash_id is requested with different token block sizes; update the cache to key
by both hash_id and size (e.g., use a tuple key like (hash_id, size) or a
combined unique key) and adjust all access points (including where block_cache
is read/written around the code referenced at lines ~120-126) to use the new
composite key so cached token lists are segregated by requested block size.
- Around line 132-149: The parallel path currently skips prompt generation when
trace lacks text_input and hash_ids is missing or empty, returning "" — change
the branch in the prompt-building logic to treat a missing/empty
trace["hash_ids"] but present trace["input_length"] as a single block: when
trace.get("hash_ids") is falsy but trace.get("input_length") exists, build a
tokens list of length input_length (one block) by calling get_block_tokens for
that single block and then decode(tokens, skip_special_tokens=False) to create
prompt; keep the existing hash_ids loop behavior when hash_ids is non-empty and
preserve use of block_size, final_block_size, get_block_tokens and decode.

In `@tests/unit/dataset/loader/test_parallel_convert.py`:
- Line 50: Tests are triggering Ruff unused-symbol warnings for unused
parameters; update the unused names to use underscore-prefixed or anonymous
names so linters ignore them. Specifically, change the tok.decode side_effect
from lambda ids, **kw: ... to lambda ids, **_: ... (or lambda ids, **_kw: ...),
and rename any unused function/test parameters like delay, tmp_path, kw to
_delay, _tmp_path, or _ (or remove them if safe) in the same test file so the
symbols are no longer flagged (look for occurrences around the provided examples
and the other mentioned lines).
- Around line 1000-1026: The test sets parallel_convert_mod._worker_state
directly but only clears it on the success path; wrap the assignment of
parallel_convert_mod._worker_state and the call to _process_batch(batch) in a
try/finally so the global is always reset to None, i.e., assign
parallel_convert_mod._worker_state = _WorkerState(...) before calling
_process_batch(batch) inside a try block and put
parallel_convert_mod._worker_state = None in the finally block to guarantee
teardown even if _process_batch raises.
- Around line 34-1062: Add explicit type hints to all fixtures, helper
functions, and test methods: annotate return types for fixtures like
sample_corpus() -> List[int], sample_corpus_array(sample_corpus: List[int]) ->
np.ndarray, mock_tokenizer() -> MagicMock, setup_worker(...) -> Iterator[None]
(or Generator[None, None, None]) and setup_worker_with_sep(...) ->
Iterator[None]; mark all test methods (e.g., TestSetDaemon.test_set_daemon_true,
TestProcessBatch.* etc.) with -> None; annotate the helper
_setup_and_process(self, corpus_array: np.ndarray, hash_ids: Sequence[int],
input_length: int, block_size: int, trace_id: str, seed: int = 42) -> str; add
any needed typing imports (List, Sequence, Iterator, Optional) at the top of the
file and adjust parameter types for fixtures/helpers to match their usage (e.g.,
sample_corpus_array: np.ndarray, mock_tokenizer: MagicMock) so every function
has full parameter and return annotations.

---

Outside diff comments:
In `@src/aiperf/dataset/dataset_manager.py`:
- Around line 232-253: The async method _generate_inputs_json_file performs
blocking filesystem calls (file_path.parent.mkdir, temp_file_path.write_bytes,
temp_file_path.replace and potentially file_path.resolve()) on the event loop;
refactor these to run off the loop (e.g., use
asyncio.get_running_loop().run_in_executor or an async file library) so blocking
mkdir/write/replace/resolve calls execute in a threadpool. Concretely: wrap
directory creation, writing the orjson bytes for inputs.model_dump(...) and
atomic replace in executor calls (or use aiofiles to write the temp file), keep
ModelEndpointInfo.from_user_config and _generate_input_payloads as awaitable
work on the loop, and ensure errors/time measurements (start_time) remain
consistent around the offloaded operations.

In `@src/aiperf/dataset/generator/prompt.py`:
- Around line 284-300: The loop that populates final_prompt caches
possibly-short sampled blocks into self._cache causing incorrect lengths when
later reused; change it so sample_tokens_from_corpus is always called with the
full canonical block size (not current_block_size) and the full block is cached
under hash_id, then append a slice of that cached full block of length
current_block_size (use final_block_size for the last index) to final_prompt;
ensure reseeding with self._hash_id_corpus_rng.reseed_for_hash_id(hash_id)
remains before sampling and keep tokenization/decoding via
self.tokenizer.decode(final_prompt, skip_special_tokens=False) unchanged.

---

Nitpick comments:
In `@src/aiperf/dataset/protocols.py`:
- Around line 49-51: Update the class/protocol docstring to reflect that
convert_to_conversations returns an Iterable[Conversation] instead of a list;
locate the protocol/class containing the convert_to_conversations method and
change any wording like "list of Conversation objects" to "iterable of
Conversation objects (Iterable[Conversation])" and, if present, update any
examples or type descriptions referencing CustomDatasetT or
convert_to_conversations to match the new iterable contract.

In `@tests/unit/dataset/loader/test_parallel_convert.py`:
- Around line 809-843: The test test_shared_memory_cleanup relies on indirect
detection of leaked shared memory; explicitly patch the shared-memory class used
by parallel_convert (e.g., multiprocessing.shared_memory.SharedMemory or the
module-level wrapper used in parallel_convert) and have the patched constructor
return a mock with close() and unlink() methods; trigger the RuntimeError via
the mocked Pool.imap as already done and then assert that the returned mock's
close() and unlink() were called (or that close()/unlink() were called for every
created mock if multiple are created), ensuring cleanup is validated directly
after parallel_convert raises.

In `@tests/unit/dataset/test_dataset_manager.py`:
- Around line 407-408: The assertion `assert dataset_manager._conversation_count
> 0` is too loose; replace it with an exact equality check against the source of
truth for the dataset size (e.g., compute an expected_count from the loaded
metadata or dataset used earlier in the test and assert equality). Specifically,
update the test to compute expected_count (for example via
len(dataset_manager._metadata) or the variable that holds the test dataset
entries) and assert dataset_manager._conversation_count == expected_count so the
test fails on regressions; reference dataset_manager._conversation_count and the
metadata/dataset variable you already use in the test to derive expected_count.
- Around line 465-466: The comments saying "Verify in-memory dataset was not
materialized" are not backed by any assertion; update the tests in
tests/unit/dataset/test_dataset_manager.py to either remove/reword those
comments or add an explicit assertion that the dataset was not materialized (for
example call DatasetManager.is_materialized(dataset_id) and assert False, or
assert dataset.materialized is False, or assert the expected on-disk path does
not exist via Path(...).exists() ). Locate the two comment occurrences (the
literal comment text around line 465 and the similar one around 490–491) and add
one of these clear assertions referencing the test's dataset/manager variables
(e.g., dataset, dataset_id, manager) so the test actually verifies
non-materialization.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: a7658d89-49fd-4e2f-be88-e8b746604ec3

📥 Commits

Reviewing files that changed from the base of the PR and between 3601096 and 8db6e96.

📒 Files selected for processing (36)
  • docs/cli_options.md
  • pyproject.toml
  • src/aiperf/common/config/prompt_config.py
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/hash_id_random_generator.py
  • src/aiperf/common/tokenizer.py
  • src/aiperf/dataset/composer/base.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/composer/synthetic.py
  • src/aiperf/dataset/composer/synthetic_rankings.py
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/dataset/generator/__init__.py
  • src/aiperf/dataset/generator/coding_content.py
  • src/aiperf/dataset/generator/parallel_decode.py
  • src/aiperf/dataset/generator/prompt.py
  • src/aiperf/dataset/loader/base_trace_loader.py
  • src/aiperf/dataset/loader/parallel_convert.py
  • src/aiperf/dataset/protocols.py
  • src/aiperf/dataset/synthesis/rolling_hasher.py
  • tests/unit/common/test_hash_id_random_generator.py
  • tests/unit/common/test_tokenizer_kwarg_overrides.py
  • tests/unit/dataset/composer/test_base_composer.py
  • tests/unit/dataset/composer/test_custom_composer.py
  • tests/unit/dataset/composer/test_synthetic_composer.py
  • tests/unit/dataset/composer/test_synthetic_rankings_composer.py
  • tests/unit/dataset/conftest.py
  • tests/unit/dataset/generator/test_coding_content_generator.py
  • tests/unit/dataset/generator/test_parallel_decode.py
  • tests/unit/dataset/generator/test_prompt_generator.py
  • tests/unit/dataset/loader/test_bailian_trace.py
  • tests/unit/dataset/loader/test_base_trace_loader.py
  • tests/unit/dataset/loader/test_parallel_convert.py
  • tests/unit/dataset/loader/test_trace.py
  • tests/unit/dataset/test_dataset_manager.py
  • tests/unit/dataset/test_dataset_manager_inputs_json.py
💤 Files with no reviewable changes (2)
  • tests/unit/dataset/generator/test_parallel_decode.py
  • src/aiperf/dataset/generator/parallel_decode.py

Comment on lines +48 to +49
base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
return cls(base_seed, _internal=True)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

head -60 src/aiperf/common/hash_id_random_generator.py | cat -n

Repository: ai-dynamo/aiperf

Length of output: 2733


🏁 Script executed:

cat -n src/aiperf/common/hash_id_random_generator.py | head -80

Repository: ai-dynamo/aiperf

Length of output: 3479


🏁 Script executed:

rg "base_rng" src/aiperf/common/hash_id_random_generator.py -A 3 -B 3

Repository: ai-dynamo/aiperf

Length of output: 428


🏁 Script executed:

rg "def from_base_rng" src/aiperf/common/hash_id_random_generator.py -A 10

Repository: ai-dynamo/aiperf

Length of output: 590


🏁 Script executed:

rg "\.seed" src/aiperf/common/hash_id_random_generator.py

Repository: ai-dynamo/aiperf

Length of output: 257


🏁 Script executed:

find . -name "*test*hash*" -o -name "*test*random*" 2>/dev/null | head -20

Repository: ai-dynamo/aiperf

Length of output: 311


🏁 Script executed:

rg "HashIdRandomGenerator" --type py -l

Repository: ai-dynamo/aiperf

Length of output: 480


🏁 Script executed:

rg "from_base_rng" --type py -A 3 -B 1

Repository: ai-dynamo/aiperf

Length of output: 8840


🏁 Script executed:

cat -n src/aiperf/common/random_generator.py | head -100

Repository: ai-dynamo/aiperf

Length of output: 4440


🏁 Script executed:

rg "class RandomGenerator" src/aiperf/common/random_generator.py -A 30

Repository: ai-dynamo/aiperf

Length of output: 1501


🏁 Script executed:

rg "self\.seed\s*=" src/aiperf/common/random_generator.py -B 3 -A 1

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

cat -n tests/unit/common/test_hash_id_random_generator.py

Repository: ai-dynamo/aiperf

Length of output: 12438


🏁 Script executed:

rg "seed\s*=\s*0" tests/unit/common/test_random_generator.py -B 2 -A 2

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

cat -n tests/unit/common/test_random_generator.py | head -150

Repository: ai-dynamo/aiperf

Length of output: 6468


Fix seed=0 handling in base_rng derivation.

Using base_rng.seed or ... treats 0 as falsy and replaces it with a random value, breaking reproducibility. Since RandomGenerator supports seed values from 0 to 2^64-1, this must be handled explicitly.

Proposed fix
-        base_seed = base_rng.seed or base_rng.randrange(0, 2**64)
+        base_seed = (
+            base_rng.seed
+            if base_rng.seed is not None
+            else base_rng.randrange(0, 2**64)
+        )

Add a regression test with seed=0 in the base RNG path to prevent this in the future.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/hash_id_random_generator.py` around lines 48 - 49, The code
uses "base_rng.seed or base_rng.randrange(...)" which treats seed=0 as falsy and
loses reproducibility; change the logic to explicitly check for None (e.g., use
"base_seed = base_rng.seed if base_rng.seed is not None else
base_rng.randrange(0, 2**64)") when deriving base_seed before returning via
"return cls(base_seed, _internal=True)"; ensure you reference the
RandomGenerator's allowed seed range (0..2**64-1) and add a regression test that
constructs the base RNG with seed=0 and verifies the resulting
HashIdRandomGenerator (or the classmethod that wraps this logic) is reproducible
and uses seed 0 rather than being randomized.

Comment on lines 157 to 161
if self._compress_only:
self.info(
f"Kubernetes mode: skipped local client, freed {conversation_count} "
"conversations from memory (workers handle all requests)"
f"Kubernetes mode: skipped local client, compressed {self._conversation_count} "
"conversations into backing store)"
)

⚠️ Potential issue | 🟡 Minor

Fix unmatched parenthesis in Kubernetes log message.

The message at lines 159-160 ends with an extra `)` ("backing store)"), which makes the log message look inconsistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/dataset_manager.py` around lines 157 - 161, The log string
inside the self.info call in dataset_manager (where self._compress_only is
checked) has an extra closing parenthesis at the end; update the message passed
to self.info (the f-string using self._conversation_count and the following
literal) to remove the trailing ')' so the message ends "...conversations into
backing store" (preferably combine into a single f-string for clarity) in the
method that contains the self._compress_only check and the self.info(...) call.

Comment on lines +213 to +218
effective_block_size = (
block_size
or self.config.input_tokens.block_size
or InputTokensDefaults.BLOCK_SIZE
)
return self._generate_cached_prompt(mean, hash_ids, block_size)
return self._generate_cached_prompt(mean, hash_ids, effective_block_size)

⚠️ Potential issue | 🟠 Major

Preserve explicit block_size overrides and validate them.

Lines 213-217 use an `or` fallback, so an explicit block_size=0 is ignored and replaced by defaults instead of being rejected.

Proposed explicit-None handling + validation
-            effective_block_size = (
-                block_size
-                or self.config.input_tokens.block_size
-                or InputTokensDefaults.BLOCK_SIZE
-            )
+            effective_block_size = (
+                block_size
+                if block_size is not None
+                else self.config.input_tokens.block_size
+                if self.config.input_tokens.block_size is not None
+                else InputTokensDefaults.BLOCK_SIZE
+            )
+            if effective_block_size <= 0:
+                raise ConfigurationError(
+                    "block_size must be greater than 0 when hash_ids are provided."
+                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
effective_block_size = (
block_size
or self.config.input_tokens.block_size
or InputTokensDefaults.BLOCK_SIZE
)
return self._generate_cached_prompt(mean, hash_ids, block_size)
return self._generate_cached_prompt(mean, hash_ids, effective_block_size)
effective_block_size = (
block_size
if block_size is not None
else self.config.input_tokens.block_size
if self.config.input_tokens.block_size is not None
else InputTokensDefaults.BLOCK_SIZE
)
if effective_block_size <= 0:
raise ConfigurationError(
"block_size must be greater than 0 when hash_ids are provided."
)
return self._generate_cached_prompt(mean, hash_ids, effective_block_size)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/generator/prompt.py` around lines 213 - 218, The current
fallback using "or" for computing effective_block_size ignores explicit falsy
values like block_size=0; update the logic in the method that computes
effective_block_size (the code that currently falls back to
self.config.input_tokens.block_size and InputTokensDefaults.BLOCK_SIZE) to treat
None as "use fallback" but validate any provided block_size: if block_size is
None, use self.config.input_tokens.block_size or InputTokensDefaults.BLOCK_SIZE;
if block_size is not None, ensure it is a positive integer (>0) and raise a
ValueError (with a clear message) for invalid values (e.g., 0 or negative); then
pass the validated effective_block_size into _generate_cached_prompt(mean,
hash_ids, effective_block_size).

Comment on lines +306 to 313
hash_ids: list[int] = getattr(trace, "hash_ids", None) or []
input_length: int = getattr(trace, "input_length", 0)
prompt = self.prompt_generator.generate(
mean=input_length, stddev=0, hash_ids=[]
mean=input_length,
stddev=0,
hash_ids=hash_ids,
block_size=self._block_size,
)

⚠️ Potential issue | 🟠 Major

Do not default missing input_length to 0 during hash-based prompt generation.

Line 307 masks malformed traces and can silently produce empty prompts. This should fail fast at the conversion source.

Proposed fail-fast patch
-                    input_length: int = getattr(trace, "input_length", 0)
+                    input_length: int = getattr(trace, "input_length")
Based on learnings: "rely on Pydantic validation and remove unnecessary None handling for these fields."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/base_trace_loader.py` around lines 306 - 313, Do
not silently default a missing input_length to 0; instead fail fast: replace the
current getattr(..., 0) usage with a direct retrieval (e.g., input_length =
getattr(trace, "input_length")) and add an explicit check that raises a clear
exception (ValueError or TypeError) if input_length is None or missing before
calling prompt_generator.generate; keep using hash_ids as before but avoid
masking malformed traces in base_trace_loader.py/PromptGenerator.generate by
ensuring trace.input_length is validated first.

block_size: int
sep_token: int | None
sample_tokens: Callable[..., list[int]]
block_cache: dict[int, list[int]] = field(default_factory=dict)

⚠️ Potential issue | 🟠 Major

Cache key should include token block size.

Caching by hash_id alone can reuse a token list of the wrong length when the same hash ID is requested with different size values (e.g., full block vs final partial block).

Proposed fix
-    block_cache: dict[int, list[int]] = field(default_factory=dict)
+    block_cache: dict[tuple[int, int], list[int]] = field(default_factory=dict)
@@
-    def get_block_tokens(hash_id: int, size: int) -> list[int]:
-        if hash_id in block_cache:
-            return block_cache[hash_id]
+    def get_block_tokens(hash_id: int, size: int) -> list[int]:
+        cache_key = (hash_id, size)
+        if cache_key in block_cache:
+            return block_cache[cache_key]
         hash_rng.reseed_for_hash_id(hash_id)
         tokens = sample_tokens(corpus, size, hash_rng, sep_token)
-        block_cache[hash_id] = tokens
+        block_cache[cache_key] = tokens
         return tokens

Also applies to: 120-126
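The composite-key idea in miniature (stub sampler, not the real corpus path):

```python
import random


def sample_tokens_stub(hash_id: int, size: int) -> list[int]:
    # Stand-in for the real corpus sampler: deterministic per hash_id.
    rng = random.Random(hash_id)
    return [rng.randrange(100, 200) for _ in range(size)]


block_cache: dict[tuple[int, int], list[int]] = {}


def get_block_tokens(hash_id: int, size: int) -> list[int]:
    # Composite key: a full block and a final partial block for the same
    # hash_id occupy separate cache entries, so lengths never mix.
    key = (hash_id, size)
    if key not in block_cache:
        block_cache[key] = sample_tokens_stub(hash_id, size)
    return block_cache[key]
```

`get_block_tokens(42, 10)` and `get_block_tokens(42, 5)` return 10- and 5-token lists respectively; a cache keyed on hash_id alone would hand back the first cached list for both requests.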

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/parallel_convert.py` at line 57, The block_cache
currently uses only hash_id as a key (field block_cache: dict[int, list[int]])
which causes incorrect reuse when the same hash_id is requested with different
token block sizes; update the cache to key by both hash_id and size (e.g., use a
tuple key like (hash_id, size) or a combined unique key) and adjust all access
points (including where block_cache is read/written around the code referenced
at lines ~120-126) to use the new composite key so cached token lists are
segregated by requested block size.

Comment on lines +132 to +149
if trace.get("text_input"):
# Literal prompt provided by the trace (no generation needed).
prompt = trace["text_input"]
elif trace.get("hash_ids"):
# Generate prompt from hash_id blocks. All blocks are full-sized
# except the last, which gets the remainder tokens.
hash_ids = trace["hash_ids"]
input_length = trace["input_length"]
final_block_size = input_length - (len(hash_ids) - 1) * block_size

tokens: list[int] = []
for i, hid in enumerate(hash_ids):
size = final_block_size if i == len(hash_ids) - 1 else block_size
tokens.extend(get_block_tokens(hid, size))
prompt = decode(tokens, skip_special_tokens=False)
else:
prompt = ""


⚠️ Potential issue | 🔴 Critical

Parallel path drops prompt generation when hash_ids is empty/missing.

For traces that only provide input_length, line 148 returns "" instead of generating text (the single-threaded path generates with hash_ids=[]). This causes large datasets (parallel path) to silently produce empty prompts.

Proposed fix
-            elif trace.get("hash_ids"):
+            elif trace.get("hash_ids"):
                 # Generate prompt from hash_id blocks. All blocks are full-sized
                 # except the last, which gets the remainder tokens.
                 hash_ids = trace["hash_ids"]
                 input_length = trace["input_length"]
                 final_block_size = input_length - (len(hash_ids) - 1) * block_size
@@
-            else:
-                prompt = ""
+            elif trace.get("input_length") is not None:
+                input_length = int(trace["input_length"])
+                tokens = sample_tokens(corpus, input_length, hash_rng, sep_token)
+                prompt = decode(tokens, skip_special_tokens=False)
+            else:
+                prompt = ""
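The resulting branch ordering, condensed into a standalone sketch (stub `sample_stub`/`decode_stub` helpers are hypothetical stand-ins for the worker's sampler and tokenizer decode):

```python
def decode_stub(tokens: list[int]) -> str:
    return " ".join(f"t{t}" for t in tokens)


def sample_stub(hash_id: int, size: int) -> list[int]:
    return list(range(100, 100 + size))


def build_prompt(trace: dict, block_size: int) -> str:
    # Condensed branch logic with the input_length-only fallback added.
    if trace.get("text_input"):
        return trace["text_input"]
    if trace.get("hash_ids"):
        hash_ids = trace["hash_ids"]
        final = trace["input_length"] - (len(hash_ids) - 1) * block_size
        tokens: list[int] = []
        for i, hid in enumerate(hash_ids):
            size = final if i == len(hash_ids) - 1 else block_size
            tokens.extend(sample_stub(hid, size))
        return decode_stub(tokens)
    if trace.get("input_length"):
        # New branch: length-only traces generate text instead of "".
        return decode_stub(sample_stub(0, trace["input_length"]))
    return ""
```

With this ordering, only traces with no text, no hash_ids, and no input_length fall through to an empty prompt, matching the single-threaded path.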
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/parallel_convert.py` around lines 132 - 149, The
parallel path currently skips prompt generation when trace lacks text_input and
hash_ids is missing or empty, returning "" — change the branch in the
prompt-building logic to treat a missing/empty trace["hash_ids"] but present
trace["input_length"] as a single block: when trace.get("hash_ids") is falsy but
trace.get("input_length") exists, build a tokens list of length input_length
(one block) by calling get_block_tokens for that single block and then
decode(tokens, skip_special_tokens=False) to create prompt; keep the existing
hash_ids loop behavior when hash_ids is non-empty and preserve use of
block_size, final_block_size, get_block_tokens and decode.

Comment on lines +34 to +1062
@pytest.fixture
def sample_corpus():
"""A small corpus of token IDs for testing."""
return list(range(100, 200))


@pytest.fixture
def sample_corpus_array(sample_corpus):
"""Corpus as numpy int32 array."""
return np.array(sample_corpus, dtype=np.int32)


@pytest.fixture
def mock_tokenizer():
"""Mock tokenizer that decodes tokens into readable strings."""
tok = MagicMock()
tok.decode.side_effect = lambda ids, **kw: " ".join(f"t{i}" for i in ids)
return tok


@pytest.fixture
def setup_worker(sample_corpus_array, mock_tokenizer):
"""Set up module-level _worker_state for _process_batch tests."""
seed = 42
hash_rng = HashIdRandomGenerator(seed, _internal=True)
hash_rng.set_trace_id("test_trace")

parallel_convert_mod._worker_state = _WorkerState(
tokenizer=mock_tokenizer,
corpus=sample_corpus_array,
shm=MagicMock(),
hash_rng=hash_rng,
block_size=10,
sep_token=None,
sample_tokens=sample_tokens_from_corpus,
)
yield
parallel_convert_mod._worker_state = None


@pytest.fixture
def setup_worker_with_sep(sample_corpus_array, mock_tokenizer):
"""Set up worker with a separator token (BOS/EOS)."""
seed = 42
hash_rng = HashIdRandomGenerator(seed, _internal=True)
hash_rng.set_trace_id("test_trace")

parallel_convert_mod._worker_state = _WorkerState(
tokenizer=mock_tokenizer,
corpus=sample_corpus_array,
shm=MagicMock(),
hash_rng=hash_rng,
block_size=10,
sep_token=1,
sample_tokens=sample_tokens_from_corpus,
)
yield
parallel_convert_mod._worker_state = None


# -----------------------------------------------------------------------
# _set_daemon
# -----------------------------------------------------------------------


class TestSetDaemon:
"""Tests for daemon flag manipulation."""

def test_set_daemon_true(self):
"""Setting daemon to True should work on a non-daemon process."""
original = mp.current_process().daemon
try:
_set_daemon(True)
assert mp.current_process().daemon is True
finally:
_set_daemon(original)

def test_set_daemon_false(self):
"""Setting daemon to False should work."""
original = mp.current_process().daemon
try:
_set_daemon(False)
assert mp.current_process().daemon is False
finally:
_set_daemon(original)

def test_set_daemon_fallback_on_assertion_error(self):
"""If daemon= setter raises AssertionError, fallback to _config."""
proc = mp.current_process()
original = proc.daemon

with patch.object(
type(proc), "daemon", property(fset=Mock(side_effect=AssertionError))
):
_set_daemon(True)
assert proc._config["daemon"] is True

# Restore
proc._config["daemon"] = original


# -----------------------------------------------------------------------
# _process_batch
# -----------------------------------------------------------------------


class TestProcessBatch:
"""Tests for _process_batch worker function."""

def test_text_input_traces(self, setup_worker):
"""Traces with text_input should use the literal text."""
batch = [
(
"session-1",
[
{
"text_input": "Hello world",
"timestamp": 100,
"delay": None,
"output_length": 10,
},
],
),
]
results = _process_batch(batch)

assert len(results) == 1
sid, turns = results[0]
assert sid == "session-1"
assert len(turns) == 1
ts, delay, prompt, max_tokens = turns[0]
assert prompt == "Hello world"
assert ts == 100
assert max_tokens == 10

def test_hash_ids_traces(self, setup_worker, mock_tokenizer):
"""Traces with hash_ids should generate tokens and decode."""
batch = [
(
"session-1",
[
{
"hash_ids": [1, 2],
"input_length": 15,
"timestamp": 200,
"delay": 5,
"output_length": 20,
},
],
),
]
results = _process_batch(batch)

assert len(results) == 1
sid, turns = results[0]
assert sid == "session-1"
assert len(turns) == 1
ts, delay, prompt, max_tokens = turns[0]
assert ts == 200
assert delay == 5
assert max_tokens == 20
# decode was called with generated tokens
mock_tokenizer.decode.assert_called_once()
assert isinstance(prompt, str)
assert len(prompt) > 0

def test_empty_trace_no_hash_ids_no_text(self, setup_worker):
"""Traces without hash_ids or text_input produce empty prompt."""
batch = [
(
"session-1",
[
{
"timestamp": 300,
"delay": None,
"output_length": 5,
"input_length": 0,
},
],
),
]
results = _process_batch(batch)

_, turns = results[0]
_, _, prompt, _ = turns[0]
assert prompt == ""

def test_multiple_sessions_in_batch(self, setup_worker, mock_tokenizer):
"""Multiple sessions in one batch are all processed."""
batch = [
(
"s1",
[
{
"text_input": "prompt A",
"timestamp": 1,
"delay": None,
"output_length": 10,
}
],
),
(
"s2",
[
{
"text_input": "prompt B",
"timestamp": 2,
"delay": None,
"output_length": 20,
}
],
),
(
"s3",
[
{
"text_input": "prompt C",
"timestamp": 3,
"delay": None,
"output_length": 30,
}
],
),
]
results = _process_batch(batch)

assert len(results) == 3
assert results[0][0] == "s1"
assert results[1][0] == "s2"
assert results[2][0] == "s3"
assert results[0][1][0][2] == "prompt A"
assert results[1][1][0][2] == "prompt B"
assert results[2][1][0][2] == "prompt C"

def test_multi_turn_session(self, setup_worker, mock_tokenizer):
"""A session with multiple turns (traces) processes all turns."""
batch = [
(
"session-1",
[
{
"text_input": "turn 1",
"timestamp": 100,
"delay": None,
"output_length": 10,
},
{
"text_input": "turn 2",
"timestamp": 200,
"delay": 50,
"output_length": 20,
},
{
"text_input": "turn 3",
"timestamp": 300,
"delay": 100,
"output_length": 30,
},
],
),
]
results = _process_batch(batch)

_, turns = results[0]
assert len(turns) == 3
assert turns[0][2] == "turn 1"
assert turns[1][2] == "turn 2"
assert turns[2][2] == "turn 3"

def test_hash_id_block_cache_reuse(self, setup_worker, mock_tokenizer):
"""Same hash_id within a batch should reuse cached tokens."""
batch = [
(
"s1",
[
{
"hash_ids": [42],
"input_length": 10,
"timestamp": 1,
"delay": None,
"output_length": 5,
},
],
),
(
"s2",
[
{
"hash_ids": [42],
"input_length": 10,
"timestamp": 2,
"delay": None,
"output_length": 5,
},
],
),
]
results = _process_batch(batch)

# Both sessions with same hash_id should get same decoded output
prompt_1 = results[0][1][0][2]
prompt_2 = results[1][1][0][2]
assert prompt_1 == prompt_2

def test_different_hash_ids_produce_different_prompts(
self, setup_worker, mock_tokenizer
):
"""Different hash_ids should produce different token sequences."""
batch = [
(
"s1",
[
{
"hash_ids": [100],
"input_length": 10,
"timestamp": 1,
"delay": None,
"output_length": 5,
},
],
),
(
"s2",
[
{
"hash_ids": [200],
"input_length": 10,
"timestamp": 2,
"delay": None,
"output_length": 5,
},
],
),
]
results = _process_batch(batch)

prompt_1 = results[0][1][0][2]
prompt_2 = results[1][1][0][2]
assert prompt_1 != prompt_2

def test_final_block_size_calculation(self, setup_worker, mock_tokenizer):
"""Last hash block should get the remainder tokens."""
# block_size=10, input_length=25, 3 hash_ids
# first two blocks: 10 tokens each, last block: 25 - 2*10 = 5 tokens
batch = [
(
"s1",
[
{
"hash_ids": [1, 2, 3],
"input_length": 25,
"timestamp": 1,
"delay": None,
"output_length": 5,
},
],
),
]
_process_batch(batch)

# Verify decode was called (tokens were generated)
assert mock_tokenizer.decode.call_count == 1
decoded_tokens = mock_tokenizer.decode.call_args[0][0]
# 10 + 10 + 5 = 25 total tokens
assert len(decoded_tokens) == 25

def test_separator_token_prepended(self, setup_worker_with_sep, mock_tokenizer):
"""When sep_token is set, each block should have it prepended."""
batch = [
(
"s1",
[
{
"hash_ids": [1],
"input_length": 10,
"timestamp": 1,
"delay": None,
"output_length": 5,
},
],
),
]
_process_batch(batch)

decoded_tokens = mock_tokenizer.decode.call_args[0][0]
# With sep_token=1, first token in block should be 1
assert decoded_tokens[0] == 1

def test_mixed_text_input_and_hash_ids(self, setup_worker, mock_tokenizer):
"""Session with both text_input and hash_id traces."""
batch = [
(
"s1",
[
{
"text_input": "literal text",
"timestamp": 1,
"delay": None,
"output_length": 10,
},
{
"hash_ids": [5],
"input_length": 10,
"timestamp": 2,
"delay": None,
"output_length": 20,
},
],
),
]
results = _process_batch(batch)

_, turns = results[0]
assert turns[0][2] == "literal text"
assert turns[1][2] != "literal text" # Generated prompt

def test_none_fields_preserved(self, setup_worker):
"""None values for timestamp, delay, output_length are preserved."""
batch = [
(
"s1",
[
{
"text_input": "test",
"timestamp": None,
"delay": None,
"output_length": None,
},
],
),
]
results = _process_batch(batch)

ts, delay, _, max_tokens = results[0][1][0]
assert ts is None
assert delay is None
assert max_tokens is None


# -----------------------------------------------------------------------
# _init_worker
# -----------------------------------------------------------------------


class TestInitWorker:
"""Tests for _init_worker function."""

def test_init_worker_sets_up_state(self, sample_corpus_array, tmp_path):
"""_init_worker should populate _worker dict with all required fields."""
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=sample_corpus_array.nbytes)
try:
np.copyto(
np.ndarray(
sample_corpus_array.shape,
dtype=sample_corpus_array.dtype,
buffer=shm.buf,
),
sample_corpus_array,
)

mock_tok = MagicMock()
args = _WorkerInitArgs(
shm_name=shm.name,
corpus_len=len(sample_corpus_array),
tokenizer_name="test-model",
base_seed=42,
block_size=10,
sep_token=1,
trace_id="abc123",
)
with patch(
"aiperf.common.tokenizer.Tokenizer.from_pretrained",
return_value=mock_tok,
):
_init_worker(args)

state = parallel_convert_mod._worker_state
assert state is not None
assert state.tokenizer is mock_tok
assert state.block_size == 10
assert state.sep_token == 1
assert isinstance(state.hash_rng, HashIdRandomGenerator)
assert np.array_equal(state.corpus, sample_corpus_array)
finally:
parallel_convert_mod._worker_state = None
shm.close()
shm.unlink()

def test_init_worker_passes_tokenizer_config(self, sample_corpus_array):
"""_init_worker should forward trust_remote_code and revision to Tokenizer."""
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=sample_corpus_array.nbytes)
try:
np.copyto(
np.ndarray(
sample_corpus_array.shape,
dtype=sample_corpus_array.dtype,
buffer=shm.buf,
),
sample_corpus_array,
)

args = _WorkerInitArgs(
shm_name=shm.name,
corpus_len=len(sample_corpus_array),
tokenizer_name="test-model",
base_seed=42,
block_size=10,
sep_token=None,
trace_id="abc",
trust_remote_code=True,
revision="v2.0",
)
with patch(
"aiperf.common.tokenizer.Tokenizer.from_pretrained",
return_value=MagicMock(),
) as mock_from_pretrained:
_init_worker(args)

mock_from_pretrained.assert_called_once_with(
"test-model",
trust_remote_code=True,
revision="v2.0",
resolve_alias=False,
)
finally:
parallel_convert_mod._worker_state = None
shm.close()
shm.unlink()

def test_init_worker_default_tokenizer_config(self, sample_corpus_array):
"""_init_worker defaults to trust_remote_code=False and revision='main'."""
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=sample_corpus_array.nbytes)
try:
np.copyto(
np.ndarray(
sample_corpus_array.shape,
dtype=sample_corpus_array.dtype,
buffer=shm.buf,
),
sample_corpus_array,
)

args = _WorkerInitArgs(
shm_name=shm.name,
corpus_len=len(sample_corpus_array),
tokenizer_name="test-model",
base_seed=42,
block_size=10,
sep_token=None,
trace_id="abc",
)
with patch(
"aiperf.common.tokenizer.Tokenizer.from_pretrained",
return_value=MagicMock(),
) as mock_from_pretrained:
_init_worker(args)

mock_from_pretrained.assert_called_once_with(
"test-model",
trust_remote_code=False,
revision="main",
resolve_alias=False,
)
finally:
parallel_convert_mod._worker_state = None
shm.close()
shm.unlink()

def test_init_worker_sets_offline_env(self, sample_corpus_array):
"""Worker should set HF offline environment variables."""
import os
from multiprocessing import shared_memory

original_hf = os.environ.get("HF_HUB_OFFLINE")
original_tf = os.environ.get("TRANSFORMERS_OFFLINE")

shm = shared_memory.SharedMemory(create=True, size=sample_corpus_array.nbytes)
try:
np.copyto(
np.ndarray(
sample_corpus_array.shape,
dtype=sample_corpus_array.dtype,
buffer=shm.buf,
),
sample_corpus_array,
)

args = _WorkerInitArgs(
shm_name=shm.name,
corpus_len=len(sample_corpus_array),
tokenizer_name="test-model",
base_seed=42,
block_size=10,
sep_token=None,
trace_id="abc",
)
with patch(
"aiperf.common.tokenizer.Tokenizer.from_pretrained",
return_value=MagicMock(),
):
_init_worker(args)

assert os.environ.get("HF_HUB_OFFLINE") == "1"
assert os.environ.get("TRANSFORMERS_OFFLINE") == "1"
finally:
parallel_convert_mod._worker_state = None
shm.close()
shm.unlink()
# Restore env
if original_hf is None:
os.environ.pop("HF_HUB_OFFLINE", None)
else:
os.environ["HF_HUB_OFFLINE"] = original_hf
if original_tf is None:
os.environ.pop("TRANSFORMERS_OFFLINE", None)
else:
os.environ["TRANSFORMERS_OFFLINE"] = original_tf


# -----------------------------------------------------------------------
# parallel_convert — end-to-end
# -----------------------------------------------------------------------


class TestParallelConvert:
"""Tests for the parallel_convert orchestration function."""

def test_empty_sessions_returns_empty(self, sample_corpus):
"""Empty input returns empty output."""
result = list(
parallel_convert(
sessions=[],
tokenizer_name="test",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
)
)
assert result == []

def test_returns_conversation_objects(self, sample_corpus):
"""Output should be a list of Conversation objects."""
sessions = [
(
"s1",
[
{
"text_input": "hello",
"timestamp": 1,
"delay": None,
"output_length": 5,
}
],
),
]

with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)

mock_pool_instance.imap.return_value = [
[("s1", [(1, None, "hello", 5)])],
]

result = list(
parallel_convert(
sessions=sessions,
tokenizer_name="test",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
)
)

assert len(result) == 1
assert isinstance(result[0], Conversation)
assert result[0].session_id == "s1"
assert len(result[0].turns) == 1
assert result[0].turns[0].timestamp == 1
assert result[0].turns[0].max_tokens == 5

def test_batching_splits_sessions(self, sample_corpus):
"""Sessions should be split into batches of batch_size."""
sessions = [
(
f"s{i}",
[
{
"text_input": f"p{i}",
"timestamp": i,
"delay": None,
"output_length": 5,
}
],
)
for i in range(5)
]

with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)

mock_pool_instance.imap.return_value = [
[(f"s{i}", [(i, None, f"p{i}", 5)]) for i in range(2)],
[(f"s{i}", [(i, None, f"p{i}", 5)]) for i in range(2, 5)],
]

list(
parallel_convert(
sessions=sessions,
tokenizer_name="test",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
batch_size=2,
)
)

# map was called with batches of size 2
batches = mock_pool_instance.imap.call_args[0][1]
assert len(batches) == 3 # 5 sessions / 2 batch_size = 3 batches
assert len(batches[0]) == 2
assert len(batches[1]) == 2
assert len(batches[2]) == 1

def test_daemon_flag_restored(self, sample_corpus):
"""Daemon flag should be restored after Pool finishes."""
original_daemon = mp.current_process().daemon

with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)
mock_pool_instance.imap.return_value = []

list(
parallel_convert(
sessions=[
(
"s1",
[
{
"text_input": "t",
"timestamp": 1,
"delay": None,
"output_length": 1,
}
],
)
],
tokenizer_name="test",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
)
)

assert mp.current_process().daemon == original_daemon

def test_shared_memory_cleanup(self, sample_corpus):
"""Shared memory should be cleaned up even on errors."""
with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)
mock_pool_instance.imap.side_effect = RuntimeError("Pool error")

with pytest.raises(RuntimeError, match="Pool error"):
list(
parallel_convert(
sessions=[
(
"s1",
[
{
"text_input": "t",
"timestamp": 1,
"delay": None,
"output_length": 1,
}
],
)
],
tokenizer_name="test",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
)
)

# No leaked shared memory (if it leaked, subsequent tests would detect it)

def test_multi_turn_conversations(self, sample_corpus):
"""Sessions with multiple turns should produce multi-turn Conversations."""
with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)

mock_pool_instance.imap.return_value = [
[("s1", [(100, None, "turn 1", 10), (200, 50, "turn 2", 20)])],
]

result = list(
parallel_convert(
sessions=[
(
"s1",
[
{
"text_input": "turn 1",
"timestamp": 100,
"delay": None,
"output_length": 10,
},
{
"text_input": "turn 2",
"timestamp": 200,
"delay": 50,
"output_length": 20,
},
],
)
],
tokenizer_name="test",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
)
)

assert len(result) == 1
conv = result[0]
assert len(conv.turns) == 2
assert conv.turns[0].timestamp == 100
assert conv.turns[0].delay is None
assert conv.turns[0].max_tokens == 10
assert conv.turns[1].timestamp == 200
assert conv.turns[1].delay == 50
assert conv.turns[1].max_tokens == 20

def test_pool_receives_correct_init_args(self, sample_corpus):
"""Pool should be initialized with correct arguments."""
with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)
mock_pool_instance.imap.return_value = []

list(
parallel_convert(
sessions=[
(
"s1",
[
{
"text_input": "t",
"timestamp": 1,
"delay": None,
"output_length": 1,
}
],
)
],
tokenizer_name="my-tokenizer",
corpus=sample_corpus,
base_seed=12345,
block_size=64,
sep_token=7,
trace_id="trace_abc",
num_workers=4,
)
)

call_args = MockPool.call_args
assert call_args[0][0] == 4 # num_workers
assert call_args[0][1] is _init_worker
initargs = call_args[0][2]
assert len(initargs) == 1
args = initargs[0]
assert isinstance(args, _WorkerInitArgs)
assert args.tokenizer_name == "my-tokenizer"
assert args.base_seed == 12345
assert args.block_size == 64
assert args.sep_token == 7
assert args.trace_id == "trace_abc"
assert args.trust_remote_code is False
assert args.revision == "main"

def test_pool_receives_tokenizer_config(self, sample_corpus):
"""Pool init args should include trust_remote_code and revision."""
with patch("aiperf.dataset.loader.parallel_convert.Pool") as MockPool:
mock_pool_instance = MagicMock()
MockPool.return_value.__enter__ = Mock(return_value=mock_pool_instance)
MockPool.return_value.__exit__ = Mock(return_value=False)
mock_pool_instance.imap.return_value = []

list(
parallel_convert(
sessions=[
(
"s1",
[
{
"text_input": "t",
"timestamp": 1,
"delay": None,
"output_length": 1,
}
],
)
],
tokenizer_name="kimi-model",
corpus=sample_corpus,
base_seed=42,
block_size=10,
sep_token=None,
trace_id="test",
trust_remote_code=True,
revision="v2.0",
)
)

args = MockPool.call_args[0][2][0]
assert args.trust_remote_code is True
assert args.revision == "v2.0"


# -----------------------------------------------------------------------
# Determinism: _process_batch produces identical results with same seed
# -----------------------------------------------------------------------


class TestProcessBatchDeterminism:
"""Tests that _process_batch is deterministic across invocations."""

def _setup_and_process(
self, corpus_array, hash_ids, input_length, block_size, trace_id, seed=42
):
"""Helper: set up worker state and process a single batch."""
hash_rng = HashIdRandomGenerator(seed, _internal=True)
hash_rng.set_trace_id(trace_id)

mock_tok = MagicMock()
mock_tok.decode.side_effect = lambda ids, **kw: ",".join(str(i) for i in ids)

parallel_convert_mod._worker_state = _WorkerState(
tokenizer=mock_tok,
corpus=corpus_array,
shm=MagicMock(),
hash_rng=hash_rng,
block_size=block_size,
sep_token=None,
sample_tokens=sample_tokens_from_corpus,
)

batch = [
(
"s1",
[
{
"hash_ids": hash_ids,
"input_length": input_length,
"timestamp": 1,
"delay": None,
"output_length": 5,
}
],
),
]
result = _process_batch(batch)
parallel_convert_mod._worker_state = None
return result[0][1][0][2] # prompt string

def test_same_seed_same_trace_id_same_result(self, sample_corpus_array):
"""Identical seed + trace_id + hash_ids = identical prompt."""
prompt_1 = self._setup_and_process(
sample_corpus_array, [1, 2], 15, 10, "trace_a"
)
prompt_2 = self._setup_and_process(
sample_corpus_array, [1, 2], 15, 10, "trace_a"
)
assert prompt_1 == prompt_2

def test_different_trace_id_different_result(self, sample_corpus_array):
"""Different trace_ids produce different prompts."""
prompt_1 = self._setup_and_process(
sample_corpus_array, [1, 2], 15, 10, "trace_a"
)
prompt_2 = self._setup_and_process(
sample_corpus_array, [1, 2], 15, 10, "trace_b"
)
assert prompt_1 != prompt_2

def test_different_seed_different_result(self, sample_corpus_array):
"""Different seeds produce different prompts."""
prompt_1 = self._setup_and_process(
sample_corpus_array, [1], 10, 10, "trace_a", seed=42
)
prompt_2 = self._setup_and_process(
sample_corpus_array, [1], 10, 10, "trace_a", seed=99
)
assert prompt_1 != prompt_2

def test_different_hash_ids_different_result(self, sample_corpus_array):
"""Different hash_ids produce different prompts."""
prompt_1 = self._setup_and_process(sample_corpus_array, [10], 10, 10, "trace_a")
prompt_2 = self._setup_and_process(sample_corpus_array, [20], 10, 10, "trace_a")
assert prompt_1 != prompt_2

⚠️ Potential issue | 🟠 Major

Add required type hints across fixtures, helpers, and tests.

This file introduces many new function definitions without parameter or return annotations. Please annotate all fixture functions, helper methods, and test methods (at minimum -> None for tests) to align with repo standards.

Example pattern to apply
 `@pytest.fixture`
-def sample_corpus():
+def sample_corpus() -> list[int]:
     return list(range(100, 200))

 class TestSetDaemon:
-    def test_set_daemon_true(self):
+    def test_set_daemon_true(self) -> None:
         ...

 class TestProcessBatchDeterminism:
-    def _setup_and_process(self, corpus_array, hash_ids, input_length, block_size, trace_id, seed=42):
+    def _setup_and_process(
+        self,
+        corpus_array: np.ndarray,
+        hash_ids: list[int],
+        input_length: int,
+        block_size: int,
+        trace_id: str,
+        seed: int = 42,
+    ) -> str:
         ...

As per coding guidelines: "Type hints required on ALL functions (params and return)".

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 50-50: Unused lambda argument: kw

(ARG005)


[warning] 143-143: Unused method argument: setup_worker

(ARG002)


[warning] 164-164: Unpacked variable delay is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


[warning] 169-169: Unused method argument: setup_worker

(ARG002)


[warning] 200-200: Unused method argument: setup_worker

(ARG002)


[warning] 221-221: Unused method argument: setup_worker

(ARG002)


[warning] 221-221: Unused method argument: mock_tokenizer

(ARG002)


[warning] 268-268: Unused method argument: setup_worker

(ARG002)


[warning] 268-268: Unused method argument: mock_tokenizer

(ARG002)


[warning] 303-303: Unused method argument: setup_worker

(ARG002)


[warning] 303-303: Unused method argument: mock_tokenizer

(ARG002)


[warning] 339-339: Unused method argument: setup_worker

(ARG002)


[warning] 339-339: Unused method argument: mock_tokenizer

(ARG002)


[warning] 374-374: Unused method argument: setup_worker

(ARG002)


[warning] 400-400: Unused method argument: setup_worker_with_sep

(ARG002)


[warning] 422-422: Unused method argument: setup_worker

(ARG002)


[warning] 422-422: Unused method argument: mock_tokenizer

(ARG002)


[warning] 450-450: Unused method argument: setup_worker

(ARG002)


[warning] 481-481: Unused method argument: tmp_path

(ARG002)


[warning] 998-998: Unused lambda argument: kw

(ARG005)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_parallel_convert.py` around lines 34 - 1062,
Add explicit type hints to all fixtures, helper functions, and test methods:
annotate return types for fixtures like sample_corpus() -> List[int],
sample_corpus_array(sample_corpus: List[int]) -> np.ndarray, mock_tokenizer() ->
MagicMock, setup_worker(...) -> Iterator[None] (or Generator[None, None, None])
and setup_worker_with_sep(...) -> Iterator[None]; mark all test methods (e.g.,
TestSetDaemon.test_set_daemon_true, TestProcessBatch.* etc.) with -> None;
annotate the helper _setup_and_process(self, corpus_array: np.ndarray, hash_ids:
Sequence[int], input_length: int, block_size: int, trace_id: str, seed: int =
42) -> str; add any needed typing imports (List, Sequence, Iterator, Optional)
at the top of the file and adjust parameter types for fixtures/helpers to match
their usage (e.g., sample_corpus_array: np.ndarray, mock_tokenizer: MagicMock)
so every function has full parameter and return annotations.

def mock_tokenizer():
"""Mock tokenizer that decodes tokens into readable strings."""
tok = MagicMock()
tok.decode.side_effect = lambda ids, **kw: " ".join(f"t{i}" for i in ids)

⚠️ Potential issue | 🟡 Minor

Clean up the unused symbols flagged by Ruff.

There are a few clearly unused symbols (kw twice, delay, and tmp_path) adding lint noise. Rename or remove them to keep the test module clean and avoid warning churn.

Minimal lint-cleanup diff
-    tok.decode.side_effect = lambda ids, **kw: " ".join(f"t{i}" for i in ids)
+    tok.decode.side_effect = lambda ids, **_kw: " ".join(f"t{i}" for i in ids)

-        ts, delay, prompt, max_tokens = turns[0]
+        ts, _delay, prompt, max_tokens = turns[0]

-    def test_init_worker_sets_up_state(self, sample_corpus_array, tmp_path):
+    def test_init_worker_sets_up_state(self, sample_corpus_array):

-        mock_tok.decode.side_effect = lambda ids, **kw: ",".join(str(i) for i in ids)
+        mock_tok.decode.side_effect = lambda ids, **_kw: ",".join(str(i) for i in ids)

Also applies to: 164-164, 481-481, 998-998

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 50-50: Unused lambda argument: kw

(ARG005)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_parallel_convert.py` at line 50, Tests are
triggering Ruff unused-symbol warnings for unused parameters; update the unused
names to use underscore-prefixed or anonymous names so linters ignore them.
Specifically, change the tok.decode side_effect from lambda ids, **kw: ... to
lambda ids, **_: ... (or lambda ids, **_kw: ...), and rename any unused
function/test parameters like delay, tmp_path, kw to _delay, _tmp_path, or _ (or
remove them if safe) in the same test file so the symbols are no longer flagged
(look for occurrences around the provided examples and the other mentioned
lines).

Comment on lines +1000 to +1026
parallel_convert_mod._worker_state = _WorkerState(
tokenizer=mock_tok,
corpus=corpus_array,
shm=MagicMock(),
hash_rng=hash_rng,
block_size=block_size,
sep_token=None,
sample_tokens=sample_tokens_from_corpus,
)

batch = [
(
"s1",
[
{
"hash_ids": hash_ids,
"input_length": input_length,
"timestamp": 1,
"delay": None,
"output_length": 5,
}
],
),
]
result = _process_batch(batch)
parallel_convert_mod._worker_state = None
return result[0][1][0][2] # prompt string

⚠️ Potential issue | 🟡 Minor

Guarantee _worker_state teardown with finally.

parallel_convert_mod._worker_state is reset only on the success path. If _process_batch(batch) raises, the global state leaks into subsequent tests and can cause cascading failures.

Safer teardown pattern
         parallel_convert_mod._worker_state = _WorkerState(
             tokenizer=mock_tok,
             corpus=corpus_array,
             shm=MagicMock(),
             hash_rng=hash_rng,
             block_size=block_size,
             sep_token=None,
             sample_tokens=sample_tokens_from_corpus,
         )

-        batch = [
-            (
-                "s1",
-                [
-                    {
-                        "hash_ids": hash_ids,
-                        "input_length": input_length,
-                        "timestamp": 1,
-                        "delay": None,
-                        "output_length": 5,
-                    }
-                ],
-            ),
-        ]
-        result = _process_batch(batch)
-        parallel_convert_mod._worker_state = None
+        try:
+            batch = [
+                (
+                    "s1",
+                    [
+                        {
+                            "hash_ids": hash_ids,
+                            "input_length": input_length,
+                            "timestamp": 1,
+                            "delay": None,
+                            "output_length": 5,
+                        }
+                    ],
+                ),
+            ]
+            result = _process_batch(batch)
+        finally:
+            parallel_convert_mod._worker_state = None
         return result[0][1][0][2]  # prompt string
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_parallel_convert.py` around lines 1000 - 1026,
The test sets parallel_convert_mod._worker_state directly but only clears it on
the success path; wrap the assignment of parallel_convert_mod._worker_state and
the call to _process_batch(batch) in a try/finally so the global is always reset
to None, i.e., assign parallel_convert_mod._worker_state = _WorkerState(...)
before calling _process_batch(batch) inside a try block and put
parallel_convert_mod._worker_state = None in the finally block to guarantee
teardown even if _process_batch raises.

…code, and implicit exception chains

cause_chain only followed __cause__ (explicit chaining), missing __context__
(implicit chaining) used by transformers. Added ImportError to detection since
transformers raises it instead of ModuleNotFoundError for missing packages like
tiktoken. Added trust_remote_code detection. Fallback now reproduces the load
and prints the full traceback so unrecognized errors are still diagnosable.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
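The distinction the commit message draws can be sketched standalone: Python sets __cause__ on an explicit "raise X from Y" and __context__ when an exception is raised while another is being handled, so a walker that only follows __cause__ misses the implicit case. A minimal sketch (iter_exception_chain is a hypothetical name, not the repo's actual cause_chain, and it ignores __suppress_context__ for brevity):

```python
def iter_exception_chain(exc):
    """Walk an exception chain, following both explicit (__cause__)
    and implicit (__context__) links."""
    seen = set()
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))
        yield exc
        # Explicit chaining ("raise X from Y") takes priority over
        # implicit chaining (exception raised inside an except block).
        exc = exc.__cause__ if exc.__cause__ is not None else exc.__context__

# Implicit chain: ImportError raised while handling a KeyError sets
# __context__ but leaves __cause__ as None.
try:
    try:
        raise KeyError("missing")
    except KeyError:
        raise ImportError("No module named 'tiktoken'")
except ImportError as e:
    chain = [type(x).__name__ for x in iter_exception_chain(e)]

print(chain)  # ['ImportError', 'KeyError']
```

A __cause__-only walker would stop at the ImportError here and never surface the underlying KeyError.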
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
tests/unit/common/test_tokenizer_display.py (1)

181-194: Consider a regression case for multi-package ImportError messages.

Please add a case like "requires the following packages ...: tiktoken, sentencepiece" so parser behavior for comma-separated packages stays validated.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/common/test_tokenizer_display.py` around lines 181 - 194, Add a
regression test entry in tests/unit/common/test_tokenizer_display.py alongside
the existing tuples (the ImportError / transformers-style cases) that uses an
ImportError message containing multiple comma-separated packages (e.g.,
"requires the following packages that were not found in your environment:
tiktoken, sentencepiece") and assert the parser extracts both package names and
a combined display string; update the expected tuple to mirror the other cases'
shape (e.g., error type list ["TokenizerError","ImportError"], message substring
"requires the following packages ...: tiktoken, sentencepiece", display "Missing
Package: tiktoken, sentencepiece", and suggestion list including both package
names and a pip install suggestion like ["tiktoken","sentencepiece","pip install
tiktoken sentencepiece"]) so the parser behavior for comma-separated packages is
validated.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/common/tokenizer_display.py`:
- Around line 183-194: The package-extraction logic in the
ModuleNotFoundError/ImportError branch incorrectly splits on "." and can produce
invalid pip commands for multi-package messages; update the code that processes
match.group(1) (inside the block that returns TokenizerErrorInsight and uses
_MISSING_PACKAGE_PATTERNS) to first split the captured string on
commas/whitespace/semicolons/“and”, strip surrounding punctuation from each
token, normalize each token to its top-level name (take substring before any
"."), deduplicate, then join tokens with spaces for the pip install string (and
use the same normalized display for title/causes/investigation/fixes) so the
produced "pip install" command is valid for multiple packages.
- Around line 296-306: The diagnostic helper _reproduce_traceback currently
forces AutoTokenizer.from_pretrained(..., trust_remote_code=True); change it to
default to trust_remote_code=False and only set True when an explicit opt-in
environment variable (e.g., AI_PERF_TRUST_REMOTE_CODE_DIAGNOSTIC) is present;
modify _reproduce_traceback to read that env var, pass its boolean value to
AutoTokenizer.from_pretrained, and update the call site that invokes
_reproduce_traceback to preserve behavior but not override the new opt-in logic.

---

Nitpick comments:
In `@tests/unit/common/test_tokenizer_display.py`:
- Around line 181-194: Add a regression test entry in
tests/unit/common/test_tokenizer_display.py alongside the existing tuples (the
ImportError / transformers-style cases) that uses an ImportError message
containing multiple comma-separated packages (e.g., "requires the following
packages that were not found in your environment: tiktoken, sentencepiece") and
assert the parser extracts both package names and a combined display string;
update the expected tuple to mirror the other cases' shape (e.g., error type
list ["TokenizerError","ImportError"], message substring "requires the following
packages ...: tiktoken, sentencepiece", display "Missing Package: tiktoken,
sentencepiece", and suggestion list including both package names and a pip
install suggestion like ["tiktoken","sentencepiece","pip install tiktoken
sentencepiece"]) so the parser behavior for comma-separated packages is
validated.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: acbb9394-6243-4b85-b7ad-ceb6c9198a76

📥 Commits

Reviewing files that changed from the base of the PR and between 8db6e96 and 70e40fa.

📒 Files selected for processing (5)
  • src/aiperf/common/models/error_models.py
  • src/aiperf/common/tokenizer_display.py
  • src/aiperf/controller/controller_utils.py
  • tests/unit/common/models/test_error_models.py
  • tests/unit/common/test_tokenizer_display.py

Comment on lines +183 to +194
if type_name in ("ModuleNotFoundError", "ImportError") and error_message:
for pattern in _MISSING_PACKAGE_PATTERNS:
if match := pattern.search(error_message):
packages = match.group(1).split(".")[0].strip().rstrip(".")
return TokenizerErrorInsight(
title=f"Missing Package: {packages}",
causes=[
f"The [cyan]{packages}[/cyan] package is not installed"
],
investigation=[f"Check: [cyan]pip show {packages}[/cyan]"],
fixes=[f"Install: [green]pip install {packages}[/green]"],
)

⚠️ Potential issue | 🟠 Major

Package extraction can emit invalid install commands for multi-package errors.

Line 186 only splits on ".". For ImportError messages listing multiple packages (e.g., tiktoken, sentencepiece), this can produce pip install tiktoken, sentencepiece, which is invalid syntax for pip.

🔧 Suggested parser fix
         if type_name in ("ModuleNotFoundError", "ImportError") and error_message:
             for pattern in _MISSING_PACKAGE_PATTERNS:
                 if match := pattern.search(error_message):
-                    packages = match.group(1).split(".")[0].strip().rstrip(".")
+                    raw = match.group(1).strip().rstrip(".")
+                    packages = [
+                        token.split(".")[0].strip()
+                        for token in re.split(r"[,\s]+", raw)
+                        if token.strip()
+                    ]
+                    package = packages[0]
+                    install_args = " ".join(packages)
                     return TokenizerErrorInsight(
-                        title=f"Missing Package: {packages}",
+                        title=f"Missing Package: {package}",
                         causes=[
-                            f"The [cyan]{packages}[/cyan] package is not installed"
+                            f"The [cyan]{package}[/cyan] package is not installed"
                         ],
-                        investigation=[f"Check: [cyan]pip show {packages}[/cyan]"],
-                        fixes=[f"Install: [green]pip install {packages}[/green]"],
+                        investigation=[f"Check: [cyan]pip show {package}[/cyan]"],
+                        fixes=[f"Install: [green]pip install {install_args}[/green]"],
                     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/tokenizer_display.py` around lines 183 - 194, The
package-extraction logic in the ModuleNotFoundError/ImportError branch
incorrectly splits on "." and can produce invalid pip commands for multi-package
messages; update the code that processes match.group(1) (inside the block that
returns TokenizerErrorInsight and uses _MISSING_PACKAGE_PATTERNS) to first split
the captured string on commas/whitespace/semicolons/“and”, strip surrounding
punctuation from each token, normalize each token to its top-level name (take
substring before any "."), deduplicate, then join tokens with spaces for the pip
install string (and use the same normalized display for
title/causes/investigation/fixes) so the produced "pip install" command is valid
for multiple packages.
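The normalization the prompt describes can be sketched as a standalone helper (extract_packages is a hypothetical name for illustration, not the repo's code):

```python
import re

def extract_packages(captured: str) -> list[str]:
    """Normalize a captured package list from an ImportError message:
    split on commas/whitespace/semicolons/'and', strip surrounding
    punctuation, keep only top-level module names, and deduplicate
    while preserving order."""
    tokens = re.split(r"[,\s;]+|\band\b", captured)
    packages: list[str] = []
    for token in tokens:
        name = token.strip(" .'\"`").split(".")[0]
        if name and name not in packages:
            packages.append(name)
    return packages

pkgs = extract_packages("tiktoken, sentencepiece")
print("pip install " + " ".join(pkgs))  # pip install tiktoken sentencepiece
```

Joining with spaces rather than echoing the captured string verbatim is what keeps the suggested pip command valid when the error message lists several packages.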

Comment on lines +296 to +306
def _reproduce_traceback(name: str) -> str | None:
"""Try loading the tokenizer to capture the full traceback for diagnostics."""
import traceback

try:
from transformers import AutoTokenizer

AutoTokenizer.from_pretrained(name, trust_remote_code=True)
except Exception:
return traceback.format_exc()
return None

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Read the function and surrounding context
cat -n src/aiperf/common/tokenizer_display.py | sed -n '290,310p'

Repository: ai-dynamo/aiperf

Length of output: 753


🏁 Script executed:

# Read the call site
cat -n src/aiperf/common/tokenizer_display.py | sed -n '330,340p'

Repository: ai-dynamo/aiperf

Length of output: 429


🏁 Script executed:

# Check the full function signature and implementation
cat -n src/aiperf/common/tokenizer_display.py | sed -n '296,310p'

Repository: ai-dynamo/aiperf

Length of output: 578


Do not force trust_remote_code=True in the diagnostic fallback path.

Line 303 executes AutoTokenizer.from_pretrained(..., trust_remote_code=True) during error display, which runs untrusted remote tokenizer code without user consent and introduces network/cache side effects solely for rendering diagnostics.

Make trust_remote_code optional and default to False, then require explicit opt-in via environment variable:

Suggested patch
+import os
 import re
 from dataclasses import dataclass
 from typing import TYPE_CHECKING
@@
-def _reproduce_traceback(name: str) -> str | None:
+def _reproduce_traceback(
+    name: str, *, trust_remote_code: bool = False
+) -> str | None:
     """Try loading the tokenizer to capture the full traceback for diagnostics."""
     import traceback
@@
-        AutoTokenizer.from_pretrained(name, trust_remote_code=True)
+        AutoTokenizer.from_pretrained(
+            name,
+            trust_remote_code=trust_remote_code,
+        )
@@
-    if is_fallback and name != "<unknown>":
-        tb = _reproduce_traceback(name)
+    if (
+        is_fallback
+        and name != "<unknown>"
+        and os.getenv("AIPERF_TOKENIZER_TRACEBACK_REPRO") == "1"
+    ):
+        tb = _reproduce_traceback(name, trust_remote_code=False)
         if tb:
             content += f"\n\n[bold]Traceback:[/bold]\n[dim]{tb.strip()}[/dim]"

Applies to lines 296–306 and call site 333–336.
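The opt-in pattern in the patch can be sketched with a small helper (env_flag is a hypothetical name; AIPERF_TOKENIZER_TRACEBACK_REPRO is the variable name from the suggested patch above):

```python
import os

_TRUTHY = {"1", "true", "yes", "on"}

def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean opt-in flag from the environment; unset -> default."""
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in _TRUTHY

# Diagnostics that execute remote code stay off unless explicitly enabled.
os.environ["AIPERF_TOKENIZER_TRACEBACK_REPRO"] = "1"
os.environ.pop("AIPERF_TRUST_REMOTE_CODE_DIAGNOSTIC", None)
print(env_flag("AIPERF_TOKENIZER_TRACEBACK_REPRO"))   # True
print(env_flag("AIPERF_TRUST_REMOTE_CODE_DIAGNOSTIC"))  # False (unset)
```

Defaulting to False means the dangerous path (running remote tokenizer code) requires a deliberate action from the user rather than happening as a side effect of rendering an error message.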

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 304-304: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/tokenizer_display.py` around lines 296 - 306, The
diagnostic helper _reproduce_traceback currently forces
AutoTokenizer.from_pretrained(..., trust_remote_code=True); change it to default
to trust_remote_code=False and only set True when an explicit opt-in environment
variable (e.g., AI_PERF_TRUST_REMOTE_CODE_DIAGNOSTIC) is present; modify
_reproduce_traceback to read that env var, pass its boolean value to
AutoTokenizer.from_pretrained, and update the call site that invokes
_reproduce_traceback to preserve behavior but not override the new opt-in logic.

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
demo_coding_content.py (1)

93-94: Private attribute access is fragile.

Accessing gen._tool_pool directly couples this demo to the internal implementation of CodingContentGenerator. If this attribute is renamed or removed, the demo will break with an AttributeError. Consider adding a public property, or accept this as an acknowledged fragility for a demo script.

Option: Add a defensive check
     print("Building generator (this tokenizes the pool, takes a few seconds)...")
     gen = make_generator()
-    pool_tokens = len(gen._tool_pool)
-    print(f"Tool pool: {pool_tokens:,} tokens\n")
+    pool_tokens = len(getattr(gen, "_tool_pool", []))
+    if pool_tokens:
+        print(f"Tool pool: {pool_tokens:,} tokens\n")
+    else:
+        print("Warning: Could not access tool pool size\n")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@demo_coding_content.py` around lines 93 - 94, The demo is directly accessing
the private attribute gen._tool_pool (fragile); update the demo to use a public
accessor on CodingContentGenerator (e.g., a property or method like
tool_pool_tokens or get_tool_pool_tokens) or, if you cannot change the class,
replace the direct access with a defensive getattr/hasattr check that falls back
gracefully (e.g., compute token count only if hasattr(gen, "tool_pool") or
getattr(gen, "_tool_pool", None) is not None) and log a clear message when the
information is unavailable; reference gen and CodingContentGenerator and avoid
relying on _tool_pool internals.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@demo_coding_content.py`:
- Around line 93-94: The demo is directly accessing the private attribute
gen._tool_pool (fragile); update the demo to use a public accessor on
CodingContentGenerator (e.g., a property or method like tool_pool_tokens or
get_tool_pool_tokens) or, if you cannot change the class, replace the direct
access with a defensive getattr/hasattr check that falls back gracefully (e.g.,
compute token count only if hasattr(gen, "tool_pool") or getattr(gen,
"_tool_pool", None) is not None) and log a clear message when the information is
unavailable; reference gen and CodingContentGenerator and avoid relying on
_tool_pool internals.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 5a993173-ebf0-4cba-a8bd-5d9d6b2dbe45

📥 Commits

Reviewing files that changed from the base of the PR and between 70e40fa and 385488b.

📒 Files selected for processing (5)
  • demo_coding_content.py
  • docs/cli_options.md
  • pyproject.toml
  • src/aiperf/common/tokenizer.py
  • tests/unit/common/test_tokenizer_kwarg_overrides.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/unit/common/test_tokenizer_kwarg_overrides.py
  • src/aiperf/common/tokenizer.py

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
src/aiperf/workers/worker.py (1)

700-710: Broad exception catch is acceptable here given the safe fallback behavior.

The static analysis flags except Exception (BLE001), but in this context it's defensible: tokenizer operations can fail for numerous reasons (network issues, model not found, tokenization errors), and the fallback to original text is a safe no-op that doesn't corrupt data or lose information. The debug log captures the failure for observability.

If you want to be more explicit, you could catch the known exceptions from Tokenizer:

🔧 Optional: More specific exception handling
+from aiperf.common.tokenizer import Tokenizer, TokenizerError, AmbiguousTokenizerNameError, NotInitializedError
...
         try:
             tokenizer = await self._get_history_tokenizer(model_name)
             token_ids = tokenizer.encode(response_text)
-        except Exception as exc:
+        except (TokenizerError, AmbiguousTokenizerNameError, NotInitializedError) as exc:
             err_repr = repr(exc)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/workers/worker.py` around lines 700 - 710, The broad except in the
_get_history_tokenizer / tokenizer.encode block is flagged by BLE001; leave the
safe fallback behavior but either narrow the catch to tokenizer-specific
exceptions if available (e.g., TokenizerError) or explicitly silence the linter
and document the rationale: add a comment explaining the defensive catch and
append a "# noqa: BLE001" to the "except Exception as exc:" line so static
analysis is satisfied while keeping the debug log and return response_text
fallback intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/aiperf/workers/worker.py`:
- Around line 700-710: The broad except in the _get_history_tokenizer /
tokenizer.encode block is flagged by BLE001; leave the safe fallback behavior
but either narrow the catch to tokenizer-specific exceptions if available (e.g.,
TokenizerError) or explicitly silence the linter and document the rationale: add
a comment explaining the defensive catch and append a "# noqa: BLE001" to the
"except Exception as exc:" line so static analysis is satisfied while keeping
the debug log and return response_text fallback intact.
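
The `# noqa` alternative from the prompt can be sketched like this. Assumptions: `tokenizer` is any object with an `encode` method, and falling back to the original text is acceptable behavior, as the review states; this is an illustration of the pattern, not the actual `worker.py` code.

```python
def safe_encode(tokenizer, response_text: str):
    """Encode the response text, falling back to the raw text on failure."""
    try:
        return tokenizer.encode(response_text)
    # Defensive catch: tokenizer backends can fail for many reasons
    # (network, model lookup, tokenization errors). The fallback is a
    # safe no-op, so we deliberately keep the broad except and silence
    # the linter rather than enumerate every backend exception type.
    except Exception:  # noqa: BLE001
        return response_text
```

Keeping the debug log (omitted here for brevity) preserves observability of the failure path.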

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: c26b5690-e3d9-4f96-94ec-a8779aa5879a

📥 Commits

Reviewing files that changed from the base of the PR and between ac3d916 and b1dd72f.

📒 Files selected for processing (2)
  • src/aiperf/workers/worker.py
  • tests/unit/workers/test_worker.py

…n benchmarks

Add --pre-generate-responses flag that builds Anthropic-format tool-use
conversations (Read, Edit, Bash, Grep, Glob, Write) as assistant responses
in multi-turn datasets. Responses are stored as raw_messages on subsequent
turns, providing deterministic multi-turn context with realistic coding
content. Works with mooncake traces (hash-block prompts + token-count
responses) and synthetic datasets.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
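
The "responses stored as raw_messages on subsequent turns" mechanism might look roughly like this. The function name and dict shape are hypothetical; only the idea (each later turn carries the accumulated prior user/assistant messages) comes from the commit message.

```python
def build_multi_turn(prompts: list[str], responses: list[dict]) -> list[dict]:
    """Attach pre-generated assistant responses as raw_messages on later turns.

    Each turn after the first carries the full prior conversation, so the
    multi-turn context is deterministic regardless of live model output.
    """
    turns: list[dict] = []
    history: list[dict] = []
    for prompt, response in zip(prompts, responses):
        # Snapshot the history accumulated so far onto this turn.
        turns.append({"text": prompt, "raw_messages": list(history)})
        history.append({"role": "user", "content": prompt})
        history.append(response)
    return turns
```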
…ponses

Switch generate_response() default from Anthropic content-blocks to OpenAI
tool_calls format since most benchmarking targets are OpenAI-compatible.
Anthropic format available via use_content_blocks=True for the in-progress
Anthropic endpoint. Both formats tested.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
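
The two output shapes the commit describes differ roughly as sketched below. Field values and IDs are placeholders, and `make_tool_response` is an illustrative helper, not the real `generate_response()` signature; only the OpenAI-default / `use_content_blocks=True` split comes from the commit message.

```python
import json


def make_tool_response(use_content_blocks: bool = False) -> dict:
    """Return an assistant message with one tool invocation in either format."""
    if use_content_blocks:
        # Anthropic-style: the tool use is a typed content block.
        return {
            "role": "assistant",
            "content": [
                {
                    "type": "tool_use",
                    "id": "toolu_01PLACEHOLDER",  # placeholder ID
                    "name": "Bash",
                    "input": {"command": "ls"},
                }
            ],
        }
    # OpenAI-style (default): tool calls live in a dedicated field,
    # with JSON-encoded string arguments.
    return {
        "role": "assistant",
        "content": None,
        "tool_calls": [
            {
                "id": "call_01PLACEHOLDER",  # placeholder ID
                "type": "function",
                "function": {
                    "name": "Bash",
                    "arguments": json.dumps({"command": "ls"}),
                },
            }
        ],
    }
```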
… data

Style weights (85% silent, 10% text+tool, 5% parallel), tool distribution
(85% Bash, 5% Read, 3% Task, 2% Write/Edit, 1% Grep/Glob/TodoWrite),
realistic tool IDs (toolu_01 + 20 chars), and Bash description field all
derived from 1,482 captured API calls. Adds Task, TodoWrite, and Glob tools.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
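
Sampling from the stated distributions is a standard weighted draw; a minimal sketch, assuming the "2% Write/Edit" figure means 2% each (which makes the tool weights sum to 100%):

```python
import random

# Weights from the commit message, derived from 1,482 captured API calls.
STYLE_WEIGHTS = {"silent": 0.85, "text_plus_tool": 0.10, "parallel": 0.05}
TOOL_WEIGHTS = {
    "Bash": 0.85, "Read": 0.05, "Task": 0.03, "Write": 0.02,
    "Edit": 0.02, "Grep": 0.01, "Glob": 0.01, "TodoWrite": 0.01,
}


def sample_weighted(weights: dict[str, float], rng: random.Random) -> str:
    """Draw one key according to its weight."""
    keys, values = zip(*weights.items())
    return rng.choices(keys, weights=values, k=1)[0]
```

Seeding the `random.Random` instance per entry keeps the draws reproducible across runs.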