
feat(dataset): add Conflux trace loader with subagent orchestration#767

Open
ajcasagrande wants to merge 7 commits into main from ajc/conflux-data-loader

Conversation

@ajcasagrande
Contributor

@ajcasagrande ajcasagrande commented Mar 17, 2026

Summary

  • Add end-to-end support for replaying Conflux proxy captures that contain parent + subagent conversation hierarchies, preserving the original spawn/join timing relationships

  • Introduce a message normalizer that converts Anthropic and OpenAI message formats into a canonical internal representation, enabling format-agnostic trace replay

  • Add a composable SubagentOrchestrator that coordinates multi-turn subagent spawns and joins across all three timing strategies (fixed-schedule, request-rate, user-centric-rate), with join-gate logic that holds parent turns until blocking children complete

    Key components

  • ConfluxLoader — parses Conflux JSONL traces, reconstructs conversation trees with agent-depth metadata, and emits Conversation objects with
    spawn/join annotations

  • MessageNormalizer — bidirectional Anthropic/OpenAI message normalization (tool calls, tool results, system/developer messages, multi-part content blocks)

  • SubagentOrchestrator — manages child conversation lifecycle, prerequisite tracking via TurnPrerequisite/PrerequisiteKind, and dispatch gating so replay fidelity is maintained regardless of server speed

  • SubagentMixin — shared timing-strategy integration for spawn/join awareness

  • Credit system, endpoint, session manager, and worker extensions — thread agent depth, extra params, and ground truth through the full request pipeline
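The join-gate behavior described above can be pictured with a small standalone sketch. The class and method names below are invented for illustration; the PR's actual SubagentOrchestrator tracks prerequisites via TurnPrerequisite/PrerequisiteKind and is considerably richer than this:

```python
from dataclasses import dataclass, field


@dataclass
class JoinGate:
    """Holds a parent turn until every blocking child has completed.

    Illustrative only: models just the gate idea, not the PR's API.
    """

    pending: set[str] = field(default_factory=set)

    def block_on(self, child_id: str) -> None:
        """Register a blocking child spawned during this parent turn."""
        self.pending.add(child_id)

    def child_completed(self, child_id: str) -> bool:
        """Mark a child done; returns True when no blocking children remain."""
        self.pending.discard(child_id)
        return not self.pending


gate = JoinGate()
gate.block_on("subagent-a")
gate.block_on("subagent-b")
assert not gate.child_completed("subagent-a")  # still waiting on subagent-b
assert gate.child_completed("subagent-b")      # gate opens; parent turn may dispatch
```

Because the gate only opens when the pending set is empty, replay fidelity is preserved regardless of how quickly individual children finish.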

Summary by CodeRabbit

New Features

  • Added support for Conflux Traces to replay AI coding assistant sessions with automatic format detection and explicit configuration.
  • Added --fixed-schedule-speedup parameter to control fixed schedule replay timing (e.g., 2.0× faster, 0.5× slower).
  • Added --conflux-include-utility-calls flag to include unattributed utility calls when loading Conflux traces.
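The speedup factor presumably rescales the inter-request gaps recorded in the trace. A minimal sketch of that arithmetic (not the PR's code; the function name is hypothetical):

```python
def scale_delay(original_delay_s: float, speedup: float) -> float:
    """Divide trace gaps by the speedup: 2.0 halves waits, 0.5 doubles them."""
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    return original_delay_s / speedup


assert scale_delay(10.0, 2.0) == 5.0   # 2.0x: replay twice as fast
assert scale_delay(10.0, 0.5) == 20.0  # 0.5x: replay at half speed
```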

Documentation

  • New tutorial: "Replaying AI Coding Sessions with Conflux Traces" covering concepts, configuration, and output reports.
  • Updated CLI documentation with auto-detection details for Conflux JSON/JSONL formats.

@github-actions github-actions bot added the feat label Mar 17, 2026
@github-actions

github-actions bot commented Mar 17, 2026

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@aee1250dbf9f800bff1f8e50b1c86b716a334702

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@aee1250dbf9f800bff1f8e50b1c86b716a334702

Last updated for commit: aee1250

@coderabbitai

coderabbitai bot commented Mar 17, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

This pull request adds comprehensive support for replaying Conflux AI coding session traces with subagent orchestration, message normalization across providers, and timing enhancements. Key additions include a ConfluxLoader for parsing and converting Conflux traces to conversations with parent-child relationships, a SubagentOrchestrator for managing spawned agent lifecycles and gated turns, a message normalizer for bidirectional Anthropic/OpenAI translation, and fixed-schedule timing with configurable speedup. Configuration and data models are extended throughout to support these features.

Changes

  • Documentation & Configuration (README.md, docs/cli-options.md, docs/tutorials/conflux-trace.md, fern/versions/dev.yml, src/aiperf/cli_commands/analyze_trace.py, src/aiperf/dataset/synthesis/cli.py): Added Conflux trace documentation, CLI options for --fixed-schedule-speedup and --conflux-include-utility-calls, a tutorial on replaying AI coding sessions, and an updated analyze-trace command description and reporting.

  • Core Configuration Models (src/aiperf/common/config/config_defaults.py, src/aiperf/common/config/input_config.py, src/aiperf/common/config/user_config.py, src/aiperf/timing/config.py): Added a FIXED_SCHEDULE_SPEEDUP default field, validation for conflicting speedup modes, auto-detection logic for fixed-schedule datasets, and timing-configuration propagation of speedup values.

  • Enums & Core Data Models (src/aiperf/common/enums/enums.py, src/aiperf/common/enums/__init__.py, src/aiperf/common/models/dataset_models.py, src/aiperf/common/models/__init__.py): Added a PrerequisiteKind enum for spawn-join gating and new model classes (TurnPrerequisite, TurnGroundTruth, SubagentSpawnInfo, ConversationOrigin) with extensive fields supporting subagent tracking and timing metadata.

  • Credit & Timing Infrastructure (src/aiperf/credit/structs.py, src/aiperf/credit/issuer.py, src/aiperf/credit/callback_handler.py, src/aiperf/timing/phase/credit_counter.py, src/aiperf/timing/phase/progress_tracker.py): Added agent-depth and parent-correlation tracking to Credit/TurnToSend, refactored session-slot logic to exclude child sessions, updated credit callbacks to invoke on_failed_credit and cleanup hooks, and propagated agent depth through progress tracking.

  • Conflux Dataset Loading (src/aiperf/dataset/loader/models.py, src/aiperf/dataset/loader/conflux.py, src/aiperf/dataset/loader/__init__.py, src/aiperf/dataset/loader/conflux_analyzer.py): Implemented the full ConfluxLoader with record parsing, spawn/join detection, file-based separation, and origin tracking; added ConfluxRecord, ConfluxTokens, and ConfluxHyperparameters data models; created ConfluxAnalysisStats for trace analysis without the full pipeline.

  • Message Normalization (src/aiperf/dataset/message_normalizer.py): Added comprehensive message/tool normalization and emitter functions for bidirectional Anthropic↔OpenAI translation with support for images, tools, thinking blocks, and passthrough content.

  • Timing Strategies & Subagent Orchestration (src/aiperf/timing/strategies/core.py, src/aiperf/timing/strategies/subagent_mixin.py, src/aiperf/timing/strategies/fixed_schedule.py, src/aiperf/timing/strategies/request_rate.py, src/aiperf/timing/strategies/user_centric_rate.py, src/aiperf/timing/subagent_orchestrator.py): Added lifecycle hooks (on_failed_credit, cleanup) to the strategy protocol, introduced SubagentMixin for consistent orchestration delegation, implemented SubagentOrchestrator with prerequisite-based gating, spawn/join tracking, and gate-dispatch management, and integrated subagent support into timing strategies with speedup scaling.

  • Conversation Source & Phase Runner (src/aiperf/timing/conversation_source.py, src/aiperf/timing/phase/runner.py, src/aiperf/endpoints/base_endpoint.py, src/aiperf/workers/inference_client.py): Extended ConversationSource with child-session methods, added turn-parameter merging to the endpoint pipeline, refactored PhaseRunner for subagent injection and cleanup, and added a concurrency-ramper helper.

  • Plugin System Updates (src/aiperf/plugin/enums.py, src/aiperf/plugin/plugins.py, src/aiperf/plugin/plugins.yaml, src/aiperf/plugin/schema/plugins.schema.json, src/aiperf/plugin/schema/schemas.py): Registered the Conflux loader plugin with a timing-support metadata flag, added a supports_timing property to the loader metadata schema, and introduced a supports_timing() API function.

  • Dataset & Inference Pipeline (src/aiperf/dataset/composer/custom.py, src/aiperf/dataset/dataset_manager.py, src/aiperf/dataset/synthesis/models.py, src/aiperf/dataset/synthesis/prefix_analyzer.py, src/aiperf/endpoints/openai_chat.py): Fixed the type guard for custom dataset detection, added timing-data tracking in DatasetMetadata, introduced MetricStats.from_values() for statistical computation, refactored the prefix analyzer to use the new stats factory, and updated OpenAI endpoint message handling for raw_messages propagation.

  • Component & Integration Tests (tests/component_integration/dataset/test_subagent_orchestrator.py, tests/component_integration/dataset/test_conflux_timing_modes.py, tests/component_integration/sessions/test_sticky_routing.py, tests/component_integration/timing/*): Added end-to-end SubagentOrchestrator lifecycle tests and Conflux timing-mode validation across strategies, and adjusted workload parameters in existing timing tests.

  • Unit Tests for New Features (tests/unit/common/config/test_input_config.py, tests/unit/dataset/loader/test_conflux.py, tests/unit/dataset/test_message_normalizer.py, tests/unit/timing/test_subagent_manager.py, tests/unit/timing/test_subagent_orchestrator_gating.py, tests/unit/timing/strategies/test_fixed_schedule.py, tests/unit/credit/test_child_session_bypass.py): Comprehensive coverage for speedup validation, Conflux record ingestion, message-normalization round-trips, SubagentOrchestrator prerequisite gating, child-session bypass semantics, and fixed-schedule timing with speedup.

  • Test Infrastructure & Mocking (tests/unit/timing/conftest.py, tests/unit/timing/phase/test_runner.py, tests/unit/timing/strategies/test_subagent_mixin.py, tests/unit/credit/test_callback_handler.py, tests/unit/endpoints/test_openai_chat_completions.py, tests/unit/workers/test_session_manager.py, tests/unit/common/config/test_user_config_mooncake_trace.py, tests/component_integration/dataset/test_dataset_sampling.py, tests/unit/timing/test_subagent_manager_adversary.py): Added fixtures for fixed-schedule speedup threading, strategy cleanup/failure hooks, and endpoint factory helpers, plus extensive adversarial tests for orchestration edge cases; renamed a timing-detection helper and refactored test configurations.
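The message-normalization cohort translates between Anthropic and OpenAI message shapes. As an illustration of the general transform in one direction (the function below is a sketch written for this summary, not the PR's message_normalizer code), an Anthropic assistant message with tool_use blocks maps onto an OpenAI message with a tool_calls list roughly like this:

```python
import json


def anthropic_to_openai(msg: dict) -> dict:
    """Sketch: convert one Anthropic assistant message to OpenAI shape.

    Anthropic carries tool invocations as tool_use content blocks with a
    dict `input`; OpenAI carries them as tool_calls entries whose
    `arguments` field is a JSON string.
    """
    text_parts: list[str] = []
    tool_calls: list[dict] = []
    for block in msg.get("content", []):
        if block.get("type") == "text":
            text_parts.append(block["text"])
        elif block.get("type") == "tool_use":
            tool_calls.append({
                "id": block["id"],
                "type": "function",
                "function": {
                    "name": block["name"],
                    # OpenAI expects arguments as a JSON string, not a dict.
                    "arguments": json.dumps(block.get("input", {})),
                },
            })
    out: dict = {"role": "assistant", "content": "\n".join(text_parts) or None}
    if tool_calls:
        out["tool_calls"] = tool_calls
    return out
```

The real normalizer additionally handles tool results, system/developer messages, images, thinking blocks, and the reverse direction.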

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

The changes introduce interconnected systems (orchestration, timing strategies, credit handling) with substantial logic density and heterogeneous modifications across config, models, loaders, and infrastructure. While comprehensive test coverage mitigates risk, the complexity of subagent gating prerequisites, spawn/join semantics, and cross-cutting agent-depth propagation requires careful review across multiple functional areas.

Poem

🐰 A rabbit hops through tangled spawns and joins,
Where agents dance in parent-child rejoins,
With messages normalized and credits aligned,
And gated turns that leave no turns behind!
Now Conflux traces leap—swift, free, and bright,
As orchestrated subagents take flight! 🚀


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/aiperf/common/config/input_config.py (1)

303-313: ⚠️ Potential issue | 🟡 Minor

Refresh the Conflux CLI help text.

custom_dataset_type still documents the pre-Conflux option set, and conflux_include_utility_calls says the flag only works with explicit --custom-dataset-type conflux even though the Conflux loader can also be selected via auto-detection. Because these Field descriptions feed the generated CLI docs, the stale wording will leak into both --help output and docs/cli-options.md.

Suggested wording update
-            "Options: `single_turn` (JSONL with single exchanges), `multi_turn` (JSONL with conversation history), "
-            "`mooncake_trace`/`bailian_trace` (timestamped trace files), `random_pool` (directory of reusable prompts; "
+            "Options: `single_turn` (JSONL with single exchanges), `multi_turn` (JSONL with conversation history), "
+            "`mooncake_trace`/`bailian_trace`/`conflux` (timestamped trace files), `random_pool` (directory of reusable prompts; "
@@
-            "the main session timeline. Only applies to --custom-dataset-type conflux.",
+            "the main session timeline. Applies whenever the Conflux loader handles the input, "
+            "including the auto-detect path.",
Based on learnings, `docs/cli-options.md` is auto-generated from Pydantic Field definitions.

Also applies to: 371-383

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/config/input_config.py` around lines 303 - 313, Update the
Pydantic Field descriptions to refresh Conflux-related CLI help: revise the
custom_dataset_type Field description (symbol: custom_dataset_type) to remove
pre-Conflux phrasing and include the Conflux option and behaviors (including
that conflux can be selected via auto-detection, not only explicit
`--custom-dataset-type conflux`), and update the conflux_include_utility_calls
Field description (symbol: conflux_include_utility_calls) to state that the flag
applies when the Conflux loader is used either by explicit selection or by
auto-detection; ensure both descriptions are concise, accurate for generated
CLI/docs and keep mutual-exclusion / `--input-file` and sampling behavior notes
intact.
🧹 Nitpick comments (7)
src/aiperf/credit/structs.py (1)

49-51: Consider documenting new fields in the class docstring.

The Credit class docstring (lines 27-38) lists all attributes but the new agent_depth, subagent_type, and parent_correlation_id fields are not documented there. Adding documentation would maintain consistency with the existing attribute descriptions.

📝 Suggested docstring additions

Add to the Credit class docstring's Attributes section:

        agent_depth: Nesting depth of this agent in the subagent hierarchy (0 = top-level).
        subagent_type: Type identifier for subagent spawns (e.g., "child", "tool").
        parent_correlation_id: Correlation ID of the parent conversation for subagent tracking.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/credit/structs.py` around lines 49 - 51, Update the Credit class
docstring to document the new fields agent_depth, subagent_type, and
parent_correlation_id: in the Attributes section of class Credit add a brief
description for agent_depth (nesting depth of this agent, 0 = top-level),
subagent_type (type identifier for subagent spawns, e.g., "child" or "tool"),
and parent_correlation_id (correlation ID of the parent conversation for
subagent tracking) so the docstring matches the class attributes.
src/aiperf/common/enums/__init__.py (1)

80-80: Consider maintaining alphabetical order in __all__.

TurnThreadingMode is inserted between ConnectionReuseStrategy and CreditPhase, breaking the alphabetical order of the export list. Consider moving it to its correct alphabetical position (after TemperatureMetricUnitInfo and before VideoAudioCodec).

Suggested placement
     "TemperatureMetricUnit",
     "TemperatureMetricUnitInfo",
+    "TurnThreadingMode",
     "VideoAudioCodec",
     "VideoFormat",

And remove from line 80.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/enums/__init__.py` at line 80, The __all__ export list is
out of alphabetical order because "TurnThreadingMode" is currently placed
between "ConnectionReuseStrategy" and "CreditPhase"; move the string
"TurnThreadingMode" out of its current position and insert it into the correct
alphabetical spot in the __all__ list — after "TemperatureMetricUnitInfo" and
before "VideoAudioCodec" — ensuring the export ordering remains strictly
alphabetical.
tests/unit/dataset/test_message_normalizer.py (1)

49-54: Drop the Any annotation from variadic kwargs in these helpers.

These kwargs bags are just passthrough test data, so **extra: Any doesn't add a useful contract and goes against the repo guidance for variadic keyword arguments.

Suggested cleanup
 def _tool_use_block(
     tool_id: str = "toolu_01",
     name: str = "Read",
     input_dict: dict[str, Any] | None = None,
-    **extra: Any,
+    **extra,
 ) -> dict[str, Any]:
@@
 def _tool_result_block(
     tool_use_id: str = "toolu_01",
     content: str | list[dict[str, Any]] = "file contents here",
-    **extra: Any,
+    **extra,
 ) -> dict[str, Any]:
@@
-def _text_block(text: str, **extra: Any) -> dict[str, Any]:
+def _text_block(text: str, **extra) -> dict[str, Any]:
Based on learnings, in Python projects avoid adding type annotations to `**kwargs` like `**kwargs: Any`.

Also applies to: 78-82, 92-95

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/test_message_normalizer.py` around lines 49 - 54, The
helper functions (e.g., _tool_use_block) currently annotate the variadic kwargs
as **extra: Any which is unnecessary and conflicts with project guidance; remove
the explicit type annotation so the signature uses plain **extra, update all
similar helpers in this test file (the other instances noted around the file) to
drop the : Any on their **kwargs, and run the tests to ensure no behavioral
change.
tests/unit/timing/test_subagent_manager.py (1)

230-230: Drop the unused child_ids binding.

child_ids is never read in this test, so the extra unpack just leaves a lingering Ruff warning.

Suggested cleanup
-        orch, _, _, scheduler, child_ids = _make_orchestrator(is_background=True)
+        orch, _, _, scheduler, _child_ids = _make_orchestrator(is_background=True)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager.py` at line 230, The test unpacks an
unused value from the helper _make_orchestrator, binding child_ids which is
never used; update the unpack to drop that binding (e.g., assign to _ or remove
from the tuple unpack) so child_ids is not created and the Ruff warning goes
away—locate the call to _make_orchestrator in
tests/unit/timing/test_subagent_manager.py and change the assignment "orch, _,
_, scheduler, child_ids = _make_orchestrator(...)" accordingly.
docs/tutorials/conflux-trace.md (1)

34-45: Replace the ASCII workflow sketch with Mermaid.

This block is explanatory structure, not literal command output, so it should use a Mermaid diagram to match the docs convention.

Suggested rewrite
 A coding session typically has a tree-shaped structure:
 
-```
-Parent Agent (main conversation)
-  Turn 1: User asks "refactor the auth module"
-  Turn 2: Agent reads files, thinks about approach
-    +-- Subagent A: searches codebase for auth references
-    +-- Subagent B: runs existing tests to check current behavior
-  Turn 3: Agent writes new code
-    +-- Subagent C: runs updated tests
-  Turn 4: Agent summarizes changes
-```
+```mermaid
+graph TD
+    P[Parent Agent]
+    T1[Turn 1: User asks "refactor the auth module"]
+    T2[Turn 2: Agent reads files and plans approach]
+    T3[Turn 3: Agent writes new code]
+    T4[Turn 4: Agent summarizes changes]
+    SA[Subagent A: search auth references]
+    SB[Subagent B: run existing tests]
+    SC[Subagent C: run updated tests]
+
+    P --> T1 --> T2 --> T3 --> T4
+    T2 --> SA
+    T2 --> SB
+    T3 --> SC
+```
As per coding guidelines, `**/*.md`: Use mermaid diagrams instead of ASCII art in markdown files.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/tutorials/conflux-trace.md` around lines 34 - 45, Replace the ASCII
workflow sketch with a Mermaid flowchart block: wrap a mermaid diagram in triple
backticks and create nodes for Parent Agent (P), Turn 1 (T1), Turn 2 (T2), Turn
3 (T3), Turn 4 (T4) and subagents SA, SB, SC, then connect them P --> T1 --> T2
--> T3 --> T4, T2 --> SA, T2 --> SB, and T3 --> SC so the explanatory structure
uses a mermaid graph (e.g., graph TD) instead of plain ASCII art.
tests/unit/timing/test_subagent_manager_adversary.py (2)

206-208: Prefix unused unpacked variables with underscore.

Static analysis flags child_ids as unused in these tests. Use _ prefix to indicate intentionally unused.

Suggested fix
     def test_cleanup_clears_all_state(self):
-        orch, _, _, _, child_ids = _make_orchestrator()
+        orch, _, _, _, _child_ids = _make_orchestrator()
         _spawn_children(orch)

...

     def test_double_cleanup_is_safe(self):
-        orch, _, _, _, child_ids = _make_orchestrator()
+        orch, _, _, _, _child_ids = _make_orchestrator()
         _spawn_children(orch)

Also applies to: 216-218

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager_adversary.py` around lines 206 - 208,
The test declares unused variables (e.g., child_ids) from _make_orchestrator
which static analysis flags; update the unpacking in
test_cleanup_clears_all_state (and the similar case at lines 216-218) to prefix
unused names with an underscore (e.g., _child_ids or replace child_ids with _)
so the intent is explicit; locate the unpacking calls to _make_orchestrator in
test_cleanup_clears_all_state and the other test and rename the unused
binding(s) accordingly, leaving used bindings and calls to _spawn_children
untouched.

690-695: Prefix unused unpacked variables with underscore.

Static analysis flags child_ids and child_corr_ids as unused.

Suggested fix
     def test_background_child_error_does_not_affect_join(self):
         """A background child that errors should not touch pending joins
         or child_to_parent mappings."""
-        orch, _, _, _, child_ids = _make_orchestrator()
-        _, child_corr_ids = _spawn_children(orch)
+        orch, _, _, _, _child_ids = _make_orchestrator()
+        _, _child_corr_ids = _spawn_children(orch)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager_adversary.py` around lines 690 - 695,
The test function test_background_child_error_does_not_affect_join currently
unpacks values into child_ids and child_corr_ids but doesn't use them; update
the unpacking to prefix those unused variables with an underscore so they are
treated as intentionally unused (e.g., change child_ids -> _child_ids and
child_corr_ids -> _child_corr_ids in the calls to _make_orchestrator and
_spawn_children) to satisfy static analysis and signal intent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/dataset/message_normalizer.py`:
- Around line 309-349: The current logic batches all tool_results and appends
them after the synthesized user message, which reorders mixed Anthropic turns;
change the loop in the function that iterates over content so you build result
incrementally: keep accumulating text_parts/content_parts for the current user
chunk and whenever you encounter a "tool_result" block (the code that currently
creates tool_msg using _flatten_text_content and tool_use_id), first flush any
accumulated user chunk into result (using the existing text_parts/content_parts
logic), then append the tool_msg immediately to result; continue processing so
multiple interleaved tool_result and text/image blocks preserve original order;
keep using the same variables (text_parts, content_parts, tool_results no longer
batched) and preserve the final fallback return (result or [msg]).
- Around line 156-168: The loop that auto-detects provider from message content
misses plain Anthropic assistant text blocks; update the detection in the
messages -> content list branch of normalize_messages (the for msg in messages
loop) to treat text-only blocks as Anthropic: when block.get("type") == "text"
(or add "text" to _ANTHROPIC_MESSAGE_BLOCK_TYPES) or if the parent msg indicates
an Anthropic assistant role, return "anthropic" so text-only Anthropic captures
aren't misclassified as "openai".
- Around line 484-489: The loop in message_normalizer.py overwrites the variable
system each time a "system" or "developer" message is encountered, dropping
earlier instructions; change the handling inside that for messages loop so that
when role in ("system","developer") you merge the new msg.get("content") into
the existing system value instead of replacing it (e.g., if system is empty set
it to the content, otherwise append with a separator like "\n\n" or space);
reference the variable name system and the loop over messages to locate the
change and ensure you coerce the msg.get("content") to a string before
concatenation to avoid type issues.
- Around line 229-272: The code flattens all text blocks into text_parts and
reconstructs them later, which loses empty-block positions, double-newline
boundaries, and per-block citation associations; change the logic to preserve
per-block data: create and populate a text_blocks list (including empty strings)
alongside citations_by_block (or a parallel list mapping citations to each text
block) when iterating blocks (instead of skipping empties in text_parts), update
block_order entries to reference indices into text_blocks, and when setting out
include both the flattened combined_text for simple cases and the preserved
structure (e.g., out["_text_blocks"] and out["_citations"] as per-block lists)
whenever you also set out["_block_order"] so the emitter can re-emit exact
original blocks; ensure references to text_parts, citations, block_order,
tool_calls, thinking_blocks and the final out["_block_order"]/out["_citations"]
use the new per-block lists so no index is -1 and block-level metadata is
retained.
- Around line 608-612: The bare except in the block that tries to parse a string
with orjson.loads (when isinstance(arguments, str)) is too broad; replace the
broad except Exception with a specific except for orjson.JSONDecodeError (or the
appropriate ValueError subclass) so only JSON decode failures fall back to
assigning input_val = arguments while other exceptions propagate; update the
try/except around orjson.loads(arguments) that sets input_val accordingly.
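The system-message finding above amounts to concatenating rather than overwriting. A sketch of the suggested pattern (variable names follow the review comment; this is not the file's actual code):

```python
def collect_system_text(messages: list[dict]) -> str:
    """Merge all system/developer messages instead of keeping only the last."""
    system = ""
    for msg in messages:
        if msg.get("role") in ("system", "developer"):
            content = str(msg.get("content", ""))
            # Append with a separator rather than overwrite, so earlier
            # instructions survive when multiple system messages appear.
            system = content if not system else f"{system}\n\n{content}"
    return system


msgs = [
    {"role": "system", "content": "Be concise."},
    {"role": "user", "content": "hi"},
    {"role": "developer", "content": "Prefer Python."},
]
assert collect_system_text(msgs) == "Be concise.\n\nPrefer Python."
```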

---

Outside diff comments:
In `@src/aiperf/common/config/input_config.py`:
- Around line 303-313: Update the Pydantic Field descriptions to refresh
Conflux-related CLI help: revise the custom_dataset_type Field description
(symbol: custom_dataset_type) to remove pre-Conflux phrasing and include the
Conflux option and behaviors (including that conflux can be selected via
auto-detection, not only explicit `--custom-dataset-type conflux`), and update
the conflux_include_utility_calls Field description (symbol:
conflux_include_utility_calls) to state that the flag applies when the Conflux
loader is used either by explicit selection or by auto-detection; ensure both
descriptions are concise, accurate for generated CLI/docs and keep
mutual-exclusion / `--input-file` and sampling behavior notes intact.

---

Nitpick comments:
In `@docs/tutorials/conflux-trace.md`:
- Around line 34-45: Replace the ASCII workflow sketch with a Mermaid flowchart
block: wrap a mermaid diagram in triple backticks and create nodes for Parent
Agent (P), Turn 1 (T1), Turn 2 (T2), Turn 3 (T3), Turn 4 (T4) and subagents SA,
SB, SC, then connect them P --> T1 --> T2 --> T3 --> T4, T2 --> SA, T2 --> SB,
and T3 --> SC so the explanatory structure uses a mermaid graph (e.g., graph TD)
instead of plain ASCII art.

In `@src/aiperf/common/enums/__init__.py`:
- Line 80: The __all__ export list is out of alphabetical order because
"TurnThreadingMode" is currently placed between "ConnectionReuseStrategy" and
"CreditPhase"; move the string "TurnThreadingMode" out of its current position
and insert it into the correct alphabetical spot in the __all__ list — after
"TemperatureMetricUnitInfo" and before "VideoAudioCodec" — ensuring the export
ordering remains strictly alphabetical.

In `@src/aiperf/credit/structs.py`:
- Around line 49-51: Update the Credit class docstring to document the new
fields agent_depth, subagent_type, and parent_correlation_id: in the Attributes
section of class Credit add a brief description for agent_depth (nesting depth
of this agent, 0 = top-level), subagent_type (type identifier for subagent
spawns, e.g., "child" or "tool"), and parent_correlation_id (correlation ID of
the parent conversation for subagent tracking) so the docstring matches the
class attributes.

In `@tests/unit/dataset/test_message_normalizer.py`:
- Around line 49-54: The helper functions (e.g., _tool_use_block) currently
annotate the variadic kwargs as **extra: Any which is unnecessary and conflicts
with project guidance; remove the explicit type annotation so the signature uses
plain **extra, update all similar helpers in this test file (the other instances
noted around the file) to drop the : Any on their **kwargs, and run the tests to
ensure no behavioral change.

In `@tests/unit/timing/test_subagent_manager_adversary.py`:
- Around line 206-208: The test declares unused variables (e.g., child_ids) from
_make_orchestrator which static analysis flags; update the unpacking in
test_cleanup_clears_all_state (and the similar case at lines 216-218) to prefix
unused names with an underscore (e.g., _child_ids or replace child_ids with _)
so the intent is explicit; locate the unpacking calls to _make_orchestrator in
test_cleanup_clears_all_state and the other test and rename the unused
binding(s) accordingly, leaving used bindings and calls to _spawn_children
untouched.
- Around line 690-695: The test function
test_background_child_error_does_not_affect_join currently unpacks values into
child_ids and child_corr_ids but doesn't use them; update the unpacking to
prefix those unused variables with an underscore so they are treated as
intentionally unused (e.g., change child_ids -> _child_ids and child_corr_ids ->
_child_corr_ids in the calls to _make_orchestrator and _spawn_children) to
satisfy static analysis and signal intent.

In `@tests/unit/timing/test_subagent_manager.py`:
- Line 230: The test unpacks an unused value from the helper _make_orchestrator,
binding child_ids which is never used; update the unpack to drop that binding
(e.g., assign to _ or remove from the tuple unpack) so child_ids is not created
and the Ruff warning goes away—locate the call to _make_orchestrator in
tests/unit/timing/test_subagent_manager.py and change the assignment "orch, _,
_, scheduler, child_ids = _make_orchestrator(...)" accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3170a231-b36a-4c40-a6ef-c9b9119b8eb2

📥 Commits

Reviewing files that changed from the base of the PR and between 1cdae72 and a0f0eff642401bd4825820e66d561d4ce62d5cc4.

📒 Files selected for processing (49)
  • README.md
  • docs/cli-options.md
  • docs/tutorials/conflux-trace.md
  • fern/versions/dev.yml
  • src/aiperf/common/config/config_defaults.py
  • src/aiperf/common/config/input_config.py
  • src/aiperf/common/config/user_config.py
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/models/__init__.py
  • src/aiperf/common/models/dataset_models.py
  • src/aiperf/credit/structs.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/dataset/loader/__init__.py
  • src/aiperf/dataset/loader/base_loader.py
  • src/aiperf/dataset/loader/base_trace_loader.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/dataset/loader/models.py
  • src/aiperf/dataset/loader/mooncake_trace.py
  • src/aiperf/dataset/message_normalizer.py
  • src/aiperf/endpoints/base_endpoint.py
  • src/aiperf/endpoints/openai_chat.py
  • src/aiperf/plugin/enums.py
  • src/aiperf/plugin/plugins.py
  • src/aiperf/plugin/plugins.yaml
  • src/aiperf/plugin/schema/plugins.schema.json
  • src/aiperf/plugin/schema/schemas.py
  • src/aiperf/timing/config.py
  • src/aiperf/timing/conversation_source.py
  • src/aiperf/timing/phase/runner.py
  • src/aiperf/timing/strategies/fixed_schedule.py
  • src/aiperf/timing/strategies/request_rate.py
  • src/aiperf/timing/strategies/user_centric_rate.py
  • src/aiperf/timing/subagent_orchestrator.py
  • src/aiperf/workers/inference_client.py
  • src/aiperf/workers/session_manager.py
  • src/aiperf/workers/worker.py
  • tests/component_integration/dataset/test_subagent_orchestrator.py
  • tests/unit/common/config/test_input_config.py
  • tests/unit/common/config/test_user_config_mooncake_trace.py
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/dataset/test_message_normalizer.py
  • tests/unit/endpoints/test_openai_chat_completions.py
  • tests/unit/timing/conftest.py
  • tests/unit/timing/strategies/test_fixed_schedule.py
  • tests/unit/timing/test_subagent_manager.py
  • tests/unit/timing/test_subagent_manager_adversary.py
  • tests/unit/workers/test_session_manager.py

Add end-to-end support for replaying Conflux proxy captures that contain
parent + subagent conversation hierarchies. Includes a message normalizer
(Anthropic/OpenAI -> canonical format), a composable SubagentOrchestrator
for timing strategies, and threading-mode-aware conversation dispatch
through SessionManager and Worker. Extends credit system, endpoint, and
all three timing strategies to handle multi-turn subagent spawns/joins.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
@ajcasagrande ajcasagrande force-pushed the ajc/conflux-data-loader branch from a0f0eff to a729fa5 Compare March 17, 2026 08:02

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (5)
src/aiperf/dataset/message_normalizer.py (5)

229-236: ⚠️ Potential issue | 🟠 Major

_block_order text indexing can go negative and break ordered emission.

When a text block is empty, Line 235 records len(text_parts) - 1 without appending text, which can produce -1. Later ordered emission indexes with that value, causing wrong replay (or IndexError when no text parts exist).

💡 Proposed fix
-    text_parts: list[str] = []
+    text_blocks: list[str] = []
@@
         if block_type == "text":
             text = block.get("text", "")
-            if text:
-                text_parts.append(text)
+            text_blocks.append(text if isinstance(text, str) else "")
             if "citations" in block:
                 citations.extend(block["citations"])
-            block_order.append(("text", len(text_parts) - 1))
+            block_order.append(("text", len(text_blocks) - 1))
@@
-    combined_text = "\n\n".join(text_parts) if text_parts else None
+    combined_text = "\n\n".join([t for t in text_blocks if t]) if text_blocks else None
+    if text_blocks:
+        out["_text_blocks"] = text_blocks
-    text = msg.get("content")
-    text_parts = text.split("\n\n") if isinstance(text, str) and text else []
+    text_parts = msg.get("_text_blocks", [])
+    if not isinstance(text_parts, list):
+        text = msg.get("content")
+        text_parts = text.split("\n\n") if isinstance(text, str) and text else []
@@
-        elif kind == "text" and idx < len(text_parts):
+        elif kind == "text" and 0 <= idx < len(text_parts):
             content_blocks.append({"type": "text", "text": text_parts[idx]})

Also applies to: 576-584

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/message_normalizer.py` around lines 229 - 236, The
block_order entry for text blocks may record len(text_parts) - 1 even when the
text is empty, producing -1; update the logic in the method handling block_type
== "text" (in message_normalizer.py) so you only append a ("text", index) entry
after you actually append a text part — e.g., capture the index as
len(text_parts) before appending and then append the index along with the text,
or guard the block_order.append call with the same truthy text check that wraps
text_parts.append; ensure citations handling remains unchanged; apply the same
fix to the duplicate code region around lines 576-584 (the other text-processing
block).
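The guarded-index approach the prompt describes can be sketched in isolation. This is a minimal, hypothetical model of the block-ordering logic (the names `order_blocks`, `text_parts`, and `block_order` mirror the review discussion, not the real normalizer's API): an order entry is only recorded after a text part is actually appended, so the index can never be `-1`.

```python
def order_blocks(blocks):
    """Build (kind, index) order entries, recording a text index only
    after a text part is actually appended (hypothetical block shapes)."""
    text_parts = []
    block_order = []
    for block in blocks:
        if block.get("type") == "text":
            text = block.get("text", "")
            if text:
                text_parts.append(text)
                # Index is valid: it points at the part just appended.
                block_order.append(("text", len(text_parts) - 1))
            # Empty text blocks record no order entry, so no -1 index
            # can leak into the ordered emission later.
        else:
            block_order.append((block.get("type"), None))
    return text_parts, block_order

parts, order = order_blocks([
    {"type": "text", "text": ""},      # empty: skipped entirely
    {"type": "tool_use"},
    {"type": "text", "text": "hello"},
])
```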

487-489: ⚠️ Potential issue | 🟠 Major

Earlier system/developer instructions are overwritten.

Only the last instruction survives; previous ones are dropped before Anthropic emission.

💡 Proposed fix
         if role in ("system", "developer"):
-            system = msg.get("content")
+            content = msg.get("content")
+            system = content if system is None else _merge_content(system, content)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/message_normalizer.py` around lines 487 - 489, The current
logic in message_normalizer.py overwrites the variable system when encountering
roles ("system","developer"), dropping earlier instructions; change this to
accumulate all system/developer contents instead of replacing them — e.g.,
collect into a list (system_messages) or concatenate the existing system string
with msg.get("content") so every system/developer message is preserved before
Anthropic emission; update any downstream uses (e.g., where system is
referenced) to use the combined value.
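The accumulate-instead-of-overwrite pattern can be sketched like this (simplified message dicts; `collect_system` is a hypothetical helper, not the module's real function):

```python
def collect_system(messages):
    """Merge every system/developer message instead of keeping only the
    last one, so earlier instructions survive Anthropic emission."""
    system_parts = []
    rest = []
    for msg in messages:
        if msg.get("role") in ("system", "developer"):
            content = msg.get("content")
            if content:
                system_parts.append(content)
        else:
            rest.append(msg)
    system = "\n\n".join(system_parts) if system_parts else None
    return system, rest
```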

156-168: ⚠️ Potential issue | 🟠 Major

Provider autodetection still misses text-only Anthropic assistant blocks.

With provider=None, a message whose content is only {"type": "text"} blocks still falls through to "openai", so Anthropic normalization can be skipped.

💡 Proposed fix
         if isinstance(content, list):
             for block in content:
                 if not isinstance(block, dict):
                     continue
                 block_type = block.get("type")
                 if block_type in _ANTHROPIC_MESSAGE_BLOCK_TYPES:
                     return "anthropic"
+                if block_type == "text" and msg.get("role") == "assistant":
+                    return "anthropic"
                 if block_type == "image" and isinstance(block.get("source"), dict):
                     return "anthropic"
                 if block_type in _ALL_PASSTHROUGH_BLOCK_TYPES:
                     return "anthropic"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/message_normalizer.py` around lines 156 - 168, The
provider autodetection loop in function(s) iterating over messages and blocks
(variables: messages, content, block) fails to classify text-only Anthropic
assistant blocks as "anthropic"; update the detection logic inside that loop so
that a block with block.get("type") == "text" is treated as Anthropic (in
addition to existing checks against _ANTHROPIC_MESSAGE_BLOCK_TYPES and
_ALL_PASSTHROUGH_BLOCK_TYPES and the image/source check). Locate the loop that
currently checks block_type in _ANTHROPIC_MESSAGE_BLOCK_TYPES, block_type ==
"image" with dict source, and block_type in _ALL_PASSTHROUGH_BLOCK_TYPES, and
add a branch (or include "text") to return "anthropic" for plain text blocks so
provider=None correctly detects Anthropic messages.
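A standalone sketch of the detection branch being requested (the block-type set here is an assumed subset; the real `_ANTHROPIC_MESSAGE_BLOCK_TYPES` constant may differ): a text-only assistant block list is classified as Anthropic instead of falling through to the OpenAI default.

```python
ANTHROPIC_BLOCK_TYPES = {"tool_use", "tool_result", "thinking"}  # assumed subset

def detect_provider(msg):
    """Classify one message; text-only assistant block lists count as
    Anthropic (the fix the comment asks for)."""
    content = msg.get("content")
    if isinstance(content, list):
        for block in content:
            if not isinstance(block, dict):
                continue
            btype = block.get("type")
            if btype in ANTHROPIC_BLOCK_TYPES:
                return "anthropic"
            # OpenAI never emits assistant content as text blocks, so
            # this shape is a reliable Anthropic signal.
            if btype == "text" and msg.get("role") == "assistant":
                return "anthropic"
            if btype == "image" and isinstance(block.get("source"), dict):
                return "anthropic"
    return "openai"
```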

309-349: ⚠️ Potential issue | 🟠 Major

Tool results are still reordered after synthesized user content.

Mixed Anthropic user turns are normalized as user first and all tool_result messages later, which changes original interleaving order.

💡 Proposed fix
-    tool_results: list[dict[str, Any]] = []
     text_parts: list[str] = []
     content_parts: list[dict[str, Any]] = []
     has_unconvertible_blocks = False
+    result: list[dict[str, Any]] = []
+
+    def _flush_user_chunk() -> None:
+        nonlocal text_parts, content_parts
+        if text_parts and not content_parts and not has_unconvertible_blocks:
+            result.append({"role": "user", "content": "\n\n".join(text_parts)})
+        elif text_parts or content_parts:
+            combined = [{"type": "text", "text": t} for t in text_parts]
+            combined.extend(content_parts)
+            result.append({"role": "user", "content": combined})
+        text_parts = []
+        content_parts = []
@@
         if block_type == "tool_result":
+            _flush_user_chunk()
             tool_msg: dict[str, Any] = {
                 "role": "tool",
                 "tool_call_id": block.get("tool_use_id", ""),
                 "content": _flatten_text_content(block.get("content")),
             }
@@
-            tool_results.append(tool_msg)
+            result.append(tool_msg)
@@
-    result: list[dict[str, Any]] = []
-
-    if text_parts and not content_parts and not has_unconvertible_blocks:
-        result.append({"role": "user", "content": "\n\n".join(text_parts)})
-    elif text_parts or content_parts:
-        combined = [{"type": "text", "text": t} for t in text_parts]
-        combined.extend(content_parts)
-        result.append({"role": "user", "content": combined})
-
-    result.extend(tool_results)
+    _flush_user_chunk()
     return result if result else [msg]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/message_normalizer.py` around lines 309 - 349, The current
normalization collects tool_result messages into tool_results and text/content
into text_parts/content_parts, then appends tool_results after synthesized user
content which reorders original interleaving; instead, preserve original
sequence by emitting entries as you iterate over blocks: when encountering a
"tool_result" create the tool_msg (same fields as in the block handling) and
append it directly to the output sequence, and when encountering
"text"/"image"/passthrough/unconvertible create the corresponding user content
pieces (either single text entries or content blocks) and append them in-place;
update logic around text_parts/content_parts/result to build result
incrementally during the loop (maintain has_unconvertible_blocks behavior) so
returned list preserves the original block order while keeping the same message
shapes (references: variables block, tool_results/tool_msg, text_parts,
content_parts, has_unconvertible_blocks, and final result).
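The flush-in-place pattern from the proposed fix can be shown as a minimal sketch (simplified block shapes; only text and tool_result blocks are modeled, without the image/passthrough handling of the real code): buffered user text is flushed before each tool message, so the emitted sequence preserves the original interleaving.

```python
def normalize_user_turn(blocks):
    """Emit tool messages in place, flushing buffered user text first,
    so the original block order is preserved (hypothetical shapes)."""
    result = []
    text_parts = []

    def flush():
        # Emit any buffered user text before the next tool message.
        if text_parts:
            result.append({"role": "user", "content": "\n\n".join(text_parts)})
            text_parts.clear()

    for block in blocks:
        if block.get("type") == "tool_result":
            flush()
            result.append({
                "role": "tool",
                "tool_call_id": block.get("tool_use_id", ""),
                "content": block.get("content", ""),
            })
        elif block.get("type") == "text":
            text_parts.append(block.get("text", ""))
    flush()
    return result
```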

608-612: ⚠️ Potential issue | 🟡 Minor

Narrow the orjson.loads fallback exception type.

Catching Exception here can hide non-JSON bugs; this fallback should only handle decode failures.

💡 Proposed fix
     if isinstance(arguments, str):
         try:
             input_val = orjson.loads(arguments)
-        except Exception:
+        except orjson.JSONDecodeError:
             input_val = arguments
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/message_normalizer.py` around lines 608 - 612, Replace the
broad except Exception around the orjson.loads(arguments) call with a narrow
except specifically for JSON decode failures (catch orjson.JSONDecodeError) so
only invalid-JSON falls back to treating arguments as a raw string; update the
block that assigns input_val from orjson.loads(arguments) (the variables
arguments and input_val) and ensure orjson.JSONDecodeError is
referenced/imported as needed.
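The narrow-except pattern looks like this. The sketch uses the stdlib `json` module as a stand-in so it stays self-contained; `orjson` exposes an analogous `orjson.JSONDecodeError` (a `ValueError` subclass) that the real fix would catch instead.

```python
import json

def parse_arguments(arguments):
    """Fall back to the raw string only on decode failures, not on
    arbitrary exceptions (stdlib json stands in for orjson here)."""
    if isinstance(arguments, str):
        try:
            return json.loads(arguments)
        except json.JSONDecodeError:
            # Invalid JSON: keep the raw string. Any other exception
            # (a real bug) still propagates instead of being hidden.
            return arguments
    return arguments
```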
🧹 Nitpick comments (10)
src/aiperf/common/config/user_config.py (1)

388-412: Consider specifying explicit encoding for file operations.

The open(path) call relies on the system default encoding. For robustness across different environments, consider specifying encoding="utf-8" explicitly.

Suggested fix
-        with open(path) as f:
+        with open(path, encoding="utf-8") as f:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/config/user_config.py` around lines 388 - 412, The file
reading uses open(path) without specifying encoding; update the context manager
to explicitly set encoding="utf-8" (e.g., replace "with open(path) as f:" with
"with open(path, encoding='utf-8') as f:" or use path.open(encoding='utf-8')")
so JSON/JSONL parsing via orjson behaves consistently across environments; no
other logic changes needed.
src/aiperf/workers/inference_client.py (1)

175-176: Avoid tracing the entire turn object here.

With Conflux replay, the last turn can carry large raw_messages and tool payloads. Defer formatting via lambda and log stable identifiers instead of serializing the whole turn.

As per coding guidelines "**/*.py: Use lambda for expensive logs: self.debug(lambda: f"{self._x()}")`, use direct strings for cheap logs".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/workers/inference_client.py` around lines 175 - 176, The current
trace call eagerly serializes the full turn (request_info.turns[-1]) which can
be large; change the trace invocation in the inference client to defer
formatting via a lambda and log stable identifiers only (e.g., turn.id or
turn.index and any small metadata) instead of the whole turn; update the branch
that checks self.is_trace_enabled to call self.trace(lambda: f"Calling inference
API for turn id={turn.id} index={turn.index}") (or similar small fields) so
expensive serialization is avoided while preserving useful context.
tests/unit/dataset/test_message_normalizer.py (1)

49-54: Drop Any from these variadic kwargs.

**extra: Any makes an intentionally open keyword bag look stricter than it is. Prefer untyped **extra here, or replace it with explicit named kwargs if the helper is meant to enforce a real contract.

Based on learnings, "avoid adding type annotations to **kwargs like **kwargs: Any. The variadic keyword arguments are inherently dynamic; leave **kwargs untyped or replace with explicit, named keyword parameters if a concrete contract is needed."

Also applies to: 78-82, 92-95

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/test_message_normalizer.py` around lines 49 - 54, The
helper function _tool_use_block (and the other similar helpers referenced)
currently type-annotates variadic kwargs as **extra: Any which falsely
constrains the dynamic kwargs; remove the explicit Any annotation and make them
untyped (**extra) or, if you intended a concrete contract, replace **extra with
explicit named parameters. Update the signature of _tool_use_block and the other
helper functions (the ones around the other occurrences) to drop the ": Any" on
**extra so callers can pass dynamic keyword args without the misleading
annotation.
src/aiperf/workers/session_manager.py (1)

71-79: Make store_response() enforce the threading policy.

Right now SKIP_ASSISTANT_RESPONSES and ISOLATED_TURNS stay correct only if every caller remembers to check should_store_response() first. Keeping that guard inside store_response() makes the session invariant much harder to break.

Suggested change
     def store_response(self, response_turn: Turn) -> None:
         """
         Store the response for the turn.
         """
-        self.turn_list.append(response_turn)
+        if not self.should_store_response():
+            return
+        self.turn_list.append(response_turn)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/workers/session_manager.py` around lines 71 - 79, store_response
currently unconditionally appends to self.turn_list and relies on callers to
call should_store_response first; change store_response (and reference
TurnThreadingMode constants: THREAD_ASSISTANT_RESPONSES,
SKIP_ASSISTANT_RESPONSES, ISOLATED_TURNS) to enforce the threading policy by
checking should_store_response() at the top and only appending response_turn to
self.turn_list when it returns True (for non-storing modes, make store_response
a no-op or optionally log/ignore instead of mutating turn_list) so the session
invariant cannot be broken by callers forgetting the guard.
tests/unit/timing/test_subagent_manager_adversary.py (3)

216-218: Prefix unused unpacked variable with underscore.

Same issue as above - child_ids is unpacked but unused.

Proposed fix
-        orch, _, _, _, child_ids = _make_orchestrator()
+        orch, _, _, _, _child_ids = _make_orchestrator()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager_adversary.py` around lines 216 - 218,
The test test_double_cleanup_is_safe unpacks child_ids from _make_orchestrator()
but never uses it; update the unpack to prefix the unused variable with an
underscore (e.g., _child_ids or use a single underscore) when calling
_make_orchestrator() so the unused unpacked variable is clearly marked, keeping
the rest of the test and the _spawn_children(orch) call unchanged.

687-714: Prefix unused unpacked variables with underscores.

Both child_ids and child_corr_ids are unpacked but never used in this test.

Proposed fix
-    def test_background_child_error_does_not_affect_join(self):
-        """A background child that errors should not touch pending joins
-        or child_to_parent mappings."""
-        orch, _, _, _, child_ids = _make_orchestrator()
-        _, child_corr_ids = _spawn_children(orch)
+    def test_background_child_error_does_not_affect_join(self):
+        """A background child that errors should not touch pending joins
+        or child_to_parent mappings."""
+        orch, _, _, _, _child_ids = _make_orchestrator()
+        _, _child_corr_ids = _spawn_children(orch)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager_adversary.py` around lines 687 - 714,
The test
TestBackgroundChildErrorIsolation.test_background_child_error_does_not_affect_join
unpacks child_ids and child_corr_ids from _make_orchestrator/_spawn_children but
never uses them; update the unpacking to use prefixed unused variables (e.g.,
_child_ids, _child_corr_ids or simply _, _) when calling _make_orchestrator()
and _spawn_children() so the unused values are clearly marked and lint warnings
are avoided.

206-208: Prefix unused unpacked variable with underscore.

Static analysis correctly identifies that child_ids is unpacked but never used in this test.

Proposed fix
-        orch, _, _, _, child_ids = _make_orchestrator()
+        orch, _, _, _, _child_ids = _make_orchestrator()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager_adversary.py` around lines 206 - 208,
In test_cleanup_clears_all_state, the unpacked variable child_ids is unused;
update the unpacking from _make_orchestrator() to prefix that variable with an
underscore (e.g., _child_ids or just _ ) so static analysis no longer flags it;
adjust the assignment where orch, _, _, _, child_ids = _make_orchestrator() to
orch, _, _, _, _child_ids = _make_orchestrator() (or replace the last target
with a single underscore) to locate the change near the
test_cleanup_clears_all_state definition and the call to _make_orchestrator().
src/aiperf/timing/strategies/user_centric_rate.py (1)

146-149: Same coupling pattern as fixed_schedule.py.

This is consistent with the other timing strategies, but the same observation applies - consider ensuring the orchestrator is constructed with the dispatch function rather than patching it post-construction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/timing/strategies/user_centric_rate.py` around lines 146 - 149,
The code patches the orchestrator's dispatch by assigning
self._subagents._dispatch = self._dispatch_turn after construction (in
user_centric_rate.py), which couples lifecycle and hides dependencies; instead
modify the subagents/orchestrator constructor to accept a dispatch callable
(e.g., dispatch or dispatch_fn) and pass self._dispatch_turn into that
constructor when creating self._subagents, then remove the post-construction
assignment in user_centric_rate.py (mirror the same change used to decouple in
fixed_schedule.py); update the subagent/orchestrator class initializer to store
and use the provided dispatch callable rather than relying on external
monkey-patching.
src/aiperf/timing/strategies/fixed_schedule.py (1)

68-69: Consider dependency injection over internal attribute assignment.

Directly assigning self._subagents._dispatch = self._dispatch_turn couples the strategy to the orchestrator's internal implementation. The orchestrator already accepts dispatch_fn in its constructor - this reassignment suggests the orchestrator may be instantiated elsewhere without the dispatch callback, then patched here.

If this pattern is intentional (e.g., orchestrator is shared/reused), document why. Otherwise, ensure the orchestrator is constructed with the dispatch function from the start.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/timing/strategies/fixed_schedule.py` around lines 68 - 69, The
code currently mutates the orchestrator internals by setting
self._subagents._dispatch = self._dispatch_turn; instead, construct or configure
the orchestrator with the dispatch callback (use its dispatch_fn constructor
parameter) so the orchestrator receives _dispatch_turn at instantiation, remove
this direct assignment, and if the orchestrator must be shared/reused, add a
clear public API (e.g., set_dispatch or a documented parameter) on the
orchestrator class and call that from FixedScheduleStrategy rather than writing
to _dispatch directly; update FixedScheduleStrategy where self._subagents is
created/initialized to pass self._dispatch_turn (or call the new setter) and add
a short docstring explaining the sharing behavior if applicable.
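Constructor injection of the dispatch callback, as both nitpicks suggest, can be sketched in a few lines (a stripped-down stand-in for the real `SubagentOrchestrator`, which takes additional arguments):

```python
class SubagentOrchestrator:
    """Sketch: the dispatch callback is a constructor argument, so no
    post-construction patching of a private attribute is needed."""
    def __init__(self, dispatch_fn):
        self._dispatch = dispatch_fn

    def dispatch(self, turn):
        return self._dispatch(turn)

# The strategy passes its dispatch method at construction time instead
# of assigning orchestrator._dispatch afterwards.
sent = []
orch = SubagentOrchestrator(dispatch_fn=sent.append)
orch.dispatch("turn-1")
```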
src/aiperf/timing/subagent_orchestrator.py (1)

100-130: Consider adding type stub for stop_checker.

The stop_checker parameter is typed as object (line 105), relying on duck typing for can_send_any_turn(). While this works, a Protocol type would provide better IDE support and documentation.

This is a minor suggestion - the current approach is functional and consistent with how other strategies handle this dependency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/timing/subagent_orchestrator.py` around lines 100 - 130, The
constructor currently types stop_checker as object; define a small Protocol
(e.g., StopChecker with def can_send_any_turn(self) -> bool) and use it as the
parameter and attribute type to improve IDE/type-checker support; update the
__init__ signature (stop_checker: StopChecker) and the instance attribute
(self._stop_checker: StopChecker) in the SubagentOrchestrator class so callers
and editors know the required can_send_any_turn() shape.
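The suggested `Protocol` is small; a sketch under the assumption that `can_send_any_turn()` is the only method the orchestrator relies on:

```python
from typing import Protocol

class StopChecker(Protocol):
    """Structural type for the duck-typed stop_checker dependency."""
    def can_send_any_turn(self) -> bool: ...

class AlwaysRun:
    # Satisfies StopChecker structurally, without inheriting from it.
    def can_send_any_turn(self) -> bool:
        return True

def gate(checker: StopChecker) -> bool:
    # Type checkers now verify the can_send_any_turn() shape; at
    # runtime this is still ordinary duck typing.
    return checker.can_send_any_turn()
```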
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/timing/conversation_source.py`:
- Around line 49-70: build_first_turn currently defaults agent_depth to 0 which
masks the sampled metadata; change the signature of build_first_turn to accept
agent_depth: int | None = None and when constructing TurnToSend use the metadata
value when agent_depth is None (e.g., agent_depth_to_send = agent_depth if
agent_depth is not None else getattr(self.metadata, "agent_depth", 0)), then
pass agent_depth_to_send into TurnToSend so child sessions inherit sampled
metadata unless an explicit agent_depth is provided.
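The None-sentinel default the comment asks for can be sketched like this (a hypothetical free function standing in for `build_first_turn`; `metadata_agent_depth` represents the sampled metadata value):

```python
def build_first_turn(metadata_agent_depth, agent_depth=None):
    """Sketch: an explicit agent_depth wins; otherwise the sampled
    metadata value is inherited instead of a hard-coded 0."""
    effective = agent_depth if agent_depth is not None else metadata_agent_depth
    return {"agent_depth": effective}
```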

In `@src/aiperf/timing/strategies/request_rate.py`:
- Around line 109-110: The runner currently constructs SubagentOrchestrator
without supplying the public dispatch callback and then mutates the private
_dispatch on _subagents (e.g., self._subagents._dispatch = self._dispatch_turn).
Instead, update runner instantiation to pass dispatch_fn=self._dispatch_turn (or
the strategy's dispatch method) into SubagentOrchestrator so the orchestrator
uses the provided callback; remove the post-construction rebinding of _dispatch
in RequestRateStrategy, UserCentricRateStrategy, and FixedScheduleStrategy and
ensure any references use the public dispatch_fn parameter.

In `@src/aiperf/workers/inference_client.py`:
- Around line 103-111: The code merges per-turn extra_params into
formatted_payload via self.endpoint.merge_turn_params (when
current_turn.extra_params exists) but later still derives record.model_name from
turn metadata, causing mismatch; fix by deriving the effective model once after
merging (inspect the merged formatted_payload for a model override key used by
self.endpoint, e.g., model or model_name) and assign that value to
record.model_name (and any other record fields that depend on the payload)
during the record enrichment path (where record is populated around the current
record creation/assignment) so the sent request and the recorded model are
consistent.

In `@tests/unit/dataset/loader/test_conflux.py`:
- Around line 41-77: The _make_record fixture builder uses truthy "or" fallbacks
which collapse explicit None into defaults (e.g., hyperparameters=None becomes
{"max_tokens":4096}); update _make_record to check for None explicitly for
optional fields (tokens, tools, messages, hyperparameters, maybe is_streaming)
and only substitute defaults when the argument is strictly None so tests can
pass a real null/absent case; locate the function named _make_record in the test
file and replace the "tokens or {...}", "tools or []", "messages or [...]", and
"hyperparameters or {...}" style fallbacks with explicit None checks (e.g.,
tokens if tokens is not None else {...}) to preserve passed None values.
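The `is not None` fallback pattern, shown on a trimmed-down version of the builder (field names and defaults follow the comment; the real `_make_record` takes more parameters):

```python
def make_record(tokens=None, tools=None, hyperparameters=None):
    """Sketch: substitute defaults only when the argument is strictly
    None, so callers can pass falsy values like {} or [] explicitly."""
    return {
        "tokens": tokens if tokens is not None else {"input": 1},
        "tools": tools if tools is not None else [],
        "hyperparameters": hyperparameters
        if hyperparameters is not None
        else {"max_tokens": 4096},
    }
```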

---

Duplicate comments:
In `@src/aiperf/dataset/message_normalizer.py`:
- Around line 229-236: The block_order entry for text blocks may record
len(text_parts) - 1 even when the text is empty, producing -1; update the logic
in the method handling block_type == "text" (in message_normalizer.py) so you
only append a ("text", index) entry after you actually append a text part —
e.g., capture the index as len(text_parts) before appending and then append the
index along with the text, or guard the block_order.append call with the same
truthy text check that wraps text_parts.append; ensure citations handling
remains unchanged; apply the same fix to the duplicate code region around lines
576-584 (the other text-processing block).
- Around line 487-489: The current logic in message_normalizer.py overwrites the
variable system when encountering roles ("system","developer"), dropping earlier
instructions; change this to accumulate all system/developer contents instead of
replacing them — e.g., collect into a list (system_messages) or concatenate the
existing system string with msg.get("content") so every system/developer message
is preserved before Anthropic emission; update any downstream uses (e.g., where
system is referenced) to use the combined value.
- Around line 156-168: The provider autodetection loop in function(s) iterating
over messages and blocks (variables: messages, content, block) fails to classify
text-only Anthropic assistant blocks as "anthropic"; update the detection logic
inside that loop so that a block with block.get("type") == "text" is treated as
Anthropic (in addition to existing checks against _ANTHROPIC_MESSAGE_BLOCK_TYPES
and _ALL_PASSTHROUGH_BLOCK_TYPES and the image/source check). Locate the loop
that currently checks block_type in _ANTHROPIC_MESSAGE_BLOCK_TYPES, block_type
== "image" with dict source, and block_type in _ALL_PASSTHROUGH_BLOCK_TYPES, and
add a branch (or include "text") to return "anthropic" for plain text blocks so
provider=None correctly detects Anthropic messages.
- Around line 309-349: The current normalization collects tool_result messages
into tool_results and text/content into text_parts/content_parts, then appends
tool_results after synthesized user content which reorders original
interleaving; instead, preserve original sequence by emitting entries as you
iterate over blocks: when encountering a "tool_result" create the tool_msg (same
fields as in the block handling) and append it directly to the output sequence,
and when encountering "text"/"image"/passthrough/unconvertible create the
corresponding user content pieces (either single text entries or content blocks)
and append them in-place; update logic around text_parts/content_parts/result to
build result incrementally during the loop (maintain has_unconvertible_blocks
behavior) so returned list preserves the original block order while keeping the
same message shapes (references: variables block, tool_results/tool_msg,
text_parts, content_parts, has_unconvertible_blocks, and final result).
- Around line 608-612: Replace the broad except Exception around the
orjson.loads(arguments) call with a narrow except specifically for JSON decode
failures (catch orjson.JSONDecodeError) so only invalid-JSON falls back to
treating arguments as a raw string; update the block that assigns input_val from
orjson.loads(arguments) (the variables arguments and input_val) and ensure
orjson.JSONDecodeError is referenced/imported as needed.

---

Nitpick comments:
In `@src/aiperf/common/config/user_config.py`:
- Around line 388-412: The file reading uses open(path) without specifying
encoding; update the context manager to explicitly set encoding="utf-8" (e.g.,
replace "with open(path) as f:" with "with open(path, encoding='utf-8') as f:"
or use path.open(encoding='utf-8')") so JSON/JSONL parsing via orjson behaves
consistently across environments; no other logic changes needed.

In `@src/aiperf/timing/strategies/fixed_schedule.py`:
- Around line 68-69: The code currently mutates the orchestrator internals by
setting self._subagents._dispatch = self._dispatch_turn; instead, construct or
configure the orchestrator with the dispatch callback (use its dispatch_fn
constructor parameter) so the orchestrator receives _dispatch_turn at
instantiation, remove this direct assignment, and if the orchestrator must be
shared/reused, add a clear public API (e.g., set_dispatch or a documented
parameter) on the orchestrator class and call that from FixedScheduleStrategy
rather than writing to _dispatch directly; update FixedScheduleStrategy where
self._subagents is created/initialized to pass self._dispatch_turn (or call the
new setter) and add a short docstring explaining the sharing behavior if
applicable.

In `@src/aiperf/timing/strategies/user_centric_rate.py`:
- Around line 146-149: The code patches the orchestrator's dispatch by assigning
self._subagents._dispatch = self._dispatch_turn after construction (in
user_centric_rate.py), which couples lifecycle and hides dependencies; instead
modify the subagents/orchestrator constructor to accept a dispatch callable
(e.g., dispatch or dispatch_fn) and pass self._dispatch_turn into that
constructor when creating self._subagents, then remove the post-construction
assignment in user_centric_rate.py (mirror the same change used to decouple in
fixed_schedule.py); update the subagent/orchestrator class initializer to store
and use the provided dispatch callable rather than relying on external
monkey-patching.

In `@src/aiperf/timing/subagent_orchestrator.py`:
- Around line 100-130: The constructor currently types stop_checker as object;
define a small Protocol (e.g., StopChecker with def can_send_any_turn(self) ->
bool) and use it as the parameter and attribute type to improve IDE/type-checker
support; update the __init__ signature (stop_checker: StopChecker) and the
instance attribute (self._stop_checker: StopChecker) in the SubagentOrchestrator
class so callers and editors know the required can_send_any_turn() shape.
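A minimal sketch of the suggested `StopChecker` protocol; structural typing means any object exposing `can_send_any_turn()` satisfies it without inheriting:

```python
from typing import Protocol

class StopChecker(Protocol):
    """The minimal shape the orchestrator needs from its stop checker."""

    def can_send_any_turn(self) -> bool: ...

class SubagentOrchestrator:
    def __init__(self, stop_checker: StopChecker) -> None:
        # typed attribute gives IDEs and type checkers the required shape
        self._stop_checker: StopChecker = stop_checker

    def may_dispatch(self) -> bool:
        return self._stop_checker.can_send_any_turn()

class AlwaysRunning:
    """Satisfies StopChecker structurally; no explicit subclassing needed."""

    def can_send_any_turn(self) -> bool:
        return True
```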

In `@src/aiperf/workers/inference_client.py`:
- Around line 175-176: The current trace call eagerly serializes the full turn
(request_info.turns[-1]) which can be large; change the trace invocation in the
inference client to defer formatting via a lambda and log stable identifiers
only (e.g., turn.id or turn.index and any small metadata) instead of the whole
turn; update the branch that checks self.is_trace_enabled to call
self.trace(lambda: f"Calling inference API for turn id={turn.id}
index={turn.index}") (or similar small fields) so expensive serialization is
avoided while preserving useful context.

In `@src/aiperf/workers/session_manager.py`:
- Around line 71-79: store_response currently unconditionally appends to
self.turn_list and relies on callers to call should_store_response first; change
store_response (and reference TurnThreadingMode constants:
THREAD_ASSISTANT_RESPONSES, SKIP_ASSISTANT_RESPONSES, ISOLATED_TURNS) to enforce
the threading policy by checking should_store_response() at the top and only
appending response_turn to self.turn_list when it returns True (for non-storing
modes, make store_response a no-op or optionally log/ignore instead of mutating
turn_list) so the session invariant cannot be broken by callers forgetting the
guard.
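A sketch of the self-guarding `store_response`, under the assumption that only `THREAD_ASSISTANT_RESPONSES` threads responses into the turn list (the enum values here are placeholders for the real `TurnThreadingMode`):

```python
from enum import Enum

class TurnThreadingMode(Enum):
    THREAD_ASSISTANT_RESPONSES = "thread_assistant_responses"
    SKIP_ASSISTANT_RESPONSES = "skip_assistant_responses"
    ISOLATED_TURNS = "isolated_turns"

class SessionState:
    """Sketch: store_response enforces the threading policy itself."""

    def __init__(self, mode: TurnThreadingMode) -> None:
        self.mode = mode
        self.turn_list: list[dict] = []

    def should_store_response(self) -> bool:
        return self.mode is TurnThreadingMode.THREAD_ASSISTANT_RESPONSES

    def store_response(self, response_turn: dict) -> None:
        # Enforce the invariant here so a caller that forgets the
        # should_store_response() guard cannot corrupt the session.
        if not self.should_store_response():
            return  # no-op for non-storing modes
        self.turn_list.append(response_turn)
```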

In `@tests/unit/dataset/test_message_normalizer.py`:
- Around line 49-54: The helper function _tool_use_block (and the other similar
helpers referenced) currently type-annotates variadic kwargs as **extra: Any
which falsely constrains the dynamic kwargs; remove the explicit Any annotation
and make them untyped (**extra) or, if you intended a concrete contract, replace
**extra with explicit named parameters. Update the signature of _tool_use_block
and the other helper functions (the ones around the other occurrences) to drop
the ": Any" on **extra so callers can pass dynamic keyword args without the
misleading annotation.

In `@tests/unit/timing/test_subagent_manager_adversary.py`:
- Around line 216-218: The test test_double_cleanup_is_safe unpacks child_ids
from _make_orchestrator() but never uses it; update the unpack to prefix the
unused variable with an underscore (e.g., _child_ids or use a single underscore)
when calling _make_orchestrator() so the unused unpacked variable is clearly
marked, keeping the rest of the test and the _spawn_children(orch) call
unchanged.
- Around line 687-714: The test
TestBackgroundChildErrorIsolation.test_background_child_error_does_not_affect_join
unpacks child_ids and child_corr_ids from _make_orchestrator/_spawn_children but
never uses them; update the unpacking to use prefixed unused variables (e.g.,
_child_ids, _child_corr_ids or simply _, _) when calling _make_orchestrator()
and _spawn_children() so the unused values are clearly marked and lint warnings
are avoided.
- Around line 206-208: In test_cleanup_clears_all_state, the unpacked variable
child_ids is unused; update the unpacking from _make_orchestrator() to prefix
that variable with an underscore (e.g., _child_ids or just _ ) so static
analysis no longer flags it; adjust the assignment where orch, _, _, _,
child_ids = _make_orchestrator() to orch, _, _, _, _child_ids =
_make_orchestrator() (or replace the last target with a single underscore) to
locate the change near the test_cleanup_clears_all_state definition and the call
to _make_orchestrator().

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: be6286e4-127e-4877-8cd3-dc33a7067056

📥 Commits

Reviewing files that changed from the base of the PR and between a0f0eff642401bd4825820e66d561d4ce62d5cc4 and a729fa5.

📒 Files selected for processing (50)
  • README.md
  • docs/cli-options.md
  • docs/tutorials/conflux-trace.md
  • fern/versions/dev.yml
  • src/aiperf/common/config/config_defaults.py
  • src/aiperf/common/config/input_config.py
  • src/aiperf/common/config/user_config.py
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/models/__init__.py
  • src/aiperf/common/models/dataset_models.py
  • src/aiperf/credit/issuer.py
  • src/aiperf/credit/structs.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/dataset/loader/__init__.py
  • src/aiperf/dataset/loader/base_loader.py
  • src/aiperf/dataset/loader/base_trace_loader.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/dataset/loader/models.py
  • src/aiperf/dataset/loader/mooncake_trace.py
  • src/aiperf/dataset/message_normalizer.py
  • src/aiperf/endpoints/base_endpoint.py
  • src/aiperf/endpoints/openai_chat.py
  • src/aiperf/plugin/enums.py
  • src/aiperf/plugin/plugins.py
  • src/aiperf/plugin/plugins.yaml
  • src/aiperf/plugin/schema/plugins.schema.json
  • src/aiperf/plugin/schema/schemas.py
  • src/aiperf/timing/config.py
  • src/aiperf/timing/conversation_source.py
  • src/aiperf/timing/phase/runner.py
  • src/aiperf/timing/strategies/fixed_schedule.py
  • src/aiperf/timing/strategies/request_rate.py
  • src/aiperf/timing/strategies/user_centric_rate.py
  • src/aiperf/timing/subagent_orchestrator.py
  • src/aiperf/workers/inference_client.py
  • src/aiperf/workers/session_manager.py
  • src/aiperf/workers/worker.py
  • tests/component_integration/dataset/test_subagent_orchestrator.py
  • tests/unit/common/config/test_input_config.py
  • tests/unit/common/config/test_user_config_mooncake_trace.py
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/dataset/test_message_normalizer.py
  • tests/unit/endpoints/test_openai_chat_completions.py
  • tests/unit/timing/conftest.py
  • tests/unit/timing/strategies/test_fixed_schedule.py
  • tests/unit/timing/test_subagent_manager.py
  • tests/unit/timing/test_subagent_manager_adversary.py
  • tests/unit/workers/test_session_manager.py
🚧 Files skipped from review as they are similar to previous changes (16)
  • src/aiperf/endpoints/openai_chat.py
  • src/aiperf/plugin/schema/plugins.schema.json
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/dataset/composer/custom.py
  • docs/tutorials/conflux-trace.md
  • src/aiperf/timing/phase/runner.py
  • tests/unit/common/config/test_user_config_mooncake_trace.py
  • src/aiperf/endpoints/base_endpoint.py
  • src/aiperf/plugin/schema/schemas.py
  • tests/unit/common/config/test_input_config.py
  • README.md
  • src/aiperf/common/models/__init__.py
  • src/aiperf/plugin/plugins.py
  • src/aiperf/common/config/config_defaults.py
  • tests/unit/timing/conftest.py
  • fern/versions/dev.yml

Fix 8 actionable issues from PR #767 review: preserve tool_result
ordering in Anthropic user messages, merge system/developer messages
instead of overwriting, detect text-only Anthropic assistant blocks,
narrow orjson exception catch, default agent_depth from metadata,
add public set_dispatch API for SubagentOrchestrator, fix test helper
sentinel for explicit None, and update CLI help text for Conflux.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Detect spawn and join points in Conflux traces so parent turns that
consume child results are held until all blocking children complete.
Adds TurnPrerequisite model, PrerequisiteKind enum, SubagentMixin for
timing strategies, and join-gate logic in SubagentOrchestrator. Ensures
replay fidelity whether the server is faster or slower than the original.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🧹 Nitpick comments (3)
docs/tutorials/conflux-trace.md (3)

198-204: Add language specifier to technical diagram code blocks.

The static analysis tool flagged these code blocks as missing language specifiers. Since these are conceptual timing diagrams (not executable code), adding text or plaintext as the language identifier satisfies the linter while preserving readability.

🔧 Fix for one example block (apply similarly to others)
```diff
-```
+```text
 Parent turn N:   Agent tool_use  ──► child runs
                  tool_result: "child output here"
 Parent turn N+1: JOIN — reads child result, continues
```


Also applies to: 229-255

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against the current code and only fix it if needed.

In @docs/tutorials/conflux-trace.md around lines 198 - 204, The fenced
timing-diagram code block containing the lines starting with "Parent turn N:
Agent tool_use ──► child runs" is missing a language specifier; update the
opening fence from ``` to ```text (or ```plaintext) so the block becomes a
plain-text code block, and apply the same change to the other similar diagram
blocks (the one containing "Parent turn N+1: JOIN — reads child result,
continues" and the additional diagram blocks referenced around the same area) to
satisfy the linter while preserving formatting.


</details>

---

`36-45`: **Consider using a mermaid diagram for the agent hierarchy.**

The ASCII tree diagram showing parent/subagent structure would be clearer as a mermaid flowchart, aligning with the coding guidelines preference for mermaid over ASCII art.



<details>
<summary>📊 Suggested mermaid diagram</summary>

```diff
-```
-Parent Agent (main conversation)
-  Turn 1: User asks "refactor the auth module"
-  Turn 2: Agent reads files, thinks about approach
-    +-- Subagent A: searches codebase for auth references
-    +-- Subagent B: runs existing tests to check current behavior
-  Turn 3: Agent writes new code
-    +-- Subagent C: runs updated tests
-  Turn 4: Agent summarizes changes
-```
+```mermaid
+flowchart TD
+    P1[Turn 1: User asks 'refactor the auth module']
+    P2[Turn 2: Agent reads files, thinks about approach]
+    P3[Turn 3: Agent writes new code]
+    P4[Turn 4: Agent summarizes changes]
+    
+    P1 --> P2
+    P2 --> SA[Subagent A: searches codebase]
+    P2 --> SB[Subagent B: runs existing tests]
+    P2 --> P3
+    P3 --> SC[Subagent C: runs updated tests]
+    P3 --> P4
+```
</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

```
Verify each finding against the current code and only fix it if needed.

In `@docs/tutorials/conflux-trace.md` around lines 36 - 45, Replace the ASCII tree
diagram in the docs/tutorials/conflux-trace.md example (the "Parent Agent (main
conversation)" block showing Turn 1–4 and subagents A/B/C) with a mermaid
flowchart per the repo style guide; remove the ASCII art fence and insert a
fenced code block labeled "mermaid" that maps Turn 1→Turn 2→Subagent A/Subagent
B→Turn 3→Subagent C→Turn 4 using distinct node labels (e.g., P1, P2, SA, SB, P3,
SC, P4) and directed edges as shown in the suggested mermaid layout so the
hierarchy renders correctly.
```

</details>

---

`272-275`: **Add language specifier for terminal output block.**

This block shows example terminal output. Adding a `text` language specifier addresses the linter warning while preserving the output exactly as shown.



```diff
-```
+```text
 Loaded 3 agent threads + 5 utility calls skipped (28 total records)
 Converted 3 conversations (28 total turns, 2 subagent children incl. 0 orphans)
 ```
```

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against the current code and only fix it if needed.

In @docs/tutorials/conflux-trace.md around lines 272 - 275, Add a language
specifier "text" to the fenced code block that contains the two example terminal
lines ("Loaded 3 agent threads + 5 utility calls skipped (28 total records)" and
"Converted 3 conversations (28 total turns, 2 subagent children incl. 0
orphans)") by replacing the opening backticks (```) with ```text so the block
becomes a plain-text code block and the linter warning is resolved.


</details>


<details>
<summary>🤖 Prompt for all review comments with AI agents</summary>

Verify each finding against the current code and only fix it if needed.

Inline comments:
In @src/aiperf/timing/phase/runner.py:

  • Around line 205-220: The new subagent-detection logic dereferences
    self._conversation_source.dataset_metadata.conversations without ensuring
    dataset_metadata exists; update the has_subagents check in PhaseRunner (the
    block creating strategy_kwargs and computing has_subagents) to first verify
    self._conversation_source.dataset_metadata is truthy (or not None) before
    iterating its .conversations, and only import/instantiate SubagentOrchestrator
    and set strategy_kwargs["subagents"] when dataset_metadata is present and
    any(c.subagent_spawns for c in
    self._conversation_source.dataset_metadata.conversations) is True so sources
    without metadata won't raise before the phase lifecycle/error-publishing path
    runs.
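The guard can be sketched as below; `PhaseRunner` and `has_subagents` are simplified stand-ins for the real class, with lightweight namespaces in place of the source and metadata models:

```python
from types import SimpleNamespace

class PhaseRunner:
    """Sketch: tolerate conversation sources whose dataset_metadata is
    None before probing conversations for subagent spawns."""

    def __init__(self, conversation_source) -> None:
        self._conversation_source = conversation_source

    def has_subagents(self) -> bool:
        metadata = self._conversation_source.dataset_metadata
        if not metadata:
            # metadata-less sources must not raise before the phase
            # lifecycle / error-publishing path runs
            return False
        return any(c.subagent_spawns for c in metadata.conversations)
```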

In @src/aiperf/timing/strategies/subagent_mixin.py:

  • Around line 32-44: The on_failed_credit method currently skips terminate_child
    for errored/cancelled child credits when credit.is_final_turn is True, leaving
    spawn/join gates unresolved; update on_failed_credit in subagent_mixin.py so
    that any child credit with credit.agent_depth > 0 invokes
    self._subagents.terminate_child(credit) regardless of credit.is_final_turn
    (i.e., remove or change the not credit.is_final_turn guard), preserving the
    early return when self._subagents is falsy and keeping the agent_depth check to
    only target child agents.
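The corrected guard can be sketched as follows, with minimal stand-in models for `Credit` and `CreditReturn` (the real structs live in `src/aiperf/credit/structs.py`):

```python
from dataclasses import dataclass

@dataclass
class Credit:
    agent_depth: int
    is_final_turn: bool

@dataclass
class CreditReturn:
    credit: Credit

class FakeOrchestrator:
    def __init__(self) -> None:
        self.terminated: list[Credit] = []

    def terminate_child(self, credit: Credit) -> None:
        self.terminated.append(credit)

class SubagentMixin:
    """Sketch of the fix: any failed child credit triggers
    terminate_child, including the child's final turn."""

    def __init__(self, subagents) -> None:
        self._subagents = subagents

    def on_failed_credit(self, credit_return: CreditReturn) -> None:
        if not self._subagents:
            return
        credit = credit_return.credit
        # the `not credit.is_final_turn` guard is dropped: a child that
        # errors on its last turn must still resolve the parent's gate
        if credit.agent_depth > 0:
            self._subagents.terminate_child(credit)
```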

In @tests/component_integration/dataset/test_conflux_timing_modes.py:

  • Around line 85-114: The test harness currently bypasses timing strategies by
    constructing SubagentOrchestrator with a mocked scheduler and a capturing
    dispatch_fn in _build_orchestrator; to exercise FIXED_SCHEDULE, REQUEST_RATE,
    and USER_CENTRIC_RATE either parameterize _build_orchestrator to accept a real
    timing/strategy implementation (or a timing_mode argument) and instantiate the
    corresponding strategy/scheduler used by SubagentOrchestrator (instead of
    MagicMock scheduler and a stubbed dispatch_fn), or if you intend only
    orchestrator-level tests, rename the suite to indicate "orchestrator-only"
    coverage; update references to SubagentOrchestrator, _build_orchestrator,
    dispatch_fn, and the timing mode constants (FIXED_SCHEDULE, REQUEST_RATE,
    USER_CENTRIC_RATE) so the chosen approach is clear and the real strategy
    _dispatch_turn paths are exercised when running the tests.

In @tests/unit/dataset/test_message_normalizer.py:

  • Around line 616-630: Add a new test case to cover the inverse ordering
    (tool_result followed by text) to prevent regressions in Anthropic tool_result
    ordering: create a message where content starts with one or more
    _tool_result_block(s) then a _text_block, call _normalize_anthropic_user and
    assert the resulting list preserves the tool entries before the user text (check
    roles and tool_call_id), then round-trip the normalized messages through
    to_anthropic_messages and assert the output matches the original Anthropic-style
    ordering; reference the existing test_text_plus_tool_result_split pattern and
    functions _normalize_anthropic_user and to_anthropic_messages when adding this
    case.

In @tests/unit/timing/test_subagent_manager.py:

  • Around line 239-253: In test_background_spawn_does_not_suspend_parent, the
    tuple returned by _make_orchestrator is unpacked into orch, _, _, scheduler,
    child_ids but child_ids is unused; change the final binding from child_ids to _
    to avoid the unused-variable warning (i.e., update the unpacking where
    _make_orchestrator(...) is called in
    test_background_spawn_does_not_suspend_parent).

In @tests/unit/timing/test_subagent_orchestrator_gating.py:

  • Around line 248-249: The test function
    test_child_gate_entry_fields_after_spawn unpacks child_ids from
    _make_simple_orchestrator() but never uses it; replace the unused binding with
    an underscore (e.g., orch, _, _, _ = _make_simple_orchestrator()) to satisfy the
    linter. Do the same replacement for the other similar test that currently
    unpacks child_ids (the other occurrence noted in the file) so both tests remove
    the unused variable without changing behavior or coverage.
  • Around line 662-664: The test currently allows creating
    TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN) with spawn_id None, but
    SPAWN_JOIN must require a concrete spawn_id; update the TurnPrerequisite model
    (e.g., in its constructor/__post_init__ or Pydantic validators) to
    raise/validate when kind == PrerequisiteKind.SPAWN_JOIN and spawn_id is None,
    and change or remove test_spawn_join_prerequisite_spawn_id_defaults_to_none to
    assert that construction fails (or that validation raises) for a SPAWN_JOIN
    without spawn_id; refer to TurnPrerequisite, PrerequisiteKind.SPAWN_JOIN, and
    test_spawn_join_prerequisite_spawn_id_defaults_to_none when making these
    changes.

Nitpick comments:
In @docs/tutorials/conflux-trace.md:

  • Around line 198-204: The fenced timing-diagram code block containing the lines
    starting with "Parent turn N: Agent tool_use ──► child runs" is missing a
language specifier; update the opening fence from ``` to ```text
(or ```plaintext) so the block becomes a plain-text code block, and apply the same
change to the other similar diagram blocks (the one containing "Parent turn N+1:
JOIN — reads child result, continues" and the additional diagram blocks
referenced around the same area) to satisfy the linter while preserving
formatting.
- Around line 36-45: Replace the ASCII tree diagram in the
docs/tutorials/conflux-trace.md example (the "Parent Agent (main conversation)"
block showing Turn 1–4 and subagents A/B/C) with a mermaid flowchart per the
repo style guide; remove the ASCII art fence and insert a fenced code block
labeled "mermaid" that maps Turn 1→Turn 2→Subagent A/Subagent B→Turn 3→Subagent
C→Turn 4 using distinct node labels (e.g., P1, P2, SA, SB, P3, SC, P4) and
directed edges as shown in the suggested mermaid layout so the hierarchy renders
correctly.
- Around line 272-275: Add a language specifier "text" to the fenced code block
that contains the two example terminal lines ("Loaded 3 agent threads + 5
utility calls skipped (28 total records)" and "Converted 3 conversations (28
total turns, 2 subagent children incl. 0 orphans)") by replacing the opening
backticks (```) with ```text so the block becomes a plain-text code block and
the linter warning is resolved.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: a1da85e5-da34-49b2-b73f-7b35957d91a3

📥 Commits

Reviewing files that changed from the base of the PR and between a729fa5 and c2f7ca0.

📒 Files selected for processing (30)
  • docs/cli-options.md
  • docs/tutorials/conflux-trace.md
  • src/aiperf/common/config/input_config.py
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/models/__init__.py
  • src/aiperf/common/models/dataset_models.py
  • src/aiperf/credit/callback_handler.py
  • src/aiperf/credit/issuer.py
  • src/aiperf/credit/structs.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/dataset/message_normalizer.py
  • src/aiperf/timing/conversation_source.py
  • src/aiperf/timing/phase/runner.py
  • src/aiperf/timing/strategies/core.py
  • src/aiperf/timing/strategies/fixed_schedule.py
  • src/aiperf/timing/strategies/request_rate.py
  • src/aiperf/timing/strategies/subagent_mixin.py
  • src/aiperf/timing/strategies/user_centric_rate.py
  • src/aiperf/timing/subagent_orchestrator.py
  • tests/component_integration/dataset/test_conflux_timing_modes.py
  • tests/component_integration/dataset/test_subagent_orchestrator.py
  • tests/unit/credit/test_callback_handler.py
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/dataset/test_message_normalizer.py
  • tests/unit/timing/phase/test_runner.py
  • tests/unit/timing/strategies/test_subagent_mixin.py
  • tests/unit/timing/test_subagent_manager.py
  • tests/unit/timing/test_subagent_manager_adversary.py
  • tests/unit/timing/test_subagent_orchestrator_gating.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • docs/cli-options.md
  • src/aiperf/common/enums/__init__.py

Comment on lines +32 to +44
def on_failed_credit(self, credit_return: CreditReturn) -> None:
"""Handle errored or cancelled child credit returns.

Called by CreditCallbackHandler for every errored/cancelled return,
regardless of can_send_any_turn() — this is gate-tracking cleanup,
not new work dispatch. The orchestrator's _satisfy_prerequisite checks
can_send_any_turn() internally before dispatching gated turns.
"""
if not self._subagents:
return
credit = credit_return.credit
if credit.agent_depth > 0 and not credit.is_final_turn:
self._subagents.terminate_child(credit)

⚠️ Potential issue | 🟠 Major

Failed final child turns are skipped from gate cleanup.

on_failed_credit() is the only failure path that runs regardless of can_send_any_turn(). The not credit.is_final_turn guard means a child that errors or is cancelled on its last turn never calls terminate_child(), which can leave a blocking spawn/join gate unresolved.

Suggested fix
-        if credit.agent_depth > 0 and not credit.is_final_turn:
+        if credit.agent_depth > 0:
             self._subagents.terminate_child(credit)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/timing/strategies/subagent_mixin.py` around lines 32 - 44, The
on_failed_credit method currently skips terminate_child for errored/cancelled
child credits when credit.is_final_turn is True, leaving spawn/join gates
unresolved; update on_failed_credit in subagent_mixin.py so that any child
credit with credit.agent_depth > 0 invokes
self._subagents.terminate_child(credit) regardless of credit.is_final_turn
(i.e., remove or change the not credit.is_final_turn guard), preserving the
early return when self._subagents is falsy and keeping the agent_depth check to
only target child agents.

Comment on lines +85 to +114
def _build_orchestrator(
ds: DatasetMetadata,
*,
stop_can_send: bool = True,
) -> tuple[SubagentOrchestrator, list[TurnToSend], MagicMock]:
"""Build orchestrator from dataset with a capturing dispatch_fn."""
root_ids = [c.conversation_id for c in ds.conversations if c.agent_depth == 0]
sampler = _make_sampler(root_ids)
src = ConversationSource(ds, sampler)

scheduler = MagicMock()
scheduler.execute_async = MagicMock()
scheduler.schedule_at_perf_sec = MagicMock()
scheduler.schedule_later = MagicMock()
dispatched: list[TurnToSend] = []

orch = SubagentOrchestrator(
conversation_source=src,
credit_issuer=MagicMock(
issue_credit=AsyncMock(return_value=True),
try_issue_credit=AsyncMock(return_value=True),
),
stop_checker=MagicMock(
can_send_any_turn=MagicMock(return_value=stop_can_send),
can_start_new_session=MagicMock(return_value=stop_can_send),
),
scheduler=scheduler,
dispatch_fn=lambda turn: dispatched.append(turn),
)
return orch, dispatched, scheduler

⚠️ Potential issue | 🟠 Major

These “all modes” tests bypass the timing strategies entirely.

_build_orchestrator() instantiates SubagentOrchestrator directly and captures dispatch_fn, so this harness never exercises FIXED_SCHEDULE, REQUEST_RATE, or USER_CENTRIC_RATE. A mode-specific regression in a strategy’s _dispatch_turn path would still pass here. Either parameterize the harness over the real strategy implementations or rename/scope the suite to orchestrator-only coverage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/dataset/test_conflux_timing_modes.py` around
lines 85 - 114, The test harness currently bypasses timing strategies by
constructing SubagentOrchestrator with a mocked scheduler and a capturing
dispatch_fn in _build_orchestrator; to exercise FIXED_SCHEDULE, REQUEST_RATE,
and USER_CENTRIC_RATE either parameterize _build_orchestrator to accept a real
timing/strategy implementation (or a timing_mode argument) and instantiate the
corresponding strategy/scheduler used by SubagentOrchestrator (instead of
MagicMock scheduler and a stubbed dispatch_fn), or if you intend only
orchestrator-level tests, rename the suite to indicate "orchestrator-only"
coverage; update references to SubagentOrchestrator, _build_orchestrator,
dispatch_fn, and the timing mode constants (FIXED_SCHEDULE, REQUEST_RATE,
USER_CENTRIC_RATE) so the chosen approach is clear and the real strategy
_dispatch_turn paths are exercised when running the tests.

Comment on lines +239 to +253
def test_background_spawn_does_not_suspend_parent(self):
orch, _, _, scheduler, child_ids = _make_orchestrator(is_background=True)
credit = _make_credit(
conv_id="conv_0", corr_id="parent-1", turn_index=2, num_turns=6
)
handled = orch.intercept(credit)

assert handled is False
assert "parent-1" not in orch._gated_turns
assert len(orch._child_to_gate) == 0

dispatched = orch._test_dispatched # type: ignore[attr-defined]
bg_dispatches = [d for d in dispatched if d.agent_depth == 1]
assert len(bg_dispatches) == 2
assert scheduler.execute_async.call_count == 0

⚠️ Potential issue | 🟡 Minor

Remove the unused child_ids binding.

Ruff already flags this unpack as unused. Replace it with _ so the new test file stays warning-free.

🧰 Tools
🪛 Ruff (0.15.6)

[warning] 240-240: Unpacked variable child_ids is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_manager.py` around lines 239 - 253, In
test_background_spawn_does_not_suspend_parent, the tuple returned by
_make_orchestrator is unpacked into orch, _, _, scheduler, child_ids but
child_ids is unused; change the final binding from child_ids to _ to avoid the
unused-variable warning (i.e., update the unpacking where
_make_orchestrator(...) is called in
test_background_spawn_does_not_suspend_parent).

Comment on lines +248 to +249
def test_child_gate_entry_fields_after_spawn(self) -> None:
orch, _, _, _, child_ids = _make_simple_orchestrator()

⚠️ Potential issue | 🟡 Minor

Drop the unused child_ids unpacking in these tests.

Both bindings are unused and already show up in Ruff. Swapping them to _ keeps the file lint-clean without changing coverage.

Also applies to: 494-494

🧰 Tools
🪛 Ruff (0.15.6)

[warning] 249-249: Unpacked variable child_ids is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_orchestrator_gating.py` around lines 248 -
249, The test function test_child_gate_entry_fields_after_spawn unpacks
child_ids from _make_simple_orchestrator() but never uses it; replace the unused
binding with an underscore (e.g., orch, _, _, _ = _make_simple_orchestrator())
to satisfy the linter. Do the same replacement for the other similar test that
currently unpacks child_ids (the other occurrence noted in the file) so both
tests remove the unused variable without changing behavior or coverage.

Comment on lines +662 to +664
def test_spawn_join_prerequisite_spawn_id_defaults_to_none(self) -> None:
prereq = TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN)
assert prereq.spawn_id is None

⚠️ Potential issue | 🟠 Major

SPAWN_JOIN without spawn_id should not be a valid model state.

A join prerequisite cannot be satisfied without a concrete spawn_id. Encoding TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN) as valid pushes malformed dataset problems to runtime instead of catching them during validation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/timing/test_subagent_orchestrator_gating.py` around lines 662 -
664, The test currently allows creating
TurnPrerequisite(kind=PrerequisiteKind.SPAWN_JOIN) with spawn_id None, but
SPAWN_JOIN must require a concrete spawn_id; update the TurnPrerequisite model
(e.g., in its constructor/__post_init__ or Pydantic validators) to
raise/validate when kind == PrerequisiteKind.SPAWN_JOIN and spawn_id is None,
and change or remove test_spawn_join_prerequisite_spawn_id_defaults_to_none to
assert that construction fails (or that validation raises) for a SPAWN_JOIN
without spawn_id; refer to TurnPrerequisite, PrerequisiteKind.SPAWN_JOIN, and
test_spawn_join_prerequisite_spawn_id_defaults_to_none when making these
changes.

Guard dataset_metadata access in PhaseRunner to prevent crash when
source has no metadata. Fix model_name alignment with extra_params
overrides, defer expensive trace logging, add inverse tool_result
ordering test, use mermaid/text fenced blocks in docs, alphabetize
__all__ exports, and document new Credit fields.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
…, and child credit bypass

Adds ConfluxAnalyzer for trace statistics (conversation topology, timing
distributions, subagent depth analysis) with auto-detection in the
analyze-trace CLI. Supports gzip/zstd-compressed base64 payloads in the
Conflux loader. Fixes child session credit handling so subagent turns
bypass session slot acquisition and stop-condition new-session checks,
preventing deadlocks when parent gates await child completion.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 13

♻️ Duplicate comments (2)
src/aiperf/workers/inference_client.py (1)

103-111: ⚠️ Potential issue | 🟠 Major

Keep record.model_name tied to the payload that was actually sent.

Line 107 skips merge_turn_params() for non-dict payloads, but Line 191 still overwrites record.model_name from extra_params["model"]. In that path the request goes out with the original model while metrics/export attribute it to the override. Reuse the effective model from the merged payload, or only override the record when the merge actually ran.

Also applies to: 189-193

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/workers/inference_client.py` around lines 103 - 111, The code
currently skips merge_turn_params for non-dict formatted_payload but later
unconditionally sets record.model_name from current_turn.extra_params["model"],
causing metrics to reflect an overridden model that wasn't actually sent; update
the logic around current_turn, formatted_payload, merge_turn_params and
record.model_name so that record.model_name is set from the actual payload
sent—either perform the merge for non-dict payloads (so the effective model in
formatted_payload reflects the merge) or only override record.model_name when
merge_turn_params actually ran (i.e., when isinstance(formatted_payload, dict)
and merge applied), and in the alternative ensure you extract the effective
model from the merged formatted_payload rather than directly from
extra_params["model"] before sending.
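To make the suggested guard concrete, here is a minimal self-contained sketch of the "only override when the merge actually ran" approach. The `Record`/`Turn` stand-ins and the `merge_turn_params` semantics are assumptions for illustration, not the actual aiperf types:

```python
from dataclasses import dataclass, field


@dataclass
class Record:
    """Stand-in for the metrics record; only model_name matters here."""
    model_name: str = "original-model"


@dataclass
class Turn:
    extra_params: dict = field(default_factory=dict)


def merge_turn_params(payload: dict, extra: dict) -> dict:
    # Simplified merge: per-turn extra params win over payload defaults.
    return {**payload, **extra}


def apply_model_override(record: Record, turn: Turn, formatted_payload):
    """Only rewrite record.model_name when the merge actually ran."""
    merged = False
    if isinstance(formatted_payload, dict):
        formatted_payload = merge_turn_params(formatted_payload, turn.extra_params)
        merged = True
    if merged and "model" in formatted_payload:
        # Attribute the request to the effective model in the payload that
        # goes on the wire, not to extra_params["model"] directly.
        record.model_name = formatted_payload["model"]
    return formatted_payload
```

With this shape, a non-dict payload passes through untouched and the record keeps the model that was actually sent.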
src/aiperf/dataset/message_normalizer.py (1)

291-309: ⚠️ Potential issue | 🟠 Major

Preserve per-text-block data when block_order is emitted.

Line 291 collapses all text blocks into one string, but Line 309 stores per-block indices. In ordered emit (Line 647), any text index > 0 is dropped, so interleaved messages can lose text blocks or ordering fidelity.

Proposed fix
@@
-    combined_text = "\n\n".join(text_parts) if text_parts else None
+    combined_text = "\n\n".join(text_parts) if text_parts else None
@@
-    if thinking_blocks and any(_get_meta(tc, "server_tool") for tc in tool_calls):
-        _set_meta(out, "block_order", block_order)
+    if thinking_blocks and any(_get_meta(tc, "server_tool") for tc in tool_calls):
+        _set_meta(out, "block_order", block_order)
+        _set_meta(out, "text_blocks", text_parts.copy())
@@
-    if isinstance(text, str):
-        text_parts = [text] if text else []
+    text_blocks = _get_meta(msg, "text_blocks")
+    if isinstance(text_blocks, list):
+        text_parts = [t for t in text_blocks if isinstance(t, str)]
+    elif isinstance(text, str):
+        text_parts = [text] if text else []

Also applies to: 635-649

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/message_normalizer.py` around lines 291 - 309, The code
currently collapses all text blocks into a single combined_text (variable
combined_text built from text_parts) before potentially emitting a block_order
meta and that loses per-block text when interleaved tool messages exist; change
the logic in the section that builds out (where combined_text, out["content"],
and _set_meta(out, "block_order", block_order) are handled) so that if
thinking_blocks is present and any tool_calls have _get_meta(..., "server_tool")
true you preserve the original text_parts as per-block content (e.g., set a meta
like _set_meta(out, "text_blocks", text_parts) or
out["content_blocks"]=text_parts) while still setting out["content"] to the
joined fallback for consumers that expect a single string; ensure _set_meta is
used to store the per-block list before setting "block_order" so the ordered
emit code can reference per-block text indices instead of losing text >0.
🧹 Nitpick comments (7)
tests/component_integration/timing/test_poisson_rate.py (1)

171-173: Consider aligning “sustained stress” semantics with the reduced workload.

With 30 total requests, this may behave more like a short functional check than sustained stress. Consider either increasing volume again or renaming the test to reflect the lighter scope.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/timing/test_poisson_rate.py` around lines 171 -
173, The test's current parameters (num_sessions=10, qps=150.0,
turns_per_session=3) only generate ~30 requests which reads like a short
functional check; either raise the workload (e.g., increase num_sessions, qps,
or turns_per_session used in the Poisson-rate test) to produce a sustained
stress profile or update the test name/docstring to reflect it is a light/smoke
check—modify the call site that sets num_sessions, qps, and turns_per_session or
rename the test function to indicate "light" or "smoke" scope accordingly.
src/aiperf/dataset/synthesis/cli.py (1)

222-267: Prefer removing “what” comments in the Conflux table block.

Line 222, Line 234, Line 251, and Line 267 restate behavior already obvious from code. Keep comments only when they explain non-obvious rationale.

As per coding guidelines, "Add comments only for 'why?' not 'what' - do not over-comment code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/synthesis/cli.py` around lines 222 - 267, Remove the
redundant "what" comments in the Conflux table block—specifically the comments
that restate obvious behavior around the Models and Per-agent breakdown
sections; keep only meaningful "why" comments. Locate the blocks that build
model_table (guarded by stats.models_used) and agent_table (guarded by
stats.agent_breakdown) and delete the explanatory comments like "# Models" and
"# Per-agent breakdown" (and any similar comments adjacent to the
model_short/_fmt_tokens rows) while preserving any comments that explain
non-obvious rationale.
tests/component_integration/dataset/test_conflux_timing_modes.py (3)

596-596: Prefix unused scheduler variable with underscore.

-        orch, dispatched, scheduler = _build_orchestrator(ds)
+        orch, dispatched, _scheduler = _build_orchestrator(ds)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/dataset/test_conflux_timing_modes.py` at line
596, The variable returned from _build_orchestrator is currently assigned to
orch, dispatched, scheduler but scheduler is unused; rename it to _scheduler (or
prefix with an underscore) where the unpack happens in the test (the call to
_build_orchestrator in test_conflux_timing_modes.py) so the unused variable
follows the convention and linting rules, leaving orch and dispatched unchanged.

501-501: Prefix unused dispatched variable with underscore.

Static analysis flagged this as unused.

-        orch, dispatched, _ = _build_orchestrator(ds)
+        orch, _dispatched, _ = _build_orchestrator(ds)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/dataset/test_conflux_timing_modes.py` at line
501, The tuple unpack from _build_orchestrator currently assigns the second
value to dispatched but it's not used; change the unpack to prefix that variable
with an underscore (e.g., replace dispatched with _dispatched or just _ ) so
static analysis no longer flags it, keeping _build_orchestrator and orch
unchanged.

419-419: Prefix unused scheduler variable with underscore.

Static analysis flagged this unpacked variable as unused. Prefix with underscore to indicate intentional disuse.

-        orch, dispatched, scheduler = _build_orchestrator(ds)
+        orch, dispatched, _scheduler = _build_orchestrator(ds)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/dataset/test_conflux_timing_modes.py` at line
419, The unpacked variable from the _build_orchestrator call has an unused third
element; change the tuple unpack to match orch, dispatched, _scheduler (or orch,
dispatched, _ ) so the unused scheduler is prefixed with an underscore to
satisfy static analysis and indicate intentional disuse of the third return
value from _build_orchestrator.
tests/unit/credit/test_child_session_bypass.py (1)

569-623: Redundant test setup pattern.

The test creates handler with mock_concurrency, resets mocks in a loop (line 581), then creates handler2 with mock_concurrency2 to actually count calls. The first loop with resets serves no verification purpose. Consider simplifying to use only the second handler.

♻️ Suggested simplification

Remove the first loop (lines 580-589) since it doesn't verify anything meaningful due to the reset_mock() calls. The second handler setup and loop already covers the full scenario verification.

-        for i, (conv, tidx, nturns, depth, _) in enumerate(returns):
-            mock_concurrency.reset_mock()
-            credit = _credit(
-                credit_id=i,
-                conv=conv,
-                turn_index=tidx,
-                num_turns=nturns,
-                agent_depth=depth,
-            )
-            await handler.on_credit_return("worker-1", _credit_return(credit))
-
-        # Count total session releases across all calls
-        # We reset mocks between calls, so check the last call only confirms pattern.
-        # Instead, let's do it without reset:
         mock_concurrency2 = MagicMock(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/credit/test_child_session_bypass.py` around lines 569 - 623, The
test contains a redundant setup and loop: it constructs handler with
mock_concurrency, iterates calling handler.on_credit_return while resetting
mocks each iteration, but then re-runs the real verification with
handler2/mock_concurrency2; remove the first handler loop and related
reset_mock() usage to simplify the test. Specifically, delete the initial
for-loop that calls handler.on_credit_return and mock_concurrency.reset_mock(),
leaving only the second setup that creates mock_concurrency2, handler2,
registers the phase, iterates creating credits via _credit and calling
handler2.on_credit_return, and asserts
mock_concurrency2.release_session_slot.call_count == 2; keep the helper
functions _credit and _credit_return and the returns list intact.
src/aiperf/timing/subagent_orchestrator.py (1)

735-746: Catch specific exceptions instead of broad Exception.

The static analyzer flags except Exception as too broad. Consider catching more specific exceptions or at minimum logging the exception details for debugging.

♻️ Proposed fix
         try:
             issued = await self._credit_issuer.issue_credit(turn)
-        except Exception:
+        except Exception as exc:
             self.warning(
-                f"Exception issuing credit for child {corr_id}, releasing from gate"
+                f"Exception issuing credit for child {corr_id}, releasing from gate: {exc}"
             )
             issued = False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/timing/subagent_orchestrator.py` around lines 735 - 746, Replace
the broad "except Exception" around the await
self._credit_issuer.issue_credit(turn) call with more specific exception
handling for the errors the credit issuer can raise (e.g., CreditIssuanceError
or IssueCreditError) or, if uncertain, catch Exception but log the actual
exception details; update the block around
self._credit_issuer.issue_credit(turn) to either list the specific Exception
types or call self.exception(...) / include exc_info in the warning message so
the exception details are recorded while preserving the existing flow that sets
issued = False, and ensure corr_id, issued, _release_child(corr_id), and
_dispatch(gated) remain unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/cli-options.md`:
- Around line 59-60: Replace the single inline description "Auto-detects the
format: - Conflux JSON (file or directory of files): conversation structure,
token distributions, timing - JSONL traces (Mooncake/Bailian): ISL/OSL
distributions, prefix cache hit rates" with proper markdown bullets: keep the
lead "Auto-detects the format:" followed by two top-level bullet items for
"Conflux JSON" and "JSONL traces" and use sub-bullets (or parenthetical text) to
show the specifics ("file or directory of files: conversation structure, token
distributions, timing" for Conflux JSON; "Mooncake/Bailian: ISL/OSL
distributions, prefix cache hit rates" for JSONL traces) so each detail appears
on its own line and is easy to read.

In `@docs/tutorials/conflux-trace.md`:
- Around line 198-202: Replace the ASCII art code fences with Mermaid diagrams:
locate the text fences containing the lines beginning with "Parent turn N:  
Agent tool_use  ──► child runs" and the following "tool_result: 'child output
here'" (and the similar block at lines ~229-255) and convert each to a mermaid
sequenceDiagram that declares participants (e.g., Parent, Child), then
represents the "tool_use" call as a message from Parent to Child, the child's
run as Child->>Parent with the "tool_result" payload, and the "JOIN — reads
child result, continues" as a subsequent message or note; ensure the code fence
language is changed to "mermaid" and the diagram conveys the same participants
and message ordering as the original ASCII timeline.

In `@src/aiperf/common/config/user_config.py`:
- Around line 398-402: The probe-only read for JSON in
_should_auto_enable_fixed_schedule() uses orjson.loads() on a 1MB slice
(_PROBE_BYTES) which will fail for large JSON files; change the logic so that
when path.suffix == ".json" you read the entire file contents (not just the
probe) before calling orjson.loads(), or replace this with a streaming/iterative
JSON parser if memory is a concern; specifically update the block that currently
reads probe = f.read(_PROBE_BYTES) and calls orjson.loads(probe) to instead
obtain the full JSON string (or use a streaming parser) so orjson.loads()
receives a complete document.
- Around line 392-396: validate_timing_mode() currently falls back to
_count_dataset_entries() which reads self.input.file and thus returns 0 or wrong
counts for directory inputs and JSON arrays; update validate_timing_mode() to
resolve directory inputs the same way as the earlier block (select the first
.json file when path.is_dir()) and pass that resolved file path into the
counting logic instead of relying on self.input.file, and extend
_count_dataset_entries() (or add a helper) to detect JSON array files and count
array elements (and keep counting newline-delimited JSON as before) so
request_count reflects the actual dataset entries.

In `@src/aiperf/dataset/loader/conflux.py`:
- Around line 406-407: The file probe block that currently does "except
Exception: return False" should be changed to only catch expected I/O and parse
errors; replace it with a narrow except clause (for example: except (OSError,
IOError, UnicodeDecodeError, ValueError, json.JSONDecodeError) as e: return
False) and let any other exceptions propagate (or re-raise them) so real bugs
surface; update the exception handling in the probe function/block that contains
"except Exception: return False" accordingly.

In `@src/aiperf/dataset/message_normalizer.py`:
- Around line 656-659: The ordered assistant emitter
_emit_anthropic_assistant_ordered is dropping message-level cache_control
metadata; update that function to restore the original message["cache_control"]
(or set it to message.get("cache_control")) on the returned dict just like
_emit_anthropic_assistant does, ensuring the returned object includes
"cache_control" when present so ordered messages preserve metadata.

In `@src/aiperf/dataset/synthesis/cli.py`:
- Around line 57-64: The _save_report function should be converted to async and
stop doing blocking I/O and wrong JSON serialization: change _save_report to
async def _save_report(...), call stats.model_dump() (or equivalent) and
serialize with orjson.dumps(data) to get bytes, then run blocking filesystem ops
(output_file.parent.mkdir and output_file.write_bytes(serialized_bytes)) inside
await asyncio.to_thread(...) (or use asyncio.to_thread for each blocking call).
Also update callers analyze_trace and _analyze_conflux to await
_save_report(...) so the async call is awaited. Ensure you import asyncio and
orjson and use Path.write_bytes for writing the serialized bytes.

In `@src/aiperf/timing/subagent_orchestrator.py`:
- Around line 711-725: The code registers the ChildGateEntry in _child_to_gate
after dispatching the child, causing a race where _release_child may miss the
entry if the child returns immediately; move the registration so that when
is_blocking and gated_turn_index is not None you create and assign
self._child_to_gate[session.x_correlation_id] =
ChildGateEntry(parent_corr_id=parent_corr_id, gated_turn_index=gated_turn_index,
prereq_key=f"spawn_join:{spawn_id}") before calling either
self._scheduler.execute_async(self._issue_child_credit_or_release(...)) or
self._dispatch(turn); keep the existing conditions (is_blocking and
gated_turn_index) and increment self._stats.children_spawned after dispatch as
before.

In `@tests/component_integration/dataset/test_dataset_sampling.py`:
- Line 220: Replace the ambiguous Unicode multiplication sign in the comment
"Should have 6 sessions × 3 turns = 18 requests" with a plain ASCII 'x' so it
reads "Should have 6 sessions x 3 turns = 18 requests"; update that comment in
tests/component_integration/dataset/test_dataset_sampling.py (the comment string
itself) to avoid lint/search/copy-paste issues.

In `@tests/component_integration/sessions/test_sticky_routing.py`:
- Line 74: In the test_sticky_routing.py comments that use the Unicode
multiplication sign '×' (e.g., the comment "10 sessions × 2 turns = 20 requests"
and the other similar comment), replace '×' with the ASCII letter 'x' so the
comments read "10 sessions x 2 turns = 20 requests" (and the analogous one) to
satisfy Ruff RUF003; update the comment strings in that file where they appear.

In `@tests/component_integration/timing/test_advanced_scenarios.py`:
- Around line 141-145: The assertions for total credits and timeout counts are
still using hardcoded values after reducing the scenario; change the test to
compute expected totals from the scenario config (use config.expected_requests
or derive from num_sessions * turns_per_session) and update the assertions
accordingly: compute total_requests = config.expected_requests (or num_sessions
* turns_per_session), set expected_credits = total_requests (or whatever formula
the test uses), and recompute the seeded timeout count (or relax it) instead of
asserting the old 9/31 split so the assertions reflect the new 24-request
scenario; update references in the test that currently use the hardcoded 40 and
9/31 values.

In `@tests/component_integration/timing/test_session_turn_delays.py`:
- Line 154: Replace the non-ASCII multiplication character in the inline comment
on the assertion (the line with assert result.request_count == 30) to avoid
RUF003; change "10 × 3" to a plain ASCII form such as "10 x 3" or "10 * 3" so
the comment uses only ASCII characters.

In `@tests/unit/dataset/loader/test_conflux.py`:
- Line 2420: The test's pytest.raises assertion uses an unescaped "." in the
match string which acts as a regex wildcard; update the match argument to a raw
string with an escaped dot (e.g., r"No \.json files found") so the assertion
matches the literal ".json" text in the error message (locate the
pytest.raises(...) call in the test_conflux unit test).


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: f16b7336-0d16-4c94-b0b5-7647b717bedc

📥 Commits

Reviewing files that changed from the base of the PR and between c2f7ca0 and 22b0093.

📒 Files selected for processing (35)
  • docs/cli-options.md
  • docs/tutorials/conflux-trace.md
  • src/aiperf/cli_commands/analyze_trace.py
  • src/aiperf/common/config/user_config.py
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/credit/callback_handler.py
  • src/aiperf/credit/issuer.py
  • src/aiperf/credit/structs.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/dataset/loader/conflux_analyzer.py
  • src/aiperf/dataset/message_normalizer.py
  • src/aiperf/dataset/synthesis/cli.py
  • src/aiperf/dataset/synthesis/models.py
  • src/aiperf/dataset/synthesis/prefix_analyzer.py
  • src/aiperf/timing/conversation_source.py
  • src/aiperf/timing/phase/credit_counter.py
  • src/aiperf/timing/phase/progress_tracker.py
  • src/aiperf/timing/phase/runner.py
  • src/aiperf/timing/strategies/core.py
  • src/aiperf/timing/strategies/subagent_mixin.py
  • src/aiperf/timing/subagent_orchestrator.py
  • src/aiperf/workers/inference_client.py
  • tests/component_integration/dataset/test_conflux_timing_modes.py
  • tests/component_integration/dataset/test_dataset_sampling.py
  • tests/component_integration/sessions/test_sticky_routing.py
  • tests/component_integration/timing/conftest.py
  • tests/component_integration/timing/test_advanced_scenarios.py
  • tests/component_integration/timing/test_constant_rate.py
  • tests/component_integration/timing/test_fixed_schedule.py
  • tests/component_integration/timing/test_poisson_rate.py
  • tests/component_integration/timing/test_session_turn_delays.py
  • tests/unit/credit/test_callback_handler.py
  • tests/unit/credit/test_child_session_bypass.py
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/dataset/test_message_normalizer.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/aiperf/timing/strategies/subagent_mixin.py
  • src/aiperf/credit/structs.py

Comment on lines +59 to 60
Auto-detects the format: - Conflux JSON (file or directory of files): conversation structure, token distributions, timing - JSONL traces (Mooncake/Bailian): ISL/OSL distributions, prefix cache hit rates


⚠️ Potential issue | 🟡 Minor

Format the auto-detection details as proper bullets.

The current single-line - ... - ... text is hard to read. Split into separate bullet lines for stable markdown rendering.

Proposed fix
-Auto-detects the format: - Conflux JSON (file or directory of files): conversation structure, token distributions, timing - JSONL traces (Mooncake/Bailian): ISL/OSL distributions, prefix cache hit rates
+Auto-detects the format:
+- Conflux JSON (file or directory of files): conversation structure, token distributions, timing
+- JSONL traces (Mooncake/Bailian): ISL/OSL distributions, prefix cache hit rates

Comment on lines +198 to +202
```text
Parent turn N: Agent tool_use ──► child runs
tool_result: "child output here"
Parent turn N+1: JOIN — reads child result, continues
```

⚠️ Potential issue | 🟡 Minor

Replace ASCII timeline diagrams with Mermaid.

These timeline blocks are currently ASCII art in text fences. Convert them to Mermaid to align with repo docs standards.

As per coding guidelines **/*.md: "Use mermaid diagrams instead of ASCII art in markdown files".

Also applies to: 229-255

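For the first timeline, a direct Mermaid translation preserving the same participants and message ordering might read:

```mermaid
sequenceDiagram
    participant Parent
    participant Child
    Parent->>Child: tool_use (parent turn N spawns child)
    Child->>Parent: tool_result: "child output here"
    Note over Parent: Turn N+1 is the JOIN: reads child result, continues
```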

Comment on lines +392 to +396
if path.is_dir():
    json_files = sorted(path.glob("*.json"))
    if not json_files:
        return None
    path = json_files[0]

⚠️ Potential issue | 🟠 Major

Keep auto-detection and request counting on the same dataset semantics.

Lines 392-408 now accept directories and regular .json inputs, but the fallback in validate_timing_mode() still uses _count_dataset_entries(), which just counts non-empty lines on self.input.file. That returns 0 for directory inputs and bogus counts for JSON arrays, so fixed-schedule runs can start with the wrong request_count or terminate immediately.

Also applies to: 398-408

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/config/user_config.py` around lines 392 - 396,
validate_timing_mode() currently falls back to _count_dataset_entries() which
reads self.input.file and thus returns 0 or wrong counts for directory inputs
and JSON arrays; update validate_timing_mode() to resolve directory inputs the
same way as the earlier block (select the first .json file when path.is_dir())
and pass that resolved file path into the counting logic instead of relying on
self.input.file, and extend _count_dataset_entries() (or add a helper) to detect
JSON array files and count array elements (and keep counting newline-delimited
JSON as before) so request_count reflects the actual dataset entries.

Comment on lines +398 to +402
if path.suffix == ".json":
probe = f.read(_PROBE_BYTES)
try:
data = orjson.loads(probe)
except JSONDecodeError:

⚠️ Potential issue | 🟠 Major

The 1 MB .json probe will disable auto-detection on large files.

Line 401 parses only the first megabyte of a .json file. Unless the entire document fits in that buffer, orjson.loads() is guaranteed to raise JSONDecodeError, which means _should_auto_enable_fixed_schedule() silently falls back to False for large trace datasets. Use a streaming parser or read the full JSON document here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/config/user_config.py` around lines 398 - 402, The
probe-only read for JSON in _should_auto_enable_fixed_schedule() uses
orjson.loads() on a 1MB slice (_PROBE_BYTES) which will fail for large JSON
files; change the logic so that when path.suffix == ".json" you read the entire
file contents (not just the probe) before calling orjson.loads(), or replace
this with a streaming/iterative JSON parser if memory is a concern; specifically
update the block that currently reads probe = f.read(_PROBE_BYTES) and calls
orjson.loads(probe) to instead obtain the full JSON string (or use a streaming
parser) so orjson.loads() receives a complete document.
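A minimal sketch of the corrected probe, assuming full-document parsing is acceptable for `.json` inputs; the helper name is hypothetical and the stdlib `json` module stands in for `orjson`:

```python
import json
from pathlib import Path

_PROBE_BYTES = 1024 * 1024  # still a reasonable cap for a single JSONL line


def probe_is_valid_json(path: Path) -> bool:
    """Return True if the file parses as JSON under the right strategy (sketch)."""
    try:
        if path.suffix == ".json":
            # A .json document must be parsed whole: a 1 MB slice of a
            # larger file is never a complete document, so parsing the
            # slice would always fail and disable auto-detection.
            return isinstance(json.loads(path.read_text()), (dict, list))
        # JSONL only needs its first line to be valid JSON.
        with path.open() as f:
            json.loads(f.readline(_PROBE_BYTES))
        return True
    except (OSError, json.JSONDecodeError):
        return False
```

If memory is a concern for very large documents, a streaming parser is the alternative the comment mentions.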

Comment on lines +406 to +407
except Exception:
return False

⚠️ Potential issue | 🟡 Minor

Avoid blind except Exception in file probe.

This swallows unrelated logic errors during format detection. Restrict to expected I/O/parse failures so real bugs still surface.

Proposed fix
-        except Exception:
+        except OSError:
             return False
🧰 Tools
🪛 Ruff (0.15.6)

[warning] 406-406: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/conflux.py` around lines 406 - 407, The file probe
block that currently does "except Exception: return False" should be changed to
only catch expected I/O and parse errors; replace it with a narrow except clause
(for example: except (OSError, IOError, UnicodeDecodeError, ValueError,
json.JSONDecodeError) as e: return False) and let any other exceptions propagate
(or re-raise them) so real bugs surface; update the exception handling in the
probe function/block that contains "except Exception: return False" accordingly.


# Should have 10 sessions × 3 turns = 30 requests
assert len(result.jsonl) == 30
# Should have 6 sessions × 3 turns = 18 requests

⚠️ Potential issue | 🟡 Minor

Replace ambiguous unicode multiplication sign in comment.

Line 220 uses ×, which is flagged by lint and can cause copy/paste/search inconsistencies. Prefer plain x.

Suggested patch
-        # Should have 6 sessions × 3 turns = 18 requests
+        # Should have 6 sessions x 3 turns = 18 requests
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Should have 6 sessions × 3 turns = 18 requests
# Should have 6 sessions x 3 turns = 18 requests
🧰 Tools
🪛 Ruff (0.15.6)

[warning] 220-220: Comment contains ambiguous × (MULTIPLICATION SIGN). Did you mean x (LATIN SMALL LETTER X)?

(RUF003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/dataset/test_dataset_sampling.py` at line 220,
Replace the ambiguous Unicode multiplication sign in the comment "Should have 6
sessions × 3 turns = 18 requests" with a plain ASCII 'x' so it reads "Should
have 6 sessions x 3 turns = 18 requests"; update that comment in
tests/component_integration/dataset/test_dataset_sampling.py (the comment string
itself) to avoid lint/search/copy-paste issues.

)
# 15 sessions × 3 turns = 45 requests
assert result.request_count == 45
# 10 sessions × 2 turns = 20 requests

⚠️ Potential issue | 🟡 Minor

Replace ambiguous multiplication symbol in comments to satisfy Ruff (RUF003).

Line 74 and Line 104 use ×, which triggers lint warnings; replace with plain x.

Suggested fix
-        # 10 sessions × 2 turns = 20 requests
+        # 10 sessions x 2 turns = 20 requests
...
-        # 6 sessions × 3 turns = 18 requests
+        # 6 sessions x 3 turns = 18 requests

Also applies to: 104-104

🧰 Tools
🪛 Ruff (0.15.6)

[warning] 74-74: Comment contains ambiguous × (MULTIPLICATION SIGN). Did you mean x (LATIN SMALL LETTER X)?

(RUF003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/sessions/test_sticky_routing.py` at line 74, In
the test_sticky_routing.py comments that use the Unicode multiplication sign '×'
(e.g., the comment "10 sessions × 2 turns = 20 requests" and the other similar
comment), replace '×' with the ASCII letter 'x' so the comments read "10
sessions x 2 turns = 20 requests" (and the analogous one) to satisfy Ruff
RUF003; update the comment strings in that file where they appear.

Comment on lines +141 to 145
num_sessions=8,
qps=0,
turns_per_session=4,
concurrency=10,
turns_per_session=3,
concurrency=8,
)

⚠️ Potential issue | 🟠 Major

Update the hardcoded request/error totals after shrinking the scenario.

With the new values on Lines 141-144, this test now drives 24 requests, but the assertions below still expect 40 total credits and a 9/31 timeout split. The test will fail as-is; derive the totals from config.expected_requests and recompute or relax the seeded timeout count.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/timing/test_advanced_scenarios.py` around lines
141 - 145, The assertions for total credits and timeout counts are still using
hardcoded values after reducing the scenario; change the test to compute
expected totals from the scenario config (use config.expected_requests or derive
from num_sessions * turns_per_session) and update the assertions accordingly:
compute total_requests = config.expected_requests (or num_sessions *
turns_per_session), set expected_credits = total_requests (or whatever formula
the test uses), and recompute the seeded timeout count (or relax it) instead of
asserting the old 9/31 split so the assertions reflect the new 24-request
scenario; update references in the test that currently use the hardcoded 40 and
9/31 values.
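The derive-from-config fix could be sketched like this; `ScenarioConfig` here is a minimal stand-in for the test suite's actual config object, not its real definition:

```python
from dataclasses import dataclass


@dataclass
class ScenarioConfig:
    """Minimal stand-in for the integration-test scenario config."""

    num_sessions: int
    turns_per_session: int

    @property
    def expected_requests(self) -> int:
        # Derive totals instead of hardcoding them, so shrinking the
        # scenario cannot silently desynchronize the assertions.
        return self.num_sessions * self.turns_per_session
```

Assertions then compare against `config.expected_requests` rather than a literal 40.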

result = cli.run_sync(cmd, timeout=40.0)

assert result.request_count == 80 # 20 × 4
assert result.request_count == 30 # 10 × 3

⚠️ Potential issue | 🟡 Minor

Replace the multiplication sign in the inline comment.

Ruff flags × here as ambiguous text (RUF003), so this otherwise harmless change will still fail lint.

🧰 Tools
🪛 Ruff (0.15.6)

[warning] 154-154: Comment contains ambiguous × (MULTIPLICATION SIGN). Did you mean x (LATIN SMALL LETTER X)?

(RUF003)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/component_integration/timing/test_session_turn_delays.py` at line 154,
Replace the non-ASCII multiplication character in the inline comment on the
assertion (the line with assert result.request_count == 30) to avoid RUF003;
change "10 × 3" to a plain ASCII form such as "10 x 3" or "10 * 3" so the
comment uses only ASCII characters.

def test_empty_directory_raises(self, tmp_path, default_user_config):
"""Loading from an empty directory raises FileNotFoundError."""
loader = ConfluxLoader(filename=str(tmp_path), user_config=default_user_config)
with pytest.raises(FileNotFoundError, match="No .json files found"):

⚠️ Potential issue | 🟡 Minor

Escape regex metacharacter in match pattern.

The . in "No .json files found" is a regex metacharacter that matches any character. Use a raw string with escaped dot for precise matching.

🔧 Proposed fix
-        with pytest.raises(FileNotFoundError, match="No .json files found"):
+        with pytest.raises(FileNotFoundError, match=r"No \.json files found"):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with pytest.raises(FileNotFoundError, match="No .json files found"):
with pytest.raises(FileNotFoundError, match=r"No \.json files found"):
🧰 Tools
🪛 Ruff (0.15.6)

[warning] 2420-2420: Pattern passed to match= contains metacharacters but is neither escaped nor raw

(RUF043)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` at line 2420, The test's
pytest.raises assertion uses an unescaped "." in the match string which acts as
a regex wildcard; update the match argument to a raw string with an escaped dot
(e.g., r"No \.json files found") so the assertion matches the literal ".json"
text in the error message (locate the pytest.raises(...) call in the
test_conflux unit test).

Comment on lines +48 to +49
if system is not None:
msgs.insert(0, {"role": "system", "content": system})
Contributor

could be a stupid question but why do we need this default system prompt?

Contributor Author

this looks like a mistake. good catch.

Comment on lines +488 to +490
else:
# is_subagent=None: un-enriched record, classify heuristically
unclassified_ids.append(agent_id)
Contributor

I don't think this will ever be the case. I could be wrong though if you have seen this in conflux traces.

Comment on lines +181 to +186
result_text = _stringify_block_content(block.get("content", ""))
if (
"queued for running" in result_text
or "Async agent launched" in result_text
):
saw_queued_result = True
Contributor

Is this deterministic? How can you be sure either of these are in the content block?

Contributor Author

it sort of is for claude code, but not really guaranteed. I do wish there was a simpler, more robust way to do it.

spawn_turn_to_children: dict[
int, list[tuple[str, list[ConfluxRecord], Conversation]]
] = {}
for child_agent_id in child_ids:
Contributor

This approach is cool. With slower servers though, these spawn points can disobey the actual timestamps by quite a bit. It feels like a tradeoff between testing workload shape and testing the behavior of an agent workload.

I like it though because the workload shape is pretty well covered by mooncake, but real agent behavior and content is not.

Contributor Author

Correct. It does have an interesting future ability to expand and shrink the turn gaps between agents and primary while preserving the ordering.

In the future I envision being able to support a mode of loading multiple of these traces and running them using custom timing strategies to either be realistic, or saturate concurrency load, etc.

Another option we could do is make the timestamps relative instead of absolute, such that if there is a 2s gap between requests, the slow server will still wait 2s, even if the expected time has elapsed.
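The relative-timestamp option above could be sketched roughly like this; the `turns` and `send` shapes are hypothetical illustrations, not the orchestrator's actual API:

```python
import asyncio


async def replay_with_relative_gaps(turns, send):
    """Replay turns preserving inter-request gaps, not absolute timestamps (sketch)."""
    prev_ts = None
    for turn in turns:
        # Wait out the recorded gap *after* the previous response returns,
        # so a slow server still sees the same 2s pacing instead of the
        # replay racing to catch up with an absolute schedule.
        if prev_ts is not None and turn["timestamp"] > prev_ts:
            await asyncio.sleep(turn["timestamp"] - prev_ts)
        prev_ts = turn["timestamp"]
        await send(turn)
```

Ordering is preserved either way; only the pacing semantics change under server slowdown.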

Comment on lines +414 to +415
include_orphans = self.user_config.input.conflux_include_utility_calls

Contributor

does conflux really proxy these orphaned calls?

If so I don't think we should add any support for them, doesn't seem necessary for our goal of replaying the agent behavior.

Contributor Author

Yes it does. However, I should have renamed them. They were labelled "orphan" at first because they didn't have a spawn parent, but in reality they are utility calls to haiku. @bk-nvidia had mentioned that because they are a different model, they would most likely be routed to a separate server in production, which is why I default to not including them but provide the option.

Comment on lines +160 to +168
if provider is None:
provider = _detect_provider(messages, tools)

# Strip Conflux metadata
messages = [{k: v for k, v in m.items() if k != "tokens"} for m in messages]

if provider == "anthropic":
messages = _normalize_anthropic_messages(messages)
tools = _normalize_anthropic_tools(tools) if tools else tools
Contributor

What about normalizing responses API schema?

Contributor Author

good idea. I really wish there was a great out-of-the-box library to do this though. I need to generate some codex traces to play around with.

Comment on lines +966 to +967
output_tokens=tokens.output if tokens else None,
output_reasoning_tokens=tokens.output_reasoning if tokens else None,
Contributor

We should be using the output tokens from the trace as the max output length for the replay instead of max output length from the hyperparameters.

max tokens = tokens.output + tokens.output_reasoning

Contributor Author

good point. that's pretty important.
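The suggested derivation can be sketched as follows; the helper name is hypothetical, and the actual wiring would live where the loader builds the turn:

```python
def max_tokens_from_trace(output_tokens, output_reasoning_tokens, hyperparam_max=None):
    """Derive the replay max_tokens cap from the trace's recorded output (sketch).

    Per the review suggestion: max tokens = tokens.output + tokens.output_reasoning.
    Fall back to the hyperparameter value only when the trace has no counts.
    """
    if output_tokens is None:
        return hyperparam_max
    return output_tokens + (output_reasoning_tokens or 0)
```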

@lvogel04 lvogel04 left a comment

this is great work and I like the overall approach, but I think there is a lot of wasted space in the conflux trace that makes the trace format more complex than it needs to be. Some of the info is just helpful metadata, some is necessary for replay, and I think we should only support a conflux trace that is specific to replay.

This would include:

- agent_id
- is_subagent
- timestamp
- duration_ms
- completed_at
- provider (maybe for normalization but probably not)
- messages
- tools
- subset of hyperparameters
- base64 (maybe)

This would be much cleaner. I can work with ben on getting this supported.

Replace TurnThreadingMode with ConversationContextMode from origin/main.
Conflux loader now uses MESSAGE_ARRAY_WITH_RESPONSES (was ISOLATED_TURNS).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
docs/cli-options.md (1)

59-60: ⚠️ Potential issue | 🟡 Minor

Split the auto-detection details into proper bullet lines.
Line 59 is currently a single inline sentence with embedded bullets, which is hard to read and renders inconsistently.

Proposed fix
-Auto-detects the format: - Conflux JSON (file or directory of files): conversation structure, token distributions, timing - JSONL traces (Mooncake/Bailian): ISL/OSL distributions, prefix cache hit rates
+Auto-detects the format:
+- Conflux JSON (file or directory of files): conversation structure, token distributions, timing
+- JSONL traces (Mooncake/Bailian): ISL/OSL distributions, prefix cache hit rates
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/cli-options.md` around lines 59 - 60, The single inline sentence
"Auto-detects the format: - Conflux JSON (file or directory of files):
conversation structure, token distributions, timing - JSONL traces
(Mooncake/Bailian): ISL/OSL distributions, prefix cache hit rates" should be
split into separate readable bullet lines: one bullet for Conflux JSON with its
details and one bullet for JSONL traces with its details; update the
docs/cli-options.md sentence to replace the embedded hyphenated list with two
proper markdown bullets so each format and its description renders clearly.
src/aiperf/dataset/loader/conflux.py (1)

406-407: ⚠️ Potential issue | 🟡 Minor

Narrow the probe exception handler to expected failures only.
Line 406 catches all exceptions and can silently mask real bugs in format detection.

Proposed fix
-        except Exception:
+        except OSError:
             return False

Based on learnings: In the aiperf codebase, prefer fail-fast at the source of logic errors rather than adding downstream defensive guards that mask bugs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/conflux.py` around lines 406 - 407, The try/except
in conflux.py that currently does "except Exception: return False" around the
format-detection logic is too broad and can hide real bugs; update that block to
only catch the specific, expected errors (e.g., ValueError, KeyError, TypeError,
json.JSONDecodeError, UnicodeDecodeError as appropriate for the
parsing/validation code used) and return False for those cases, but re-raise any
other exceptions so logic errors surface; keep the same return semantics for
expected parse/validation failures and add no-op or minimal context logging if
needed.
tests/unit/dataset/loader/test_conflux.py (1)

2420-2420: ⚠️ Potential issue | 🟡 Minor

Escape the dot in pytest.raises(..., match=...) to avoid wildcard matching.

"No .json files found" treats . as “any character”; this can produce false-positive matches.

Proposed fix
-        with pytest.raises(FileNotFoundError, match="No .json files found"):
+        with pytest.raises(FileNotFoundError, match=r"No \.json files found"):
#!/bin/bash
rg -n 'pytest\.raises\(FileNotFoundError,\s*match=' tests/unit/dataset/loader/test_conflux.py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` at line 2420, The test uses
pytest.raises(FileNotFoundError, match="No .json files found") where the dot is
unescaped and acts as a regex wildcard; update the match argument to escape the
dot (e.g. use a raw string r"No \.json files found" or escape it as "No \\.json
files found") so the assertion matches the literal ".json" extension; locate the
pytest.raises call in the test (the invocation containing match="No .json files
found") and replace the string accordingly.
🧹 Nitpick comments (2)
tests/unit/dataset/loader/test_conflux.py (2)

274-304: Normalize test method names to the required test_<function>_<scenario>_<expected> format.

A large subset of tests in this module uses shortened names (test_create, test_defaults, etc.), which doesn’t match the required test naming contract.

As per coding guidelines, "Test naming convention: test_<function>_<scenario>_<expected> (e.g., test_parse_config_missing_field_raises_error)".

Also applies to: 327-417, 476-2489

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` around lines 274 - 304, The test
methods in this file (e.g., test_create and test_defaults) do not follow the
required naming convention; rename each to the pattern
test_<function>_<scenario>_<expected> (for example,
test_confluxrecord_create_successful or test_confluxrecord_defaults_are_none)
and apply the same renaming for other short names in this module (lines
referenced in the review: blocks around test_unified_fields_roundtrip and the
ranges 327-417, 476-2489) so their names describe the function under test, the
scenario, and the expected outcome; ensure references to these tests in any test
discovery code or fixtures are updated accordingly (look for ConfluxRecord
usages and any direct calls to test_create/test_defaults to update).

379-379: Move function-scoped imports to module top level.

These imports are currently inside test functions; please hoist them to the file header to match test style requirements. Imports like Conversation, Turn, and conflux_mod should be at the module level alongside existing imports.

Also applies to: 391, 405, 1548, 1581, 1603, 2213, 2230

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` at line 379, Several tests import
symbols inside test functions; hoist those function-scoped imports to the module
top-level by moving imports like Conversation, Turn and the module import
(conflux_mod) out of tests and placing them with the existing imports at the top
of the file; update any references in functions to use the now-top-level
Conversation, Turn, and conflux_mod names and remove the in-function import
statements (apply same change for occurrences around the other noted locations:
the blocks that import Conversation, Turn, and conflux_mod).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/common/models/dataset_models.py`:
- Around line 106-121: The TurnPrerequisite model allows spawn_id to be None
even when kind == PrerequisiteKind.SPAWN_JOIN; add a Pydantic validator (e.g., a
`@root_validator` or `@validator` on "spawn_id") in class TurnPrerequisite to raise
a ValueError if self.kind (or the validated values dict) equals
PrerequisiteKind.SPAWN_JOIN and spawn_id is None or empty, referencing the
TurnPrerequisite class, the kind field, spawn_id field, and the
PrerequisiteKind.SPAWN_JOIN enum to locate where to add the check.

In `@src/aiperf/dataset/loader/conflux.py`:
- Around line 818-824: When parent_id is None the code currently skips
converting orphan groups, which drops files that contain only utility calls;
update the branch that handles parent_id is None to still iterate threaded_data
and append converted conversations (call self._build_conversation(agent_id,
records, is_child=False) for each agent_id) when the conflux
include-utility-calls option is enabled (the CLI flag
--conflux-include-utility-calls or the corresponding attribute used in this
module), ensuring conversations list is populated with orphan utility-call
groups instead of returning an empty dataset.

In `@tests/unit/dataset/loader/test_conflux.py`:
- Around line 104-108: The functions and fixtures in this test file lack
parameter and return type annotations; update each function (e.g.,
_build_session_file, _build_team_session, _load_conversations and all fixtures
referenced between lines ~274–2465) to include explicit types for every
parameter and the return type—use Path for tmp_path (from pathlib), and typing
types such as list[dict[str, Any]] or List[Dict[str, Any]] for records and
Any/Dict/List for other complex values, importing Any, Dict, List from typing as
needed; ensure all test fixtures and helper functions have complete annotations
(parameters and -> return types) to satisfy the repo-wide typing requirement.

---

Duplicate comments:
In `@docs/cli-options.md`:
- Around line 59-60: The single inline sentence "Auto-detects the format: -
Conflux JSON (file or directory of files): conversation structure, token
distributions, timing - JSONL traces (Mooncake/Bailian): ISL/OSL distributions,
prefix cache hit rates" should be split into separate readable bullet lines: one
bullet for Conflux JSON with its details and one bullet for JSONL traces with
its details; update the docs/cli-options.md sentence to replace the embedded
hyphenated list with two proper markdown bullets so each format and its
description renders clearly.

In `@src/aiperf/dataset/loader/conflux.py`:
- Around line 406-407: The try/except in conflux.py that currently does "except
Exception: return False" around the format-detection logic is too broad and can
hide real bugs; update that block to only catch the specific, expected errors
(e.g., ValueError, KeyError, TypeError, json.JSONDecodeError, UnicodeDecodeError
as appropriate for the parsing/validation code used) and return False for those
cases, but re-raise any other exceptions so logic errors surface; keep the same
return semantics for expected parse/validation failures and add no-op or minimal
context logging if needed.

In `@tests/unit/dataset/loader/test_conflux.py`:
- Line 2420: The test uses pytest.raises(FileNotFoundError, match="No .json
files found") where the dot is unescaped and acts as a regex wildcard; update
the match argument to escape the dot (e.g. use a raw string r"No \.json files
found" or escape it as "No \\.json files found") so the assertion matches the
literal ".json" extension; locate the pytest.raises call in the test (the
invocation containing match="No .json files found") and replace the string
accordingly.

---

Nitpick comments:
In `@tests/unit/dataset/loader/test_conflux.py`:
- Around line 274-304: The test methods in this file (e.g., test_create and
test_defaults) do not follow the required naming convention; rename each to the
pattern test_<function>_<scenario>_<expected> (for example,
test_confluxrecord_create_successful or test_confluxrecord_defaults_are_none)
and apply the same renaming for other short names in this module (lines
referenced in the review: blocks around test_unified_fields_roundtrip and the
ranges 327-417, 476-2489) so their names describe the function under test, the
scenario, and the expected outcome; ensure references to these tests in any test
discovery code or fixtures are updated accordingly (look for ConfluxRecord
usages and any direct calls to test_create/test_defaults to update).
- Line 379: Several tests import symbols inside test functions; hoist those
function-scoped imports to the module top-level by moving imports like
Conversation, Turn and the module import (conflux_mod) out of tests and placing
them with the existing imports at the top of the file; update any references in
functions to use the now-top-level Conversation, Turn, and conflux_mod names and
remove the in-function import statements (apply same change for occurrences
around the other noted locations: the blocks that import Conversation, Turn, and
conflux_mod).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: d59f3d8f-1dcd-4e56-977e-00c5c49bf88a

📥 Commits

Reviewing files that changed from the base of the PR and between 22b0093 and 65ef714.

📒 Files selected for processing (11)
  • README.md
  • docs/cli-options.md
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/models/dataset_models.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/workers/inference_client.py
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/workers/test_session_manager.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • tests/unit/workers/test_session_manager.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/dataset_manager.py

Comment on lines +106 to +121
class TurnPrerequisite(AIPerfBaseModel):
"""A condition that must be satisfied before a turn dispatches.

Used by the SubagentOrchestrator to gate turn dispatch on prerequisite
completion. Currently supports 'spawn_join' (all blocking children from
a spawn must complete). Extensible to other gate types.
"""

kind: PrerequisiteKind = Field(
description="Prerequisite type.",
)
spawn_id: str | None = Field(
default=None,
description="For spawn_join: which spawn's children must complete.",
)


⚠️ Potential issue | 🟡 Minor

Enforce spawn_id when prerequisite kind is SPAWN_JOIN.
Right now, Line 117 allows spawn_id=None even when Line 114 is SPAWN_JOIN, which can produce invalid gating prerequisites.

Proposed fix
-from pydantic import Field, field_validator
+from pydantic import Field, field_validator, model_validator
@@
 class TurnPrerequisite(AIPerfBaseModel):
@@
     spawn_id: str | None = Field(
         default=None,
         description="For spawn_join: which spawn's children must complete.",
     )
+
+    @model_validator(mode="after")
+    def _validate_spawn_join(self) -> "TurnPrerequisite":
+        if self.kind == PrerequisiteKind.SPAWN_JOIN and not self.spawn_id:
+            raise ValueError("spawn_id is required when kind=spawn_join")
+        return self
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class TurnPrerequisite(AIPerfBaseModel):
"""A condition that must be satisfied before a turn dispatches.
Used by the SubagentOrchestrator to gate turn dispatch on prerequisite
completion. Currently supports 'spawn_join' (all blocking children from
a spawn must complete). Extensible to other gate types.
"""
kind: PrerequisiteKind = Field(
description="Prerequisite type.",
)
spawn_id: str | None = Field(
default=None,
description="For spawn_join: which spawn's children must complete.",
)
class TurnPrerequisite(AIPerfBaseModel):
"""A condition that must be satisfied before a turn dispatches.
Used by the SubagentOrchestrator to gate turn dispatch on prerequisite
completion. Currently supports 'spawn_join' (all blocking children from
a spawn must complete). Extensible to other gate types.
"""
kind: PrerequisiteKind = Field(
description="Prerequisite type.",
)
spawn_id: str | None = Field(
default=None,
description="For spawn_join: which spawn's children must complete.",
)
@model_validator(mode="after")
def _validate_spawn_join(self) -> "TurnPrerequisite":
if self.kind == PrerequisiteKind.SPAWN_JOIN and not self.spawn_id:
raise ValueError("spawn_id is required when kind=spawn_join")
return self
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/common/models/dataset_models.py` around lines 106 - 121, the
TurnPrerequisite model allows spawn_id to be None even when kind ==
PrerequisiteKind.SPAWN_JOIN; add a Pydantic validator (e.g., a
`@model_validator(mode="after")` method on TurnPrerequisite) that raises a
ValueError if self.kind equals PrerequisiteKind.SPAWN_JOIN and spawn_id is None
or empty. Reference the TurnPrerequisite class, the kind and spawn_id fields,
and the PrerequisiteKind.SPAWN_JOIN enum to locate where to add the check.

Comment on lines +818 to +824
else:
blocking_count = 0
background_count = 0
for agent_id, records in threaded_data.items():
conversations.append(
self._build_conversation(agent_id, records, is_child=False)
)
⚠️ Potential issue | 🟠 Major

Preserve included orphan utility calls when no parent thread is found.
In the parent_id is None branch, orphan groups are not converted at all. If a file contains only utility calls (and --conflux-include-utility-calls is enabled), this returns an empty dataset.

Proposed fix
         else:
             blocking_count = 0
             background_count = 0
             for agent_id, records in threaded_data.items():
                 conversations.append(
                     self._build_conversation(agent_id, records, is_child=False)
                 )
+            for orphan_id, orphan_records in orphan_data.items():
+                conversations.append(
+                    self._build_conversation(orphan_id, orphan_records, is_child=False)
+                )
+                background_count += 1
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/dataset/loader/conflux.py` around lines 818 - 824, when
parent_id is None the code skips converting orphan groups entirely, which drops
files that contain only utility calls; update that branch to also iterate
orphan_data and append converted conversations (call
self._build_conversation(orphan_id, orphan_records, is_child=False) for each
group) when the include-utility-calls option is enabled (the CLI flag
--conflux-include-utility-calls or the corresponding attribute used in this
module), so the conversations list is populated with orphan utility-call groups
instead of returning an empty dataset.

Comment on lines +104 to +108
def _build_session_file(tmp_path, records: list[dict]) -> str:
"""Write records to a JSON file and return the path."""
path = tmp_path / "session.json"
path.write_bytes(orjson.dumps(records))
return str(path)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
import ast
from pathlib import Path

path = Path("tests/unit/dataset/loader/test_conflux.py")
tree = ast.parse(path.read_text())

def iter_args(fn):
    args = []
    args.extend(fn.args.posonlyargs)
    args.extend(fn.args.args)
    args.extend(fn.args.kwonlyargs)
    if fn.args.vararg:
        args.append(fn.args.vararg)
    if fn.args.kwarg:
        args.append(fn.args.kwarg)
    return args

for node in ast.walk(tree):
    if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
        missing_params = []
        for a in iter_args(node):
            if a.arg in {"self", "cls"}:
                continue
            if a.annotation is None:
                missing_params.append(a.arg)
        missing_return = node.returns is None
        if missing_params or missing_return:
            print(
                f"{node.lineno}: {node.name} | "
                f"missing_params={missing_params} missing_return={missing_return}"
            )
PY

Repository: ai-dynamo/aiperf

Length of output: 10402


Add type annotations to all function parameters and return types.

Multiple functions and fixtures throughout the file are missing parameter and/or return type annotations, violating the repo-wide typing requirement. This includes helper functions like _build_session_file (line 104), _build_team_session (line 111), _load_conversations (line 1107), and test fixtures/methods across lines 274–2465.

Example:

def _build_session_file(tmp_path, records: list[dict]) -> str:  # Missing type for tmp_path

Should be:

def _build_session_file(tmp_path: Path, records: list[dict[str, object]]) -> str:

Per coding guidelines: "Include type hints on ALL functions (parameters and return types)."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` around lines 104 - 108, the
functions and fixtures in this test file lack parameter and return type
annotations; update each function (e.g., _build_session_file,
_build_team_session, _load_conversations, and all fixtures referenced between
lines ~274–2465) to include explicit types for every parameter and the return
type. Use Path for tmp_path (from pathlib) and builtin generics such as
list[dict[str, Any]] for records, importing Any from typing as needed. Ensure
every helper function and test fixture has complete annotations (parameters
and -> return types) to satisfy the repo-wide typing requirement.

…q key

Extract shared helpers to eliminate duplication: _register_spawn for 4x
copy-pasted spawn wiring, _decode_request_payload for base64 decode,
_parse_timestamp_s shared between loader and analyzer. Use model_fields
instead of hardcoded field list in _extract_extra_params. Fix latent bug
where prereq_key used a hardcoded string instead of PrerequisiteKind enum.
Clear LRU cache after load; skip dict copy when "tokens" key absent.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>