[data] feat: TransferQueue - Support managing multiple data partitions for Train/Val/Test in controller#43
…s for Train/Val/Test in controller
recipe/transfer_queue/ray_trainer.py
Outdated
self.config = OmegaConf.merge(tq_config, self.config)

# 4. create client
# each client should be allocated to exactly one controller
Pull Request Overview
This PR refactors the TransferQueue data system to use a unified controller/client architecture instead of separate train and validation systems. The key changes include:
- Consolidates dual data systems (train and validation) into a single unified TransferQueue system
- Replaces the `global_step` parameter with `partition_id` for better data partitioning
- Updates TransferQueue dependency to a newer commit version
- Removes the concept of separate validation clients
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| verl/utils/transferqueue_utils.py | Removed separate validation client; simplified client creation to use single controller with storage manager initialization |
| verl/single_controller/base/worker.py | Updated client creation signature to pass unified controller info and config instead of separate train/val parameters |
| requirements_transferqueue.txt | Updated TransferQueue dependency to newer commit hash (862b74a) |
| recipe/transfer_queue/ray_trainer.py | Consolidated train/val data systems into unified system; replaced global_step with partition_id prefixed by train/val context |
| recipe/transfer_queue/agent_loop.py | Updated to use unified controller info and config parameters |
Comments suppressed due to low confidence (1)
recipe/transfer_queue/ray_trainer.py:708
- The 'validate' flag is still being set in `extra_info` (lines 708, 738), but the TransferQueue system no longer uses this flag to route data to separate validation clients after the refactor. The flag appears to be used only by rollout workers to determine sampling behavior. Consider documenting this usage, or removing the flag if it is truly unused after the consolidation of the train/val data systems.
"validate": True,
Review skipped: auto reviews are disabled on base/target branches other than the default branch.
Walkthrough
Unified the transfer queue data system architecture by consolidating per-role (train/val) initialization into a single controller-config-based approach. Refactored method signatures across agent loop, worker, and utility modules to use unified …
Sequence Diagram(s)
sequenceDiagram
participant RayTrainer as RayPPOTrainer
participant DataSys as _initialize_data_system()
participant Controller as TransferQueueController
participant Storage as SimpleStorageUnit
participant Client as TransferQueueClient
Note over RayTrainer,Client: New Unified Data System Flow
RayTrainer->>DataSys: Initialize single data system
DataSys->>DataSys: Compute train/val data sizes
DataSys->>Storage: Create SimpleStorageUnit
DataSys->>Controller: Create TransferQueueController
DataSys->>Controller: Register storage_unit_infos
DataSys->>RayTrainer: Return controller_info, storage_infos
rect rgb(220, 240, 220)
Note over RayTrainer,Client: Worker Setup (Old: multiple clients per role)
RayTrainer->>Client: create_transferqueue_client(controller_info, config)
Client->>Client: initialize_storage_manager(AsyncSimpleStorageManager)
end
rect rgb(240, 220, 220)
Note over RayTrainer,Client: RPC Pattern Change (Old: global_step-based)
RayTrainer->>Storage: async_put(key="train_{step}", data)
RayTrainer->>Storage: async_put(key="val_{step}", data)
Storage-->>RayTrainer: Partition_id-based namespacing
end
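The partition-based namespacing shown in the diagram can be illustrated with a toy in-memory store. This is only a sketch: `ToyPartitionedStore` and its methods are hypothetical stand-ins, not the TransferQueue API, and the sketch assumes partitions are isolated purely by their string ID.

```python
from collections import defaultdict


class ToyPartitionedStore:
    """Hypothetical in-memory stand-in for a single shared storage backend."""

    def __init__(self):
        # Maps partition_id -> list of stored samples.
        self._partitions = defaultdict(list)

    def put(self, partition_id, samples):
        self._partitions[partition_id].extend(samples)

    def get(self, partition_id):
        # Return a copy so callers cannot mutate stored data.
        return list(self._partitions.get(partition_id, []))

    def clear(self, partition_id):
        # Clearing one partition leaves all other partitions untouched.
        self._partitions.pop(partition_id, None)


store = ToyPartitionedStore()
store.put("train_0", ["t0", "t1"])
store.put("val_0", ["v0"])
store.clear("val_0")
train_after_clear = store.get("train_0")
val_after_clear = store.get("val_0")
```

Because train and validation data share one store and differ only by key, clearing a validation partition does not disturb the training partition for the same step.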
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25–40 minutes
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 0
🧹 Nitpick comments (1)
recipe/transfer_queue/ray_trainer.py (1)
447-451: Document why allow_objects=True is necessary for config merge.
The code creates a temporary config with `allow_objects=True` to preserve `ZMQServerInfo` instances during the merge. While this is correct, consider adding a comment explaining why this flag is required, as it's not immediately obvious and could be accidentally removed during refactoring.
Apply this diff to add clarifying documentation:
  tq_config = OmegaConf.create({}, flags={"allow_objects": True})
  # Note: Need to generate a new DictConfig
- # with allow_objects=True to maintain ZMQServerInfo instance. Otherwise it will be flattened to dict
+ # with allow_objects=True to preserve ZMQServerInfo instances (which contain socket connection details).
+ # Without this flag, OmegaConf would flatten these objects to dicts, breaking the transfer queue client initialization.
  tq_config.controller_info = self.data_system_controller_info
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- recipe/transfer_queue/agent_loop.py (1 hunks)
- recipe/transfer_queue/ray_trainer.py (15 hunks)
- requirements_transferqueue.txt (1 hunks)
- verl/single_controller/base/worker.py (1 hunks)
- verl/utils/transferqueue_utils.py (3 hunks)
🔇 Additional comments (8)
recipe/transfer_queue/agent_loop.py (1)
70-72: LGTM! Signature simplified to unified controller/config pattern.
The method signature has been updated to use a single `controller_info` and `config` instead of separate `controller_infos`, `storage_infos`, and `role` parameters. This aligns with the unified data system architecture described in the PR.
verl/single_controller/base/worker.py (1)
134-141: LGTM! Worker-side signature updated to match unified API.
The signature change removes the `role` parameter and consolidates `controller_infos`/`storage_infos` into `controller_info`/`config`. The client ID now uses a simpler `worker_{rank}` format instead of incorporating the role.
verl/utils/transferqueue_utils.py (2)
45-52: LGTM! Unified transfer queue client initialization.
The function now creates a single `AsyncTransferQueueClient` and initializes its storage manager in one place, removing the complexity of managing separate train/val clients. This aligns with the unified data system architecture.
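A minimal sketch of the single-client pattern described above, assuming a module-level handle that is created once and then shared by train and validation code paths. `ToyClient`, `create_client`, `get_client`, and the `"toy_manager"` label are hypothetical names for illustration and do not reflect the real `transfer_queue` signatures.

```python
_CLIENT = None  # module-level handle shared by train and val code paths


class ToyClient:
    """Hypothetical stand-in for a transfer queue client."""

    def __init__(self, client_id, controller_info):
        self.client_id = client_id
        self.controller_info = controller_info
        self.storage_manager = None

    def initialize_storage_manager(self, manager_type, config):
        # Record which manager was set up and with what settings.
        self.storage_manager = (manager_type, dict(config))


def create_client(client_id, controller_info, config):
    """Create the one shared client and set up its storage manager."""
    global _CLIENT
    _CLIENT = ToyClient(client_id, controller_info)
    _CLIENT.initialize_storage_manager("toy_manager", config)
    return _CLIENT


def get_client():
    """Return the shared client, failing loudly if it was never created."""
    if _CLIENT is None:
        raise RuntimeError("create_client() must be called first")
    return _CLIENT


created = create_client("worker_0", {"address": "placeholder"}, {"num_units": 1})
fetched = get_client()
```

With one handle, callers no longer choose between a train client and a validation client; the split is expressed only through the partition ID passed with each call.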
104-104: All visible callsites have been successfully updated to use the unified client.
The verification found:
- No references to old `get_val_transferqueue_client` or `get_train_transferqueue_client` functions
- Only `_TRANSFER_QUEUE_CLIENT` used in `transferqueue_utils.py` (line 104)
- Partition-based routing is explicit in `ray_trainer.py` via `partition_id` parameters to `async_get_meta()`, `async_put()`, and `async_clear()`
- All calls in the decorator pattern (lines 172, 174) correctly convert `BatchMeta` to `DataProto` through the unified client
However, the partition routing mechanism at line 104 relies on `AsyncTransferQueueClient` (imported from the external `transfer_queue` module), which cannot be inspected in this codebase. The routing assumes the client maintains partition context from `async_get_meta()` through to `async_get_data()`, but this contract cannot be verified without access to the external dependency.
recipe/transfer_queue/ray_trainer.py (3)
414-460: LGTM! Unified data system initialization consolidates train/val partitions.
The new `_initialize_data_system` method creates a single `TransferQueueController` and shared storage units sized for combined train+val data. The use of `OmegaConf.create` with `allow_objects=True` correctly preserves `ZMQServerInfo` instances when merging into config.
671-678: Partition_id naming is consistent across all RPC calls.
Verification confirms all async_put, async_get_meta, async_get_data, and async_clear operations for validation data consistently use `f"val_{self.global_steps - 1}"`, and training data consistently use `f"train_{self.global_steps - 1}"`. The `self.global_steps - 1` offset is correctly applied uniformly throughout.
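One way to keep this naming uniform is to build every partition ID through a single helper. The sketch below is an assumption for illustration: `partition_id` as a function is hypothetical and not part of the PR; it only mirrors the `train_`/`val_` prefixes and the `global_steps - 1` offset quoted in the comment.

```python
def partition_id(split, global_steps):
    """Build a partition ID such as "train_4" from a split name and step counter.

    Mirrors the naming quoted in the review: the split prefix followed by
    global_steps - 1. The set of allowed splits is an assumption.
    """
    if split not in ("train", "val"):
        raise ValueError(f"unknown split: {split!r}")
    return f"{split}_{global_steps - 1}"


train_pid = partition_id("train", 5)
val_pid = partition_id("val", 5)
```

Routing all put, get, and clear calls through one function would make an inconsistent offset or prefix a single-point fix rather than a search across call sites.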
1268-1270: Concern verified and resolved.
The code correctly calls `async_put` at line 1269 before any subsequent `async_get_meta` or `async_get_data` operations on the same partition. Specifically:
- Line 1269: `async_put` writes data to `partition_id=f"train_{self.global_steps - 1}"`
- Lines 1273+: `async_get_meta` and `async_get_data` read from the same partition_id
All read operations occur after the write, preventing consumer failures.
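The write-before-read ordering can be sketched with a toy asynchronous client. The method names follow those mentioned in the review, but `ToyAsyncClient`, its signatures, and the metadata dictionary are assumptions made for this example; the real client talks to remote storage and may behave differently.

```python
import asyncio


class ToyAsyncClient:
    """Hypothetical in-memory stand-in for an async transfer queue client."""

    def __init__(self):
        self._data = {}

    async def async_put(self, data, partition_id):
        self._data.setdefault(partition_id, []).extend(data)

    async def async_get_meta(self, partition_id):
        # Reading a partition that was never written is treated as an error.
        if partition_id not in self._data:
            raise KeyError(f"no data written to {partition_id}")
        return {"partition_id": partition_id, "size": len(self._data[partition_id])}

    async def async_get_data(self, meta):
        return list(self._data[meta["partition_id"]])


async def step(client, batch, partition_id):
    # Write first; awaiting the put means it has completed before any read starts.
    await client.async_put(batch, partition_id=partition_id)
    meta = await client.async_get_meta(partition_id=partition_id)
    return await client.async_get_data(meta)


result = asyncio.run(step(ToyAsyncClient(), [1, 2, 3], "train_0"))
```

If the `async_get_meta` call were issued before the awaited `async_put`, this toy client would raise `KeyError`, which is the kind of consumer failure the ordering is meant to avoid.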
requirements_transferqueue.txt (1)
2-2: Avoid using a same-day commit as a dependency; consider waiting for a stable release.
The commit `862b74a` exists in the upstream repository, but it was committed today (2025-11-07 03:01 UTC). This is far too recent to be considered stable. The commit is a bug fix ("Close zmq socket for storage manager"), but lacks the testing time required for production use. Wait for an official release or a commit with a proven track record.
What does this PR do?
Checklist Before Starting
- `[{modules}] {type}: {description}` (This will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, like `[megatron, fsdp, doc]`
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- `[BREAKING]` to the beginning of the title.
- `[BREAKING][fsdp, megatron] feat: dynamic batching`
Test
API and Usage Example
# Add code snippet or script demonstrating how to use this
Design & Code Changes
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
Summary by CodeRabbit
Refactor
Chores