MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
Summary of Changes (Gemini Code Assist): This pull request introduces a significant architectural shift for the benchmark system, moving from a multi-threaded design to a fully asynchronous, single-event-loop model. This change aims to improve the performance, scalability, and resource utilization of the benchmark by leveraging Python's asyncio event loop.
Code Review
This pull request introduces a new asynchronous benchmark execution path, replacing the previous threaded implementation. New modules for async benchmark execution (execute_async.py), event recording (async_recorder.py), and event reporting (async_reporter.py) have been added. The Scheduler in load_generator has been updated to support asynchronous iteration and concurrency control using asyncio primitives, moving away from threading. The benchmark_endpoints utility now supports running benchmarks with either the new async or the old sync runtime, and the unit tests for the scheduler have been updated accordingly. I have no feedback to provide.
```python
"""Best-effort cleanup — each step guarded individually."""
try:
    loop.remove_signal_handler(signal.SIGINT)
except Exception:
    pass
if pbar:
    try:
        pbar.close()
    except Exception:
        pass
if recorder and not session_ended:
    try:
        recorder.record_event(SessionEventType.ENDED, time.monotonic_ns())
    except Exception:
        pass
if writer:
    try:
        writer.stop()
    except Exception:
        pass
if http_client:
    try:
        http_client.shutdown()
    except Exception:
        pass
```
```python
try:
    os.sched_setaffinity(0, range(os.cpu_count() or 1))
except (OSError, AttributeError):
    pass
if publisher:
    try:
        publisher.close()
    except Exception:
        pass
if zmq_ctx:
    try:
        zmq_ctx.cleanup()
    except Exception:
        pass
```
Pull request overview
Adds a fully async (single-loop) benchmark execution path with ZMQ-published event records and a background SQLite writer process, plus a small E2E runner script and updated unit coverage for the async concurrency scheduler.
Changes:
- Introduces `run_benchmark_async()` (new async benchmark runner) and switches the default `run_benchmark()` orchestration to use it.
- Adds async metrics components: `AsyncEventReporter` (publisher-side) and `AsyncEventRecorder` (subscriber-side SQLite writer in a separate process).
- Extends schedulers with async iteration (`__aiter__`) and updates the concurrency scheduler unit test to use asyncio.
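The async iteration contract these changes describe (a scheduler's `__aiter__` gating issuance with an `asyncio.Semaphore` that `notify_complete()` releases) can be sketched with a toy scheduler. Everything below other than the `__aiter__`/`notify_complete` pattern is illustrative, not the repository's actual API:

```python
import asyncio


class ToyConcurrencyScheduler:
    """Minimal stand-in for the async scheduler interface described in this PR.

    Issuance is gated by an asyncio.Semaphore sized to the target concurrency;
    notify_complete() frees one slot. Names are assumptions for illustration.
    """

    def __init__(self, n_samples: int, target_concurrency: int):
        self.n_samples = n_samples
        self._sem = asyncio.Semaphore(target_concurrency)

    def notify_complete(self) -> None:
        # Called when a query finishes; frees a concurrency slot.
        self._sem.release()

    async def __aiter__(self):
        for sample_idx in range(self.n_samples):
            await self._sem.acquire()  # blocks once target concurrency is inflight
            yield sample_idx


async def main() -> list[int]:
    sched = ToyConcurrencyScheduler(n_samples=5, target_concurrency=2)
    issued = []
    async for idx in sched:
        issued.append(idx)
        sched.notify_complete()  # complete immediately so iteration never stalls
    return issued


print(asyncio.run(main()))  # → [0, 1, 2, 3, 4]
```

In the real runner, `notify_complete()` would be invoked from the response-handling path rather than inline in the send loop.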
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `tests/unit/load_generator/test_scheduler.py` | Converts ConcurrencyScheduler unit test to async iteration. |
| `src/inference_endpoint/load_generator/scheduler.py` | Adds async iteration support and refactors concurrency/max-throughput schedulers for async runtime. |
| `src/inference_endpoint/commands/benchmark/execute_async.py` | New async benchmark runner (sender/receiver, async event pipeline, report generation). |
| `src/inference_endpoint/commands/benchmark/execute.py` | Switches `run_benchmark()` to invoke the async runner via `run_async`. |
| `src/inference_endpoint/metrics/async_reporter.py` | New async event reporter that publishes EventRecords and tracks inflight. |
| `src/inference_endpoint/metrics/async_recorder.py` | New background-process SQLite writer consuming ZMQ event records. |
| `src/inference_endpoint/utils/benchmark_endpoints.py` | New utility to smoke-test benchmarks against local/external endpoints (async and sync). |
```python
def __init__(self, runtime_settings: RuntimeSettings, sample_order_cls):
    super().__init__(runtime_settings, sample_order_cls)
    assert runtime_settings.load_pattern is not None
    target_concurrency = runtime_settings.load_pattern.target_concurrency
    if target_concurrency is None or target_concurrency <= 0:
        raise ValueError(
            f"target_concurrency must be > 0 for CONCURRENCY load pattern, got {target_concurrency}"
        )

    # Use threading.Condition for concurrency control with explicit counter
    self._condition = threading.Condition()
    self._inflight = 0
    self._target_concurrency = target_concurrency

    # Register completion hook - free up slot when query completes
    SampleEventHandler.register_hook(SampleEvent.COMPLETE, self._release_slot)

    # Unused (required by Scheduler interface) - returns 0 delay
    self._semaphore: asyncio.Semaphore | None = None
    self.delay_fn = lambda: 0
```
ConcurrencyScheduler now only enforces concurrency in `__aiter__` via an `asyncio.Semaphore`. The sync iterator path used by SchedulerBasedLoadGenerator / BenchmarkSession (threaded runtime and `--sync` in benchmark_endpoints) will no longer gate issuance by completions, so CONCURRENCY mode can oversubscribe and behave incorrectly. Either restore a blocking `__iter__` implementation (e.g., Condition/Semaphore) for the threaded path, or update the threaded runtime to call `notify_complete()` and acquire slots in `__iter__` so both runtimes enforce the same load pattern.
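As a sketch of the first option above (restoring a blocking sync path), a `threading.Semaphore` can gate `__iter__` the same way the asyncio semaphore gates `__aiter__`. Class and attribute names here are illustrative stand-ins, not the repository's actual code:

```python
import threading


class BlockingConcurrencyGate:
    """Hypothetical sketch: sync-path concurrency gating for a scheduler.

    acquire() inside __iter__ blocks issuance once target_concurrency samples
    are inflight; notify_complete() frees one slot. Names are assumptions.
    """

    def __init__(self, n_samples: int, target_concurrency: int):
        self.n_samples = n_samples
        self._slots = threading.Semaphore(target_concurrency)

    def notify_complete(self) -> None:
        # Called by the threaded runtime (or a SampleEvent.COMPLETE hook)
        # when a query finishes, freeing one concurrency slot.
        self._slots.release()

    def __iter__(self):
        for sample_idx in range(self.n_samples):
            self._slots.acquire()  # blocks at target concurrency
            yield sample_idx


# Minimal usage: completing each sample inline keeps iteration moving.
gate = BlockingConcurrencyGate(n_samples=4, target_concurrency=2)
issued = []
for idx in gate:
    issued.append(idx)
    gate.notify_complete()
print(issued)  # → [0, 1, 2, 3]
```

With this shape, both runtimes enforce the same load pattern through the same `notify_complete()` entry point.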
```python
def handle_response(self, result: QueryResult | StreamChunk) -> None:
    ts = time.monotonic_ns()
    if isinstance(result, StreamChunk):
        ev = (
            SampleEventType.RECV_FIRST
            if (result.metadata or {}).get("first_chunk", False)
            else SampleEventType.RECV_NON_FIRST
        )
        self.recorder.record_event(ev, ts, sample_uuid=result.id)
    elif isinstance(result, QueryResult):
        if result.error is not None:
            logger.error(f"Error in request {result.id}: {result.error}")
        self.recorder.record_event(
            SampleEventType.COMPLETE,
            ts,
            sample_uuid=result.id,
            data=result.response_output,
        )
        self.collector.on_complete_hook(result)
        self.scheduler.notify_complete()
```
_BenchmarkRuntime.handle_response() always calls rt.scheduler.notify_complete(), but during accuracy phases samples are issued using acc_scheduler. If accuracy uses the CONCURRENCY scheduler (or if accuracy sending is later switched to async iteration), completions will release the wrong semaphore and can deadlock or distort concurrency. Consider passing the active scheduler into handle_response() (or temporarily swapping rt.scheduler during the accuracy phase) so notify_complete() is applied to the scheduler that actually gated issuance.
```python
# ── Accuracy phases ──────────────────────────────────────────
if ctx.accuracy_datasets and not rt.stop_requested:
    for acc_ds in ctx.accuracy_datasets:
        ds_name = getattr(
            acc_ds.__class__, "DATASET_ID", acc_ds.__class__.__name__
        )
        acc_rt = RuntimeSettings(
            metric_target=ctx.rt_settings.metric_target,
            reported_metrics=ctx.rt_settings.reported_metrics,
            min_duration_ms=0,
            max_duration_ms=None,
            n_samples_from_dataset=acc_ds.num_samples(),
            n_samples_to_issue=acc_ds.num_samples() * acc_ds.repeats,
            min_sample_count=acc_ds.num_samples() * acc_ds.repeats,
            rng_sched=ctx.rt_settings.rng_sched,
            rng_sample_index=ctx.rt_settings.rng_sample_index,
            load_pattern=ctx.rt_settings.load_pattern,
        )
        acc_sched = ctx.scheduler.__class__(
            acc_rt, WithoutReplacementSampleOrder
        )

        logger.info(f"Running accuracy phase: {ds_name}")
        await _run_accuracy_phase(rt, acc_ds, acc_sched)

    logger.info("All accuracy samples issued")
```
The async runner never writes report_dir/sample_idx_map.json (UUID -> dataset index mapping). Accuracy scoring (Scorer._load_sample_index_map) requires this file and will raise FileNotFoundError in TestMode.ACC/BOTH. Please persist the performance uuid_to_index map and each accuracy dataset's UUID map under the dataset name (same structure as BenchmarkSession.sample_uuid_map) before finalize_benchmark() runs.
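A minimal sketch of the persistence step this comment asks for, assuming `sample_idx_map.json` holds a flat JSON object with the performance UUID map plus one key per accuracy dataset. The on-disk layout here is an assumption, not the verified schema `Scorer._load_sample_index_map` expects:

```python
import json
from pathlib import Path


def write_sample_idx_map(report_dir, performance_map, accuracy_maps):
    """Persist UUID -> dataset-index maps before finalize_benchmark() runs.

    performance_map: {uuid: dataset_index} for the performance phase.
    accuracy_maps:   {dataset_name: {uuid: dataset_index}} per accuracy dataset.
    Structure mirrors the description of BenchmarkSession.sample_uuid_map,
    which is assumed rather than verified here.
    """
    payload = {"performance": performance_map, **accuracy_maps}
    out_path = Path(report_dir) / "sample_idx_map.json"
    out_path.write_text(json.dumps(payload, indent=2))
    return out_path
```

The async runner could call this once after the performance phase and each accuracy phase have populated their maps, so the file exists before scoring starts.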
```python
"""Async benchmark runner — single uvloop, no threads in the main process.

Architecture:
- HTTPEndpointClient on the running loop (no separate loop thread)
- ZmqEventRecordPublisher for non-blocking event publishing
- AsyncEventRecorder in a background process for SQLite writes
- Scheduler.__aiter__() for drift-correcting online timing
- Unified receiver: await recv() wakeup + poll() drain
"""
```
Module docstring claims "no threads in the main process", but run_benchmark_async() uses asyncio.to_thread() to construct HTTPEndpointClient. If the intent is "no long-lived loop thread" or "no benchmark threads on the hot path", consider clarifying the wording to avoid misleading operational expectations.
Suggested change:

```diff
-"""Async benchmark runner — single uvloop, no threads in the main process.
+"""Async benchmark runner — single uvloop in the main thread, no separate long-lived loop thread.

 Architecture:
-- HTTPEndpointClient on the running loop (no separate loop thread)
+- HTTPEndpointClient on the running loop (no separate loop thread on the hot path)
 - ZmqEventRecordPublisher for non-blocking event publishing
 - AsyncEventRecorder in a background process for SQLite writes
 - Scheduler.__aiter__() for drift-correcting online timing
 - Unified receiver: await recv() wakeup + poll() drain
+- Only short-lived helper threads may be used (e.g., asyncio.to_thread during setup)
```
```python
_TOPIC_TO_SQLITE_EVENT_TYPE: dict[str, str] = {
    # Session events
    "session.started": "test_started",
    "session.ended": "test_ended",
    "session.stop_loadgen": "loadgen_stop",
    "session.start_performance_tracking": "start_performance_tracking",
    "session.stop_performance_tracking": "stop_performance_tracking",
    # Sample events
    "sample.issued": "loadgen_issue_called",
    "sample.complete": "complete",
    "sample.recv_first": "first_chunk_received",
    "sample.recv_non_first": "non_first_chunk_received",
    "sample.client_send": "http_request_issued",
    "sample.client_resp_done": "http_response_completed",
    "sample.transport_sent": "zmq_response_sent",
    "sample.transport_recv": "zmq_request_received",
    # Error events
    "error.generic": "error",
    "error.loadgen": "error",
    "error.session": "error",
    "error.client": "error",
}


# SQLite schema (same as EventRecorder)
_CREATE_TABLE = (
    "CREATE TABLE IF NOT EXISTS events ("
    "sample_uuid TEXT, event_type TEXT, timestamp_ns INTEGER, data BLOB)"
)
_INSERT = "INSERT INTO events (sample_uuid, event_type, timestamp_ns, data) VALUES (?, ?, ?, ?)"


# ---------------------------------------------------------------------------
```
AsyncEventRecorder/_SqliteWriterSubscriber introduces a new persistence path and a topic->sqlite event_type mapping relied on by MetricsReporter, but there are no unit tests validating that the produced SQLite DB is readable by MetricsReporter (schema, event_type strings, and JSON encoding of data). Given existing tests for metrics.recorder/reporter, it would be good to add a focused test that publishes a small set of EventRecords and asserts the resulting SQLite rows match expectations.
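The kind of focused test described here could, at minimum, assert that rows written with the module's schema and topic mapping read back in the shape the reporter consumes. This sketch skips the ZMQ publish/subscribe hop and inserts directly, which is a simplifying assumption (the mapping excerpt below repeats one entry from the module above):

```python
import json
import sqlite3

# Schema strings copied from the module above; one representative topic mapping.
_CREATE_TABLE = (
    "CREATE TABLE IF NOT EXISTS events ("
    "sample_uuid TEXT, event_type TEXT, timestamp_ns INTEGER, data BLOB)"
)
_INSERT = "INSERT INTO events (sample_uuid, event_type, timestamp_ns, data) VALUES (?, ?, ?, ?)"
_TOPIC_TO_SQLITE_EVENT_TYPE = {"sample.complete": "complete"}


def test_writer_rows_match_reporter_expectations():
    conn = sqlite3.connect(":memory:")
    conn.execute(_CREATE_TABLE)
    conn.execute(
        _INSERT,
        (
            "uuid-1",
            _TOPIC_TO_SQLITE_EVENT_TYPE["sample.complete"],
            123,
            json.dumps({"ok": True}),
        ),
    )
    row = conn.execute(
        "SELECT sample_uuid, event_type, timestamp_ns, data FROM events"
    ).fetchone()
    assert row[0] == "uuid-1"
    assert row[1] == "complete"  # the event_type string the reporter keys on
    assert row[2] == 123
    assert json.loads(row[3]) == {"ok": True}
```

A fuller version would publish real EventRecords through `_SqliteWriterSubscriber` and run MetricsReporter over the resulting file.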
```diff
 @pytest.mark.unit
+@pytest.mark.asyncio
 @pytest.mark.parametrize("target_concurrency", [1, 2, 100, 1000], indirect=True)
-def test_concurrency_scheduler(concurrency_runtime_settings, target_concurrency):
-    """Test ConcurrencyScheduler properly gates issuance by completions."""
+async def test_concurrency_scheduler(concurrency_runtime_settings, target_concurrency):
+    """Test ConcurrencyScheduler properly gates issuance by completions (async)."""
     total_samples = concurrency_runtime_settings.n_samples_to_issue

     scheduler = ConcurrencyScheduler(
         concurrency_runtime_settings, WithReplacementSampleOrder
     )

     # State tracking
-    state_lock = threading.RLock()
     issued_count = 0
-    completed_count = 0
-    current_inflight = 0
     max_inflight = 0
+    current_inflight = 0

-    # Synchronization: signal when queries can complete and when they're done
-    can_complete = [threading.Event() for _ in range(total_samples)]
-    completed = [threading.Event() for _ in range(total_samples)]
-    # Signal when each query is issued
-    issued = [threading.Event() for _ in range(total_samples)]
-
-    def completion_worker():
-        """Waits for signals to complete queries."""
-        nonlocal completed_count, current_inflight
-
-        for position in range(total_samples):
-            can_complete[position].wait()
-
-            with state_lock:
-                completed_count += 1
-                current_inflight -= 1
-                assert current_inflight >= 0, "Inflight count went negative"
-
-            scheduler._release_slot()
-            completed[position].set()
-
-    threading.Thread(target=completion_worker, daemon=True).start()
-
-    def issue_worker():
-        """Issues queries through scheduler."""
-        nonlocal issued_count, current_inflight, max_inflight
-
-        for position, _ in enumerate(scheduler):
-            with state_lock:
-                issued_count += 1
-                current_inflight += 1
-                max_inflight = max(max_inflight, current_inflight)
-                assert (
-                    current_inflight <= target_concurrency
-                ), f"Concurrency {current_inflight} exceeded limit {target_concurrency}"
-            issued[position].set()
-
-    issue_thread = threading.Thread(target=issue_worker, daemon=True)
-    issue_thread.start()
-
-    try:
-        # Phase 1: First target_concurrency queries issue immediately
-        for position in range(target_concurrency):
-            issued[position].wait()
-
-        with state_lock:
-            assert issued_count == target_concurrency
-            assert completed_count == 0
-            assert current_inflight == target_concurrency
-
-        # Phase 2: Verify scheduler blocks when at capacity, unblocks on completion
-        for position in range(target_concurrency, total_samples):
-            position_to_complete = position - target_concurrency
-
-            # Verify next query hasn't issued yet (scheduler is blocking)
-            assert not issued[
-                position
-            ].is_set(), f"Query {position} issued before slot was freed"
-
-            # Free a slot
-            can_complete[position_to_complete].set()
-            completed[position_to_complete].wait()
-
-            # Verify next query now issues
-            issued[position].wait()
-
-            with state_lock:
-                assert current_inflight == target_concurrency
-
-        # Phase 3: Complete remaining queries and cleanup
-        for position in range(target_concurrency, total_samples):
-            can_complete[position].set()
-            completed[position].wait()
-
-        issue_thread.join()
-
-        # Final validation
-        with state_lock:
-            assert issued_count == total_samples
-            assert completed_count == total_samples
-            assert current_inflight == 0
-            assert max_inflight == target_concurrency
+    async def sender():
+        nonlocal issued_count, current_inflight, max_inflight
+        async for _s_idx in scheduler:
+            issued_count += 1
+            current_inflight += 1
+            max_inflight = max(max_inflight, current_inflight)
+            assert (
+                current_inflight <= target_concurrency
+            ), f"Concurrency {current_inflight} exceeded limit {target_concurrency}"
+
+    async def completer():
+        """Simulates completions by calling notify_complete after yielding."""
+        nonlocal current_inflight
+        completed = 0
+        while completed < total_samples:
+            if current_inflight > 0:
+                current_inflight -= 1
+                scheduler.notify_complete()
+                completed += 1
+            else:
+                await asyncio.sleep(0)
+
+    await asyncio.gather(sender(), completer())
```
This test now exercises only the async iteration path of ConcurrencyScheduler. Since the codebase still has a threaded runtime (BenchmarkSession/SchedulerBasedLoadGenerator) that consumes schedulers via `__iter__()`, consider adding/keeping a sync-path test that asserts CONCURRENCY mode blocks issuance until `notify_complete()` (or SampleEvent.COMPLETE) frees a slot, to prevent regressions across runtimes.
42f2271 to b6ab36d
What does this PR do?
TODO(vir): add desc when functional.
Type of change
Related issues
Testing
Checklist