
feat: add Conflux proxy capture dataset loader for verbatim session replay #772

Open
ajcasagrande wants to merge 4 commits into main from ajc/fconflux-simple

Conversation


@ajcasagrande ajcasagrande commented Mar 19, 2026


Introduces ConfluxLoader for replaying Claude Code and Codex sessions captured by Conflux proxy. Supports agent_id grouping, timestamp-based delays for fixed-schedule replay, per-turn extra_params/hyperparameters, and optional utility call inclusion. Refactors ChatEndpoint payload construction to apply extra_params before endpoint-level overrides.
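The grouping and delay behavior described above can be sketched in a few lines. This is an illustrative sketch only, assuming records carry `agent_id` and `timestamp` fields; `group_by_agent` and `delays_from_timestamps` are hypothetical names, not the loader's real API.

```python
from collections import defaultdict


def group_by_agent(records: list[dict]) -> dict[str, list[dict]]:
    """Group captured records by agent_id, ordered by capture timestamp."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for record in sorted(records, key=lambda r: r["timestamp"]):
        groups[record["agent_id"]].append(record)
    return dict(groups)


def delays_from_timestamps(records: list[dict]) -> list[float]:
    """Delay of each record relative to the first, for fixed-schedule replay."""
    if not records:
        return []
    start = records[0]["timestamp"]
    return [r["timestamp"] - start for r in records]
```

Replaying each agent's turns at these relative offsets preserves the original session pacing.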

Summary by CodeRabbit

  • New Features

    • Support for Conflux proxy-capture datasets with agent grouping, timestamp-based replay, and optional inclusion of utility calls
    • New fixed-schedule speedup option to scale replay timing
    • Per-turn metadata (input tokens & extra params) preserved into requests and honored when building payloads
  • Documentation

    • CLI docs updated for Conflux dataset type and fixed-schedule speedup flags
  • Tests

    • Extensive unit and integration tests covering Conflux loading, conversion, replay, and payload behavior

@ajcasagrande ajcasagrande requested a review from lvogel04 March 19, 2026 18:43

github-actions bot commented Mar 19, 2026

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@fb02230d3af266abd8252f5257192a9e8bd294c2

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@fb02230d3af266abd8252f5257192a9e8bd294c2

Last updated for commit: fb02230

@github-actions github-actions bot added the feat label Mar 19, 2026

codecov bot commented Mar 19, 2026

Codecov Report

❌ Patch coverage is 98.12500% with 3 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
src/aiperf/common/config/input_config.py | 71.42% | 1 Missing and 1 partial ⚠️
src/aiperf/endpoints/openai_chat.py | 94.44% | 0 Missing and 1 partial ⚠️



coderabbitai bot commented Mar 19, 2026

Walkthrough

Adds full Conflux proxy-capture support (loader, models, plugin), introduces fixed-schedule speedup and Conflux CLI flags, propagates per-turn extra params and token metadata into payloads, updates fixed-schedule timing scaling, and adds extensive unit and integration tests.

Changes

Cohort / File(s) Summary
Documentation & CLI
docs/cli-options.md
Documented --fixed-schedule-speedup <float>, added conflux to --custom-dataset-type choices, and introduced --conflux-include-utility-calls.
Input Configuration
src/aiperf/common/config/input_config.py
Added `fixed_schedule_speedup: float` field, with validation against a conflicting `--synthesis-speedup-ratio`.
Core Data Models
src/aiperf/common/models/dataset_models.py
Extended Turn with input_tokens and extra_params, propagated by copy_with_stripped_media().
Conflux Loader & Models
src/aiperf/dataset/loader/conflux.py, src/aiperf/dataset/loader/models.py, src/aiperf/dataset/loader/__init__.py
Added ConfluxRecord/ConfluxTokens Pydantic models and implemented ConfluxLoader: can auto-detect single-file or directory JSON, group by agent_id, optional utility-call grouping, timestamp sorting, file-prefix merging, and conversion to Conversation/Turn with token and extra-param extraction.
Composer Auto-detection
src/aiperf/dataset/composer/custom.py
Switched first-line JSON parsing to orjson.loads, added robust fallback on parse/type errors to filename-only inference.
Timing & Fixed Schedule
src/aiperf/timing/config.py, src/aiperf/timing/strategies/fixed_schedule.py
Added fixed_schedule_speedup field to CreditPhaseConfig, propagated from input config; applied reciprocal time scaling in fixed-schedule strategy and updated logging.
Endpoint Payloads
src/aiperf/endpoints/openai_chat.py
Refactored ChatEndpoint.format_payload() to merge per-turn extra_params earlier, integrate endpoint extra, and preserve/merge stream_options.include_usage.
Plugin Registration & Enums
src/aiperf/plugin/enums.py, src/aiperf/plugin/plugins.yaml
Registered custom_dataset_loader.conflux plugin with supports_timing: true; updated plugin metadata for bailian_trace and mooncake_trace; adjusted enum docstring example.
Tests — Unit & Integration
tests/unit/dataset/loader/test_conflux.py, tests/unit/dataset/composer/test_custom_composer.py, tests/unit/endpoints/test_openai_chat_completions.py, tests/integration/test_conflux_loader.py
Added comprehensive unit and integration tests for Conflux models/loader/composer, fixed-schedule speedup replay via CLI, and extra_params merging behavior in payload formatting.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 I hopped through JSON, timestamps in tow,
Agents grouped neatly, replay set to go,
Utility calls hidden or shown by a flag,
Extra params hitch a ride in each request tag,
Tests guard the burrow — hop, speedup, and flow!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 38.69%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled.
Title check | ✅ Passed | The PR title accurately describes the main feature: adding a Conflux proxy capture dataset loader for session replay.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/aiperf/dataset/loader/conflux.py`:
- Around line 183-195: The Turn.max_tokens is being derived only from observed
output and _extract_extra_params() strips original hyperparameter caps; update
the loader to preserve captured limits by first checking the record
hyperparameters for keys ("max_completion_tokens", "max_output_tokens",
"max_tokens") and using that value if present, otherwise fall back to the
existing tokens-based total_output logic; implement a helper like
_extract_max_tokens(record: ConfluxRecord) and call it when constructing Turn
(instead of only using record.tokens), and ensure _build_conversation() uses the
preserved Turn.max_tokens rather than replacing it with observed output.
- Around line 55-57: The current probe (used by can_load and _probe_file) only
reads up to _PROBE_BYTES and rejects files if the first array element is
truncated; update cls._probe_file to read progressively (starting with
_PROBE_BYTES) and continue reading additional chunks until the first JSON
object/array element is fully closed (track nesting and string/escape state to
detect a proper close) or until a larger hard cap (e.g., 1MB) is reached, then
pass that complete prefix to the existing parsing check; ensure references to
_PROBE_BYTES remain as the initial chunk size and that can_load uses the revised
_probe_file logic so valid Conflux captures with large first records are
accepted.

In `@tests/unit/endpoints/test_openai_chat_completions.py`:
- Around line 478-493: ChatEndpoint.format_payload is mutating the nested
stream_options dict taken from Turn.extra_params / EndpointInfo.extra, leaking
state; update format_payload to perform a shallow copy of the stream_options
dict (e.g., new_stream_opts = dict(orig_stream_opts) or use copy()) before
adding include_usage so the original Turn.extra_params/EndpointInfo.extra is not
modified, and update the test to assert that turn.extra_params (or its
stream_options entry) remains unchanged after calling format_payload. Ensure
references: ChatEndpoint.format_payload, Turn.extra_params, EndpointInfo.extra,
and payload["stream_options"] are the targets to change.


📥 Commits

Reviewing files that changed from the base of the PR and between e6919c5 and 997abf8.

📒 Files selected for processing (11)
  • docs/cli-options.md
  • src/aiperf/common/config/input_config.py
  • src/aiperf/common/models/dataset_models.py
  • src/aiperf/dataset/loader/__init__.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/dataset/loader/models.py
  • src/aiperf/endpoints/openai_chat.py
  • src/aiperf/plugin/enums.py
  • src/aiperf/plugin/plugins.yaml
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/endpoints/test_openai_chat_completions.py

Allow scaling fixed schedule timestamps to replay traces faster or
slower (e.g. 2.0 = twice as fast). Includes validation against
conflicting --synthesis-speedup-ratio and bumps Conflux probe buffer
to 1MB for reliable format detection.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
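The speedup described in this commit is a reciprocal scaling of schedule offsets. A minimal sketch of that arithmetic; `scale_offset` is a hypothetical helper, not the strategy's real method:

```python
def scale_offset(offset_s: float, speedup: float) -> float:
    """Scale a fixed-schedule offset: speedup 2.0 replays twice as fast."""
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    return offset_s / speedup
```

A turn originally captured 10 s into the session is issued at 5 s with `--fixed-schedule-speedup 2.0`, and at 20 s with 0.5.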

@lvogel04 lvogel04 left a comment


lgtm, great work. One thing we need to address: Conflux unified exports are pretty-printed JSON, at least the ones I have exported from the CLI, so without the --custom-dataset-type conflux flag you hit an error because the first element parsed is [.

This error came from testing my export without the flag:

Error: JSONDecodeError Reason: JSONDecodeError('unexpected end of data: line 1 column 2 (char 1)')

With the flag it works great. Can we add support for this and test coverage?

I would also like to test this more once we have the conflux export supported which I believe Ben is cooking up.

… stream_options

The Conflux probe now loads and validates the full JSON array instead of
parsing a truncated prefix, supporting both pretty-printed and compact
formats. The composer falls through to filename-only detection when the
first line is not valid JSON (e.g. "[" in indented exports) or parses as
a non-dict (e.g. compact arrays). ChatEndpoint.format_payload no longer
mutates nested stream_options from Turn.extra_params across retries.

Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
@ajcasagrande ajcasagrande requested a review from lvogel04 March 20, 2026 03:47

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
tests/unit/dataset/loader/test_conflux.py (2)

78-78: Consider using normal construction instead of model_construct.

Using model_construct() bypasses Pydantic validation, which may hide issues if InputConfig field requirements change. For test fixtures, consider either providing all required fields or using model_validate() with a minimal but complete dict.

Alternative approach
-        input=InputConfig.model_construct(
-            conflux_include_utility_calls=include_utility,
-        ),
+        input=InputConfig(
+            conflux_include_utility_calls=include_utility,
+        ),

This ensures validation runs and catches any missing required fields.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` at line 78, Replace the bypassed
Pydantic construction call InputConfig.model_construct(...) in the test with
validated construction: either call the InputConfig(...) constructor with all
required fields populated, or call InputConfig.model_validate({...}) with a
minimal but complete dict so Pydantic validation runs; update the test fixture
that references InputConfig.model_construct to use InputConfig(...) or
InputConfig.model_validate and ensure all required fields for InputConfig are
provided.

692-799: Consider parametrizing repetitive test cases.

The TestExtractExtraParams class contains many similar test methods that differ primarily in input data and expected output. Parametrization could reduce duplication while maintaining clarity.

Example parametrization approach
@pytest.mark.parametrize(
    "hyperparameters,expected",
    [
        (None, None),
        ({}, None),
        ({"temperature": 0.7, "top_p": 0.9}, {"temperature": 0.7, "top_p": 0.9}),
        ({"temperature": 0.5, "max_tokens": 1000}, {"temperature": 0.5}),
        ({"temperature": 0.5, "max_output_tokens": 2000}, {"temperature": 0.5}),
        ({"temperature": 0.7, "top_k": None, "stop": None}, {"temperature": 0.7}),
        ({"max_tokens": 100, "max_output_tokens": 200}, None),
        ({"temperature": None, "top_p": None}, None),
        ({"temperature": 0, "frequency_penalty": 0.0}, {"temperature": 0, "frequency_penalty": 0.0}),
        ({"logprobs": False}, {"logprobs": False}),
        ({"stop": ""}, {"stop": ""}),
        ({"response_format": {"type": "json_object"}}, {"response_format": {"type": "json_object"}}),
    ],
    ids=[
        "no_hyperparameters",
        "empty_hyperparameters",
        "basic_params",
        "max_tokens_filtered",
        "max_output_tokens_filtered",
        "none_values_filtered",
        "all_filtered",
        "all_none",
        "zero_preserved",
        "false_preserved",
        "empty_string_preserved",
        "nested_dict_preserved",
    ],
)
def test_extract_extra_params_scenarios(self, hyperparameters, expected):
    record = ConfluxRecord(
        session_id="s1",
        timestamp=1000.0,
        hyperparameters=hyperparameters,
    )
    assert ConfluxLoader._extract_extra_params(record) == expected

However, the current explicit test structure is also clear and may be preferred for readability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/dataset/loader/test_conflux.py` around lines 692 - 799, The tests
in TestExtractExtraParams are highly repetitive; refactor by parametrizing the
cases for ConfluxLoader._extract_extra_params: replace the many individual
test_* methods with a single pytest.mark.parametrize test that iterates over
tuples of (hyperparameters, expected) covering the same scenarios (None, {},
basic params, max_tokens filtered, max_output_tokens filtered, none-values
filtered, all filtered, all none, zero preserved, false preserved, empty string
preserved, nested dict preserved) and assert
ConfluxLoader._extract_extra_params(record) == expected for a ConfluxRecord
initialized with session_id="s1" and timestamp=1000.0; keep the same case ids
for readability.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/unit/dataset/loader/test_conflux.py`:
- Line 481: The test assertion uses pytest.raises(..., match="No .json files
found") where the "." is a regex metacharacter; update the match argument to
precisely escape the period (e.g., use a raw string or escape the dot) so the
pattern becomes "No \.json files found" (e.g., r"No \.json files found") in the
pytest.raises call to avoid unintended regex matching and static-analysis
warnings.

---

Nitpick comments:
In `@tests/unit/dataset/loader/test_conflux.py`:
- Line 78: Replace the bypassed Pydantic construction call
InputConfig.model_construct(...) in the test with validated construction: either
call the InputConfig(...) constructor with all required fields populated, or
call InputConfig.model_validate({...}) with a minimal but complete dict so
Pydantic validation runs; update the test fixture that references
InputConfig.model_construct to use InputConfig(...) or
InputConfig.model_validate and ensure all required fields for InputConfig are
provided.
- Around line 692-799: The tests in TestExtractExtraParams are highly
repetitive; refactor by parametrizing the cases for
ConfluxLoader._extract_extra_params: replace the many individual test_* methods
with a single pytest.mark.parametrize test that iterates over tuples of
(hyperparameters, expected) covering the same scenarios (None, {}, basic params,
max_tokens filtered, max_output_tokens filtered, none-values filtered, all
filtered, all none, zero preserved, false preserved, empty string preserved,
nested dict preserved) and assert ConfluxLoader._extract_extra_params(record) ==
expected for a ConfluxRecord initialized with session_id="s1" and
timestamp=1000.0; keep the same case ids for readability.


📥 Commits

Reviewing files that changed from the base of the PR and between 997abf8 and 03dc6ef.

📒 Files selected for processing (11)
  • docs/cli-options.md
  • src/aiperf/common/config/input_config.py
  • src/aiperf/dataset/composer/custom.py
  • src/aiperf/dataset/loader/conflux.py
  • src/aiperf/endpoints/openai_chat.py
  • src/aiperf/timing/config.py
  • src/aiperf/timing/strategies/fixed_schedule.py
  • tests/integration/test_conflux_loader.py
  • tests/unit/dataset/composer/test_custom_composer.py
  • tests/unit/dataset/loader/test_conflux.py
  • tests/unit/endpoints/test_openai_chat_completions.py
✅ Files skipped from review due to trivial changes (1)
  • tests/unit/endpoints/test_openai_chat_completions.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/aiperf/endpoints/openai_chat.py


@lvogel04 lvogel04 left a comment


lgtm!
