
feat: replace per-conversation ZMQ fallback with dataset client metadata request#742

Draft
ajcasagrande wants to merge 1 commit into main from ajc/dataset-manager-client-request

Conversation


@ajcasagrande ajcasagrande commented Mar 7, 2026

When Workers don't have a dataset client at startup, instead of requesting individual conversations from DatasetManager via ZMQ, they now request the client metadata once and initialize their own local client. This simplifies the fallback path and eliminates per-request error handling overhead.
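The one-time metadata handshake described above can be sketched as follows. This is a minimal stand-in, not the repository's code: `DatasetManager`, `Worker`, and the method names are modeled on the PR discussion, but signatures and the metadata shape are assumptions.

```python
import asyncio

class DatasetManager:
    """Stand-in for the real DatasetManager; serves client metadata on request."""
    def __init__(self):
        self.requests_served = 0

    async def handle_dataset_client_request(self) -> dict:
        self.requests_served += 1
        return {"client_type": "mmap", "path": "/tmp/dataset.bin"}

class Worker:
    """Worker that lazily fetches metadata once and keeps a local client."""
    def __init__(self, manager: DatasetManager):
        self._manager = manager
        self._dataset_client = None

    async def _ensure_client(self):
        # Fallback path: request metadata only if no client was provided at startup.
        if self._dataset_client is None:
            metadata = await self._manager.handle_dataset_client_request()
            self._dataset_client = metadata  # the real code builds a client store here
        return self._dataset_client

    async def retrieve_conversation(self, session_id: str):
        client = await self._ensure_client()
        return (session_id, client["client_type"])

async def main():
    mgr = DatasetManager()
    worker = Worker(mgr)
    results = [await worker.retrieve_conversation(f"s{i}") for i in range(100)]
    # A single metadata round-trip serves all 100 retrievals.
    return mgr.requests_served, results[0]

print(asyncio.run(main()))  # (1, ('s0', 'mmap'))
```

Contrast with the old fallback, where each of the 100 retrievals would have been its own ZMQ conversation request to DatasetManager, each needing per-request error handling.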

Summary by CodeRabbit

Release Notes

  • New Features

    • Added dataset client request/response message types for metadata exchange.
    • Introduced factory-based dataset client initialization with dataset manager fallback.
  • Refactor

    • Improved dataset client metadata retrieval flow using dedicated message protocol.
  • Tests

    • Updated worker tests for new dataset client initialization flow.


Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
@github-actions github-actions bot added the feat label Mar 7, 2026

github-actions bot commented Mar 7, 2026

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@e5654224faa127725afa50cd719101b46ae83d76

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@e5654224faa127725afa50cd719101b46ae83d76

Last updated for commit: e565422

@ajcasagrande
Contributor Author

What happens if a worker gets sent 100 credits simultaneously and doesn't have client metadata? Will this cause a race condition?


codecov bot commented Mar 7, 2026

Codecov Report

❌ Patch coverage is 64.51613% with 11 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/aiperf/dataset/dataset_manager.py 33.33% 4 Missing ⚠️
src/aiperf/workers/worker.py 66.66% 4 Missing ⚠️
src/aiperf/common/messages/dataset_messages.py 72.72% 3 Missing ⚠️



coderabbitai bot commented Mar 7, 2026

Walkthrough

Two new message types (DATASET_CLIENT_REQUEST and DATASET_CLIENT_RESPONSE) are introduced to the enum and message infrastructure. New message classes with validation hooks are added, and the dataset manager implements a handler for client requests. Worker initialization logic is refactored to use the new dataset client request-response flow.

Changes

Cohort / File(s) Summary
Enum and Message Infrastructure
src/aiperf/common/enums/enums.py, src/aiperf/common/messages/__init__.py, src/aiperf/common/messages/dataset_messages.py
Added DATASET_CLIENT_REQUEST and DATASET_CLIENT_RESPONSE enum members; exported DatasetClientRequestMessage and DatasetClientResponseMessage; implemented new message classes with before-validation hooks to coerce dict inputs into DatasetClientMetadata.
Dataset Manager Handler
src/aiperf/dataset/dataset_manager.py
Implemented _handle_dataset_client_request method to retrieve and return dataset client metadata in response to client requests. Note: handler definition appears duplicated in the file.
Worker Client Initialization
src/aiperf/workers/worker.py, tests/unit/workers/test_worker.py
Refactored dataset client initialization to use dedicated request-response flow; added _initialize_dataset_client and _request_dataset_client_from_dataset_manager helper methods; replaced direct ConversationRequestMessage usage with DatasetClientRequestMessage for metadata operations; updated corresponding test mocking strategy.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 New message types hop into place,
Request and response in dataset's embrace,
Client metadata flows with validation's grace,
Worker refactored at a measured pace,
Infrastructure strengthened—a rabbit's-pace chase! 🌟

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title accurately describes the main architectural change: replacing per-conversation ZMQ fallback with a dataset client metadata request pattern.
Docstring Coverage ✅ Passed Docstring coverage is 84.62% which is sufficient. The required threshold is 80.00%.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
tests/unit/workers/test_worker.py (1)

302-315: Add a concurrent startup-fallback test.

This only covers the single-caller path. A multi-caller case that drives _retrieve_conversation() concurrently and asserts _request_dataset_client_from_dataset_manager is awaited once would catch the race-prone path behind this change.

Based on learnings: "Do not assume atomicity across awaits; guard shared mutable state accordingly."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/workers/test_worker.py` around lines 302 - 315, Add a concurrent
startup-fallback unit test that simulates multiple coroutines calling
_retrieve_conversation concurrently when mock_worker._dataset_client is None and
ensure _request_dataset_client_from_dataset_manager is awaited exactly once; do
this by monkeypatching mock_worker._request_dataset_client_from_dataset_manager
with an AsyncMock that sets mock_worker._dataset_client and returns the client,
then spawn multiple asyncio tasks (e.g., via asyncio.gather or create_task) that
call mock_worker._retrieve_conversation(session_id) concurrently and finally
assert the patched AsyncMock was awaited once (call_count or
assert_awaited_once) and all callers received the expected Conversation from
get_conversation on the single created client.
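The suggested concurrent test can be sketched as below. The `Worker` here only models the locking behavior the reviewer asks to verify; `_retrieve_conversation` and `_request_dataset_client_from_dataset_manager` follow the PR's method names, but everything else is illustrative.

```python
import asyncio
from unittest.mock import AsyncMock

async def demo():
    class Worker:
        def __init__(self):
            self._dataset_client = None
            self._lock = asyncio.Lock()
            # AsyncMock lets the test assert exactly how many times the
            # fallback request was awaited.
            self._request_dataset_client_from_dataset_manager = AsyncMock(
                side_effect=self._fetch
            )

        async def _fetch(self):
            await asyncio.sleep(0)  # yield to widen the race window
            self._dataset_client = object()

        async def _retrieve_conversation(self, session_id):
            if self._dataset_client is None:
                async with self._lock:  # guard shared state across awaits
                    if self._dataset_client is None:  # double-check under the lock
                        await self._request_dataset_client_from_dataset_manager()
            return session_id

    worker = Worker()
    # Drive many concurrent callers through the startup-fallback path.
    await asyncio.gather(
        *(worker._retrieve_conversation(f"s{i}") for i in range(10))
    )
    worker._request_dataset_client_from_dataset_manager.assert_awaited_once()
    return worker._request_dataset_client_from_dataset_manager.await_count

print(asyncio.run(demo()))  # 1
```

Without the lock and double-check, every caller that observes `_dataset_client is None` before the first fetch completes would issue its own request, and the `assert_awaited_once` would fail.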

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 3e7f4026-d0cb-4e41-a457-b76428cbeba1

📥 Commits

Reviewing files that changed from the base of the PR and between e294a31 and e565422.

📒 Files selected for processing (6)
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/messages/__init__.py
  • src/aiperf/common/messages/dataset_messages.py
  • src/aiperf/dataset/dataset_manager.py
  • src/aiperf/workers/worker.py
  • tests/unit/workers/test_worker.py

Comment on lines 223 to 247

    async def _on_dataset_configured(self, msg: DatasetConfiguredNotification) -> None:
        """Initialize dataset client when configuration is received."""
        await self._initialize_dataset_client(msg.client_metadata)
        self.debug(
            lambda: (
                f"Dataset client initialized: type={msg.client_metadata.client_type}"
            )
        )

    async def _initialize_dataset_client(
        self, client_metadata: DatasetClientMetadata
    ) -> None:
        """Create and initialize the dataset client from metadata.

        Uses factory pattern to dynamically create the appropriate client.
        The factory auto-extracts client_type from client_metadata, leveraging
        the discriminated union pattern for type-safe routing. This allows new
        storage backends (S3, Redis, etc.) to work without modifying Worker code.
        """
        ClientStoreClass = plugins.get_class(
            PluginType.DATASET_CLIENT_STORE, client_metadata.client_type
        )
        self._dataset_client = ClientStoreClass(client_metadata=client_metadata)
        await self._dataset_client.initialize()
        self._dataset_configured_event.set()

⚠️ Potential issue | 🔴 Critical

Don't initialize dataset clients from DatasetManager metadata in Kubernetes.

Lines 223-225 and Lines 588-604 treat DatasetManager client_metadata as worker-local in every run mode. But src/aiperf/dataset/dataset_manager.py Lines 356-364 explicitly say that, in Kubernetes, those paths are control-plane paths that workers should ignore until WorkerPodManager provides local download paths. As written, a pod can try to mmap files that do not exist locally. Guard both paths on run type and keep waiting for the downloaded-path source in Kubernetes.

Also applies to: 588-605

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/workers/worker.py` around lines 223 - 247, The worker currently
initializes dataset clients directly from DatasetManager metadata in
_on_dataset_configured and _initialize_dataset_client, which causes pods in
Kubernetes to attempt to use control-plane paths; modify these methods to first
check the run mode (e.g., Kubernetes) and, if running in Kubernetes, do NOT
initialize from DatasetClientMetadata immediately but instead wait for the
WorkerPodManager/local-download path to become available (the “downloaded-path”
source) before creating ClientStoreClass; in practice, add a guard around the
existing initialization logic in
_on_dataset_configured/_initialize_dataset_client that checks run type and
either returns/waits for an alternative notification from WorkerPodManager (or
polls for the downloaded-path) and only then instantiates
ClientStoreClass(client_metadata=...) and calls initialize() and
_dataset_configured_event.set().
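The suggested run-mode guard can be sketched as follows. `RunMode` and `_await_downloaded_path` are illustrative names, not the repository's API; the point is only that in Kubernetes the worker ignores control-plane metadata and waits for the locally downloaded dataset instead.

```python
import asyncio
import enum

class RunMode(enum.Enum):
    LOCAL = "local"
    KUBERNETES = "kubernetes"

class Worker:
    def __init__(self, run_mode: RunMode):
        self.run_mode = run_mode
        self.initialized_from = None

    async def _on_dataset_configured(self, client_metadata: dict) -> None:
        if self.run_mode is RunMode.KUBERNETES:
            # Control-plane paths are not valid inside the pod; wait for
            # WorkerPodManager to provide a locally downloaded dataset.
            local_metadata = await self._await_downloaded_path()
            self.initialized_from = ("downloaded-path", local_metadata)
            return
        # Outside Kubernetes the DatasetManager metadata is worker-local.
        self.initialized_from = ("dataset-manager", client_metadata)

    async def _await_downloaded_path(self) -> dict:
        await asyncio.sleep(0)  # stand-in for the pod-manager notification
        return {"path": "/local/dataset.bin"}

async def main():
    k8s = Worker(RunMode.KUBERNETES)
    local = Worker(RunMode.LOCAL)
    meta = {"path": "/control-plane/dataset.bin"}
    await asyncio.gather(
        k8s._on_dataset_configured(meta),
        local._on_dataset_configured(meta),
    )
    return k8s.initialized_from[0], local.initialized_from[0]

print(asyncio.run(main()))  # ('downloaded-path', 'dataset-manager')
```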

Comment on lines +232 to 247

    async def _initialize_dataset_client(
        self, client_metadata: DatasetClientMetadata
    ) -> None:
        """Create and initialize the dataset client from metadata.

        Uses factory pattern to dynamically create the appropriate client.
        The factory auto-extracts client_type from client_metadata, leveraging
        the discriminated union pattern for type-safe routing. This allows new
        storage backends (S3, Redis, etc.) to work without modifying Worker code.
        """
        ClientStoreClass = plugins.get_class(
            PluginType.DATASET_CLIENT_STORE, client_metadata.client_type
        )
        self._dataset_client = ClientStoreClass(client_metadata=client_metadata)
        await self._dataset_client.initialize()
        self._dataset_configured_event.set()

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

wc -l src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 92


🏁 Script executed:

# Read the _initialize_dataset_client method and surrounding context
sed -n '230,250p' src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 951


🏁 Script executed:

# Read the lines 580-586 where client is used
sed -n '575,590p' src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 794


🏁 Script executed:

# Read the lines 588-605 fallback request
sed -n '585,610p' src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 1147


🏁 Script executed:

# Search for _on_dataset_configured method
rg -n "_on_dataset_configured" src/aiperf/workers/worker.py -A 10

Repository: ai-dynamo/aiperf

Length of output: 578


🏁 Script executed:

# Search for _dataset_configured_event usage
rg -n "_dataset_configured_event" src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 224


🏁 Script executed:

# Check line 673 context to see how the event is used for synchronization
sed -n '670,685p' src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 761


🏁 Script executed:

# Check if there's any lock mechanism used with _dataset_client
rg -n "asyncio.Lock\|RLock\|Semaphore\|_dataset_client" src/aiperf/workers/worker.py | head -30

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Get full context of the on_pull_message handler to see event wait usage
rg -n "@on_pull_message\|_dataset_configured_event" src/aiperf/workers/worker.py -B 2 -A 5

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Search for methods that call get_conversation to understand when it's invoked
rg -n "get_conversation" src/aiperf/workers/worker.py -B 3 -A 1

Repository: ai-dynamo/aiperf

Length of output: 565


🏁 Script executed:

# Check the on_message handlers and their concurrency
rg -n "@on_message\|@on_pull_message\|@on_start\|@on_init" src/aiperf/workers/worker.py | head -20

Repository: ai-dynamo/aiperf

Length of output: 42


🏁 Script executed:

# Get more context around line 580 to see the full get_conversation method
sed -n '570,595p' src/aiperf/workers/worker.py

Repository: ai-dynamo/aiperf

Length of output: 1219


🏁 Script executed:

# Check if there's any initialization guard or lock used elsewhere with _dataset_client
rg -n "self._dataset_client" src/aiperf/workers/worker.py | head -20

Repository: ai-dynamo/aiperf

Length of output: 773


Fix race condition in dataset client initialization.

The assignment of self._dataset_client at line 245 occurs before await self._dataset_client.initialize() completes at line 246. This creates a window where concurrent calls to get_conversation() (line 580) can observe self._dataset_client is not None and invoke methods on an uninitialized client. The synchronization event is set only after initialization (line 247) and is awaited only during startup configuration, not during concurrent get_conversation() calls. Additionally, if get_conversation() triggers the fallback request (line 585), it will invoke _initialize_dataset_client() again while the original initialization is still in progress, causing client conflicts or double-initialization. Build the client in a local variable, await initialization, then assign under proper synchronization (lock or fully completed event).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/aiperf/workers/worker.py` around lines 232 - 247, Create the dataset
client locally, fully initialize it, then publish it to the Worker instance
under synchronization to avoid races: in _initialize_dataset_client build the
client in a local variable (e.g., client = ClientStoreClass(...)), await
client.initialize(), then acquire the existing Worker-level lock (or create one
if missing) and assign self._dataset_client = client and set
self._dataset_configured_event inside that critical section; also update
get_conversation to wait on self._dataset_configured_event (or re-check under
the same lock) before using self._dataset_client and guard against duplicate
concurrent initializations by checking the published self._dataset_client after
waiting and returning early if another initializer already set it.

@ajcasagrande ajcasagrande marked this pull request as draft March 9, 2026 15:43
