
Make Loadgen Async #255

Open

nv-alicheng wants to merge 10 commits into feat/alicheng-pubsub-integration from feat/alicheng-lg-asyncify

Conversation

@nv-alicheng
Collaborator

What does this PR do?

Overhauls the old multi-threaded design of the load generator and converts it to use async event loops for better compatibility with the PubSub system and the HTTP client.

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

@nv-alicheng nv-alicheng requested a review from a team as a code owner April 2, 2026 07:03
@github-actions github-actions bot requested review from arekay-nv and nvzhihanj April 2, 2026 07:03
@github-actions

github-actions bot commented Apr 2, 2026

MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request implements a new asynchronous load generator architecture, replacing the legacy threaded system with a phase-based orchestration model. It introduces the BenchmarkSession for sequential phase management, specialized LoadStrategy implementations for various load patterns, and async factory methods for HTTPEndpointClient to avoid deadlocks. The review feedback highlights several critical reliability and efficiency improvements: exception handling must be added to the TimedIssueStrategy and BurstStrategy callbacks to prevent potential hangs, and ConcurrencyStrategy needs a try...finally block to prevent semaphore leaks. Furthermore, the polling mechanism in _drain_inflight should be replaced with an event-driven approach to reduce latency and overhead.

Collaborator Author

@nv-alicheng nv-alicheng left a comment


Review Council — Multi-AI Code Review

Reviewed by: Claude + Codex | Depth: thorough

Found 13 issues across 6 files.

Must Fix (critical/high)

Issues that will cause incorrect behavior, data loss, or hangs.

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 1 | scoring.py | 115 | data-integrity | Codex | The async rewrite removed the sample_idx_map.json write path, but Scorer.get_outputs() still reads it. `TestMode.ACC |
| 2 | execute.py | 426 | data-integrity | Codex | No event logger service is launched, yet Scorer.get_outputs() reads events.jsonl for accuracy scoring. The old path |
| 3 | strategy.py | 130 | concurrency | Codex | fire() callbacks in TimedIssueStrategy and BurstStrategy call phase_issuer.issue() without exception handling. I |
| 4 | session.py | 413 | bug | Claude | on_sample_complete is never called for StreamChunk(is_complete=True). When streaming endpoints use terminal StreamCh |
| 5 | session.py | 379 | bug | Codex | Failed requests (QueryResult.error is not None) are published as normal COMPLETE events. The MetricsAggregator only |
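Issue 3 (unhandled exceptions in the fire() callbacks) can be illustrated with a small sketch. The helper name, the `pending`/`done_event` bookkeeping, and the callback shape are all assumptions for illustration; the point is only that the callback must settle its accounting even when `issue()` raises, or the drain barrier hangs.

```python
import asyncio
import logging

logger = logging.getLogger("loadgen.strategy")

def make_fire_callback(phase_issuer, pending: set, done_event: asyncio.Event):
    """Hypothetical sketch: a timer/burst callback that cannot let an
    exception from issue() escape. Without the try/finally, a failed
    issue() would leave `pending` non-empty forever and the phase would
    never drain."""

    def fire(sample_index: int) -> None:
        try:
            phase_issuer.issue(sample_index)
        except Exception:
            logger.exception("issue() failed for sample %d", sample_index)
        finally:
            # Always settle the bookkeeping, success or failure.
            pending.discard(sample_index)
            if not pending:
                done_event.set()

    return fire
```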

Should Fix (medium)

Real issues under specific conditions or design flaws.

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 6 | report.py | 178 | data-integrity | Both | n_samples_failed reads total_samples_failed (all phases) while n_samples_issued/completed read tracked_* (perf |
| 7 | execute.py | 474 | data-integrity | Codex | Report is built from KVStore immediately after session.run() returns, before the MetricsAggregator subprocess processe |
| 8 | execute.py | 345 | data-integrity | Both | Accuracy phases reuse the same random.Random instances as the perf phase by reference. Accuracy sample ordering depend |
| 9 | session.py | 334 | performance | Claude | _drain_inflight busy-polls with asyncio.sleep(0.01) adding up to 10ms latency. Use an asyncio.Event set by `_handl |
| 10 | execute.py | 470 | error-handling | Claude | A second Ctrl+C during loop.run_until_complete raises KeyboardInterrupt that bypasses the finally block in `_run_b |
| 11 | execute.py | 551 | bug | Claude | If stopped before any perf phase, total_issued=0 and n_errors=len(collector.errors) produces `successful = 0 - n_err |

Consider (low)

Valid improvements for follow-ups.

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 12 | execute.py | 411 | error-handling | Claude | assert zmq_ctx.socket_dir is not None is disabled with python -O. Use a proper if ... raise RuntimeError instead. |
| 13 | test_benchmark_command.py | 81 | testing | Codex | Integration tests only exercise TestMode.PERF. No coverage for ACC/BOTH modes, which is how the missing `sample_id |

Generated with Claude Code

```python
n_samples_from_dataset=acc_ds.num_samples(),
n_samples_to_issue=acc_ds.num_samples() * acc_ds.repeats,
min_sample_count=acc_ds.num_samples() * acc_ds.repeats,
rng_sched=ctx.rt_settings.rng_sched,
```
Collaborator Author


[Both] medium (data-integrity): Accuracy phases reuse the same random.Random instances as the perf phase by reference. Accuracy sample ordering depends on how many RNG draws the perf phase consumed, breaking reproducibility.

```python
rng_sched=ctx.rt_settings.rng_sched,  # shared reference
```

Create fresh Random instances with derived seeds for each accuracy phase.
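The suggested fix could look roughly like the following. The helper name and seed-derivation scheme are illustrative assumptions, not the project's actual API; any stable per-phase derivation works.

```python
import random

def derive_rng(base_seed: int, phase_name: str) -> random.Random:
    """Hypothetical sketch: give each phase its own Random seeded
    deterministically from the base seed plus the phase name, so accuracy
    sample ordering no longer depends on how many draws the perf phase
    consumed."""
    # hash() is salted per process, so derive the seed from stable bytes.
    derived = base_seed ^ int.from_bytes(phase_name.encode(), "big")
    return random.Random(derived)

perf_rng = derive_rng(1234, "performance")
acc_rng = derive_rng(1234, "accuracy")
# Draws from one phase's RNG cannot perturb the other's sequence.
_ = [perf_rng.random() for _ in range(1000)]
print(acc_rng is perf_rng)  # False
```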

Collaborator Author


[Claude] Design limitation, added documentation — shared RNG is a known issue for multi-phase reproducibility.

```python
finally:
    # Always restore original handler
    signal.signal(signal.SIGINT, old_handler)
    loop.add_signal_handler(signal.SIGINT, session.stop)
```
Collaborator Author


[Claude] medium (error-handling): A second Ctrl+C during loop.run_until_complete raises KeyboardInterrupt that bypasses the finally block in _run_benchmark_async, leaking ZMQ context, publisher, and HTTP client.

Collaborator Author


[Claude] Design limitation, added documentation — double SIGINT inherently hard to handle with loop.run_until_complete

Collaborator Author

@nv-alicheng nv-alicheng left a comment


Review Council — Multi-AI Code Review (Round 2)

Reviewed by: Claude + Codex | Depth: thorough

Found 7 new issues across 2 files (previous issues excluded).

Must Fix (high)

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 1 | session.py | 436 | bug | Both | on_sample_complete(resp) is called with a StreamChunk for terminal streaming completions, but `ResponseCollector.on_ |
| 2 | execute.py | 413 | bug | Claude | Socket name collision: pub_socket_name = f"ev_pub_{session_id[:8]}" slices to "ev_pub_cli_benc" — the static prefix, |
| 3 | session.py | 179 | data-integrity | Codex | PhaseIssuer.issue() calls dataset.load_sample() BEFORE recording the ISSUED timestamp. Dataset reads inflate TTFT/la |

Should Fix (medium)

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 4 | session.py | 241 | concurrency | Codex | stop() sets _stop_requested and cancels the strategy task, but does NOT set _drain_event. If stop is called during |
| 5 | execute.py | 349 | data-integrity | Both | Accuracy phase name from DATASET_ID/class name may differ from eval_cfg.dataset_name (from config). Scorer looks u |
| 6 | execute.py | 528 | performance | Claude | launcher.wait_for_exit(timeout=10.0) is a blocking OS call inside an async def. Blocks the event loop for up to 10s |

Consider (low)

| # | File | Line | Category | Reviewer(s) | Summary |
|---|------|------|----------|-------------|---------|
| 7 | execute.py | 430 | bug | Claude | Tmpfs directories under /dev/shm are only cleaned in _write_scoring_artifacts. If benchmark crashes or is interrupte |

Generated with Claude Code

> phases, or (b) per-phase metric namespacing (e.g., prefix keys with phase name),
> or (c) the report builder computes deltas by snapshotting before and after each
> phase. This will be addressed in a future change to the `MetricsAggregator`.
> Option (b) is the most-likely planned change as it is the most robust.
Collaborator


👍

with zero GIL contention and low response latency (0.6–1.4ms). No thread pool overhead.
Degrades above 100k+ QPS where the callback queue saturates.

`run_in_executor(busy_wait)` is available as an opt-in for workloads requiring sub-100μs
Collaborator


👍

For `ConcurrencyStrategy`, `_handle_response` calls `strategy.on_query_complete()`
which releases the semaphore. Since `recv()` returns as soon as the fd is readable
and `eager_task_factory` executes the woken semaphore waiter synchronously, there
is no added latency compared to a poll-based approach.
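The semaphore discipline described above can be sketched in isolation. This is an illustrative reduction, not the PR's `ConcurrencyStrategy`; the method names mirror the quoted text, but the release-on-failure path is the fix from the earlier review round (a permit must be returned even when `issue()` raises, or the semaphore leaks).

```python
import asyncio

class ConcurrencySketch:
    """Sketch of a semaphore-gated issue loop: acquire before issuing,
    release from the response path. eager waiter wake-up means the next
    acquire proceeds as soon as a completion arrives, with no polling."""

    def __init__(self, max_concurrency: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrency)

    async def execute(self, issue, sample_indices) -> None:
        for idx in sample_indices:
            await self._sem.acquire()
            try:
                issue(idx)
            except Exception:
                # No response will ever arrive for a failed issue, so the
                # permit must be returned here to avoid a semaphore leak.
                self._sem.release()
                raise

    def on_query_complete(self) -> None:
        # Called from the response handler when a result arrives.
        self._sem.release()
```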
Collaborator


👍

Collaborator

@arekay-nv arekay-nv left a comment


Made a pass over the code + doc. Will make one on the tests later tonight.
One general comment: we should pick between "sample" and "query". My preference is "request" since it captures the user aspect, but moving away from "query" might be useful anyway since it draws too much similarity to databases.

Comment on lines +24 to +26
```
+-- [perf phase 1] START_TRACKING → strategy.execute() → drain → STOP_TRACKING → snapshot report
+-- [saturation] strategy.execute() → drain
+-- [perf phase 2] START_TRACKING → strategy.execute() → drain → STOP_TRACKING → snapshot report
```
Collaborator


START_PERFORMANCE_TRACKING, not START_TRACKING.

- ISSUED: `monotonic_ns()` taken immediately before `issuer.issue()`. The ZMQ push is
sync and non-blocking, so this honestly represents when the query entered the transport.
- COMPLETE: `QueryResult.completed_at` is set via `force_setattr(monotonic_ns())` in
`__post_init__`, regenerated on deserialization. Both ISSUED and COMPLETE timestamps
Collaborator


I think TTFT is still sensitive to the zmq overhead, but TPOT would not be -
RECV_FIRST and COMPLETE are both stamped on the main-process side after worker-to-main transit, avoiding cross-process clock skew; TPOT is therefore relatively insensitive to that transport bias, while TTFT still includes the end-to-end handoff to first-token path.
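The sensitivity argument can be written out with the usual definitions. This is a sketch using common TTFT/TPOT formulas; the timestamp field names are assumptions, not the project's actual schema.

```python
def ttft_ms(issued_ns: int, recv_first_ns: int) -> float:
    """Time to first token: spans the full issue -> first-chunk path, so
    it absorbs the ZMQ transport overhead noted above."""
    return (recv_first_ns - issued_ns) / 1e6

def tpot_ms(recv_first_ns: int, completed_ns: int, n_output_tokens: int) -> float:
    """Time per output token over the decode window. Both endpoints are
    stamped on the main-process side, so transport bias and cross-process
    clock skew largely cancel out of the difference."""
    if n_output_tokens < 2:
        raise ValueError("TPOT needs at least two output tokens")
    return (completed_ns - recv_first_ns) / (n_output_tokens - 1) / 1e6

print(ttft_ms(0, 50_000_000))               # 50.0
print(tpot_ms(50_000_000, 450_000_000, 5))  # 100.0
```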

```python
tokenizer = _load_tokenizer(config.model_params.name)
# Tokenizer check (light API call, no download)
model_name = config.model_params.name
tokenizer_name = model_name if _check_tokenizer_exists(model_name) else None
```
Collaborator


Would this work for non-standard tokenizers? Also, we might want to report an error/warning here, since a typo in the model name (or a non-standard model name) might cause the tokenizer to be None but only surface as an error when we do the TPOT calculations.

Comment on lines +343 to +347
```python
phases.append(
    PhaseConfig(
        "performance", ctx.rt_settings, ctx.dataloader, PhaseType.PERFORMANCE
    )
)
```
Collaborator


Can we add a comment here indicating that this will change to support multiple performance datasets?
If it isn't too much work, you can probably plug in multiple perf datasets already, since the configs should support it.

Comment on lines +380 to +386
```python
"total_samples_issued",
"total_samples_completed",
"total_samples_failed",
"tracked_samples_issued",
"tracked_samples_completed",
"tracked_duration_ns",
"total_duration_ns",
```
Collaborator


These could be collected in a separate enum or something more type-controlled, for instance a (key, kind (counter/series), data type, streaming/non-streaming) tuple. That would make extending to new metrics much easier.
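A type-controlled version of that suggestion might look like this. All names here (`MetricKind`, `MetricSpec`, `METRICS`) are hypothetical, sketched only to show the shape of a declarative metric table replacing bare strings.

```python
from dataclasses import dataclass
from enum import Enum

class MetricKind(Enum):
    COUNTER = "counter"
    SERIES = "series"

@dataclass(frozen=True)
class MetricSpec:
    """One declarative entry per metric: key, kind, value type, and
    whether it only applies to streaming workloads."""
    key: str
    kind: MetricKind
    dtype: type
    streaming_only: bool = False

# A single table instead of string lists scattered through the report code.
METRICS: tuple[MetricSpec, ...] = (
    MetricSpec("total_samples_issued", MetricKind.COUNTER, int),
    MetricSpec("total_samples_completed", MetricKind.COUNTER, int),
    MetricSpec("total_samples_failed", MetricKind.COUNTER, int),
    MetricSpec("tracked_samples_issued", MetricKind.COUNTER, int),
    MetricSpec("tracked_samples_completed", MetricKind.COUNTER, int),
    MetricSpec("tracked_duration_ns", MetricKind.COUNTER, int),
    MetricSpec("total_duration_ns", MetricKind.COUNTER, int),
)

counter_keys = [m.key for m in METRICS if m.kind is MetricKind.COUNTER]
```

New metrics then require only one new `MetricSpec` entry; consumers filter the table by kind or dtype instead of maintaining their own lists.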

```python
# Report metrics: prefer Report from KVStore, fall back to SessionResult
if report is not None and report.duration_ns is not None:
    perf_elapsed = report.duration_ns / 1e9
    total_issued = report.n_samples_issued
```
Collaborator


We should rename n_samples_issued to perf_samples_issued or tracked_samples_issued to be consistent.

```python
"successful": success,
"failed": report.n_samples_failed,
"elapsed_time": elapsed,
"successful": max(0, total_issued - n_errors),
```
Collaborator


This shouldn't be necessary. Is this because we do not distinguish accuracy/saturation/performance errors?

Comment on lines +52 to +57
```python
class PhaseType(str, Enum):
    """Phase types control tracking and reporting behavior."""

    PERFORMANCE = "performance"
    ACCURACY = "accuracy"
    SATURATION = "saturation"
```
Collaborator


We might want to rethink this: the only difference between perf/accuracy is whether we track metrics/results, and if the overhead is low enough, we can probably do it for both; the dataset characteristics would define whether it is for performance (uniform, small-ish OSL) or accuracy.
Similarly, saturation requires neither, but only omits the barrier before the next phase, where performance datasets always have a barrier after them.
We can expose those knobs to the user and the system shouldn't care about what the intent of the phase is.
Just a thought though.

Comment on lines +462 to +473
```python
if self._stop_requested:
    return True
if (
    self._current_phase_issuer
    and self._current_phase_issuer.issued_count >= total_samples
):
    return True
if (
    max_duration_ns > 0
    and (time.monotonic_ns() - phase_start_ns) >= max_duration_ns
):
    return True
```
Collaborator


When returning True, can we log the reason, so it is clear whether all samples were issued, max duration was reached, etc.?

```python
class PhaseIssuerProtocol(Protocol):
    """Minimal interface that strategies see for issuing samples."""

    def issue(self, sample_index: int) -> str | None: ...
```
Collaborator


Please document when None is expected. If I understand correctly, the str is the ID of the sample issued.

```python
    self.pbar.set_postfix(refresh=True, errors=len(self.errors))
elif self.collect_responses:
    self.responses[result.id] = result.get_response_output_string()
    # StreamChunk(is_complete=True) — no response text to collect, just count it
```
Collaborator


Heads up: this can never happen, btw; the worker will never send a StreamChunk with is_complete set to True.

```python
tokenizer = AutoTokenizer.from_pretrained(model_name)
logger.info("Tokenizer loaded successfully")
return tokenizer
from huggingface_hub import model_info
```
Collaborator

@viraatc viraatc Apr 2, 2026


nit: move imports to top?

```python
phases: list[PhaseConfig] = []

# Performance phase
phases.append(
```
Collaborator


Does this mean a benchmark always needs to have a performance phase?

```python
    logger.warning(f"Client cleanup error: {e}")
publisher.close()
await asyncio.to_thread(launcher.wait_for_exit, 10.0)
zmq_ctx.cleanup()
```
Collaborator


nit: can use zmq ctx scope we already support

```python
"""Create a KVStoreReader pre-registered with all metric keys."""
reader = BasicKVStoreReader(metrics_dir)
# Counter keys (from MetricCounterKey enum)
for key in [
```
Collaborator


Thinking if it's possible to auto-populate the enum names automatically, so we don't have to maintain a list in setup_kv_reader?
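Since Enum classes are iterable, this is straightforward. The sketch below uses a stand-in `MetricCounterKey` with made-up members (the real enum presumably lives in the project); only the iteration pattern is the point.

```python
from enum import Enum

class MetricCounterKey(str, Enum):
    """Hypothetical stand-in for the project's counter-key enum."""
    TOTAL_SAMPLES_ISSUED = "total_samples_issued"
    TOTAL_SAMPLES_COMPLETED = "total_samples_completed"
    TOTAL_SAMPLES_FAILED = "total_samples_failed"

def counter_keys() -> list[str]:
    # Iterating the Enum class yields every member in definition order,
    # so metrics added to the enum are picked up with no hand-kept list.
    return [key.value for key in MetricCounterKey]

print(counter_keys())
# ['total_samples_issued', 'total_samples_completed', 'total_samples_failed']
```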

```python
and phase_issuer is not None
and resp.id in phase_issuer.uuid_to_index
):
    phase_issuer.inflight -= 1
```
Collaborator


Repeated block; can it be made into a named function?

Collaborator

@viraatc viraatc left a comment


looks great thanks so much!

Collaborator

@arekay-nv arekay-nv left a comment


Thanks!


```python
# Verify sample_idx_map has both phases
with (report_dir / "sample_idx_map.json").open("rb") as f:
    import msgspec.json
```
Collaborator


Move import to top.

```python
    e for e in events if e.get("event_type") == "sample.complete"
]
# Should have both perf (3) and accuracy (5) completions
assert len(complete_events) >= 5
```
Collaborator


Tighter bound: use == 5.

Collaborator


This test can be replicated by using the generated config file from the test to re-run, though the dataset generation would be a bit challenging. Possibly as a follow-up feature.

```python
):
    dummy_dataloader = Dataset.load_from_file(
        dataset_path,
        ds_pickle_dataset_path,
```
Collaborator


Did I miss removing the pickle in the filename? If yes, please update here.
