diff --git a/docs/advance/fully_async.md b/docs/advance/fully_async.md
index 051e57586b8..5cb9b6acc7b 100644
--- a/docs/advance/fully_async.md
+++ b/docs/advance/fully_async.md
@@ -105,6 +105,7 @@ but the overlap in their time consumption reduces the end-to-end time consumptio
| `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine for accelerating, default `True` |
| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When use checkpoint_engine, whether to overlap broadcast and load_weights, default `False` |
| `async_training.checkpoint_engine.device_buffer_size_M` | When use checkpoint_engine, the user-specific bucket size (MB), default `4096` |
+| `async_training.use_trainer_do_validate` | Whether to run validation on the trainer node, default `False` |
**Further Explanation:**
@@ -194,6 +195,13 @@ but the overlap in their time consumption reduces the end-to-end time consumptio
- When disable `overlap_broadcast_and_consume`, the additional device memory overhead of
trainer rank is `2 * bucket_size`and rollout rank is `1 * bucket_size`。
+* `async_training.use_trainer_do_validate`
+
+  It controls whether the trainer's `do_validate` method is used for validation.
+  If set to `True`, the trainer performs validation after each parameter update, which reduces the validation time
+  overhead and the trainer node's idle time.
+  If set to `False`, the trainer does not perform validation.
+
### Supported Modes
1. on policy pipeline:
@@ -477,6 +485,35 @@ We tested the single-step parameter synchronization time of the checkpoint-engin
| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |
+### use_trainer_do_validate Experiment
+
+We used Qwen2.5-Math-7B to verify the effect of `use_trainer_do_validate=True` on the training process.
+With this setting, validation time is roughly halved (about a 2x improvement) and trainer node idle time is
+reduced by about 40%.
+
+* Machine: H20
+* Model: Qwen2.5-Math-7B
+* Rollout length: max_response_length FSDP2: 10K tokens;
+* Algorithm: DAPO
+* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* Engine: vllm+FSDP2
+* rollout.n: 16
+* ppo_mini_batch_size: 32
+* test_freq: 10
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * require_batches: 4
+ * trigger_parameter_sync_step: 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time<br>50 step | acc/mean@2 |
+|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
+| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
+| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
+
+
## Multi-Turn Tool Calling
Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn
diff --git a/recipe b/recipe
deleted file mode 160000
index 21892b92769..00000000000
--- a/recipe
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 21892b9276936efab5375c3f6b8415e472ef7118
diff --git a/recipe/fully_async_policy/README.md b/recipe/fully_async_policy/README.md
new file mode 100644
index 00000000000..70c8faa788b
--- /dev/null
+++ b/recipe/fully_async_policy/README.md
@@ -0,0 +1,597 @@
+# Recipe: Fully Async Policy Trainer
+
+**Author:** `https://github.com/meituan-search`
+
+Last updated: 12/25/2025.
+
+This document introduces a fully asynchronous PPO training system that completely decouples the Trainer and Rollouter,
+supporting asynchronous sample generation and training.
+Under this system, we achieved a 2.35x-2.67x performance improvement when training the Qwen2.5-7B model with 128 GPUs,
+without significantly affecting the results.
+
+## Introduction
+
+### Background
+
+Compared to the colocate architecture, a separated rollout/train architecture can allocate resources more flexibly and
+support more flexible training logic, addressing the low GPU utilization and poor training efficiency caused by
+long-tail samples.
+one_step_off_policy alleviates the problem of long rollout times and gains some training efficiency by using a
+separated architecture and running rollout and training asynchronously, one step apart.
+However, it is forced to train on data that is exactly one step stale, which is inflexible and cannot completely
+eliminate the impact of long-tail samples on training efficiency.
+Other frameworks such as AReaL, Magistral, StreamRL, and AsyncFlow have implemented asynchronous and streaming training
+on top of a separated architecture and achieved gains.
+We borrow from their methods and implement them in verl: fully_async_policy supports asynchronous, streaming, and
+partial-rollout training.
+With a reasonable choice of resource allocation, parameter synchronization frequency, and related parameters,
+fully_async_policy can significantly improve training efficiency.
+
+> Magistral https://arxiv.org/abs/2506.10910
+>
+> AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language
+> Reasoning https://arxiv.org/abs/2505.24298
+>
+> StreamRL: Scalable, Heterogeneous, and Elastic RL for LLMs with Disaggregated Stream
+> Generation https://arxiv.org/abs/2504.15930
+>
+> AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training https://arxiv.org/abs/2507.01663
+>
+
+### Core Contributions
+
+* **Resource Isolation**: Unlike using hybrid_engine, Rollouter and Trainer use separate computing resources and need to
+ specify the resources they occupy separately.
+* **Parallel Generation and Training**: While the Trainer is training, the Rollouter is generating new samples.
+* **Multi-step Asynchronous**: Compared to one step off policy, it supports asynchronous settings from 0.x steps to
+ multiple steps, making the asynchronous solution more flexible.
+* **NCCL Parameter Synchronization**: Based on the nccl communication primitive, refer to [checkpoint-engine](https://github.com/MoonshotAI/checkpoint-engine) to
+ achieve efficient parameter synchronization between Rollouter and Trainer.
+* **Stream Inference and Training**: Rollouter generates data sample by sample, and data transmission uses a single
+ sample as the minimum transmission unit.
+* **Asynchronous Training and Freshness Control**: By setting the parameter async_training.staleness_threshold, it
+ supports training with samples generated by old parameters.
+* **PartialRollout**: The Rollouter's inference process supports partial rollout logic. By adding `sleep()` and
+  `resume()` logic around parameter synchronization, it saves samples from ongoing rollouts and continues generating
+  them in the next rollout, reducing the time spent waiting for in-flight tasks to finish during parameter
+  synchronization.
+
+Currently, the supported usage mode is megatron/fsdp+vllm. vllm must use the server mode based on AgentLoop.
+
+## Design
+
+The overall architecture of fully_async_policy is shown in the figure below. fully_async_policy mainly consists of four
+parts: Rollouter, MessageQueue, Trainer, and ParameterSynchronizer.
+
+
+
+1. Rollouter generates sequences sample by sample and puts the generated samples into the MessageQueue, with the
+ production speed controlled by freshness.
+2. MessageQueue is used to temporarily store samples generated by Rollouter.
+3. Trainer fetches samples from MessageQueue sample by sample. After fetching `require_batches*ppo_mini_batch_size`
+ samples, it will perform training. After training for async_training.trigger_parameter_sync_step rounds, it triggers
+ a parameter synchronization with Rollouter.
+4. ParameterSynchronizer implements the NCCL synchronous parameter synchronization capability.
+
+The benefit over the baseline comes from the fact that, in the colocate case, giving rollout more resources cannot
+remove the idleness caused by long-tail samples.
+After resource isolation, rollout and training may each take longer than before (because each uses fewer resources),
+but overlapping them reduces the end-to-end time.
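+
+The streaming interaction among Rollouter, MessageQueue, and Trainer can be summarized with the minimal sketch below.
+It is purely conceptual: the queue, objects, and method names are illustrative stand-ins, not the actual
+`recipe/fully_async_policy` API.
+
+```python
+import queue
+
+sample_queue = queue.Queue()  # stands in for MessageQueue
+
+def rollouter_loop(rollouter, sample_budget):
+    """Generate samples one by one and stream them into the queue."""
+    for prompt in rollouter.pending_prompts():
+        if sample_budget.exhausted():              # freshness / staleness control
+            sample_budget.wait_for_parameter_sync()
+        sample_queue.put(rollouter.generate(prompt))
+
+def trainer_loop(trainer, require_batches, ppo_mini_batch_size, trigger_parameter_sync_step):
+    """Consume samples, run local updates, and periodically trigger parameter synchronization."""
+    local_updates = 0
+    while trainer.keep_training():
+        batch = [sample_queue.get() for _ in range(require_batches * ppo_mini_batch_size)]
+        trainer.update(batch)                      # one local PPO update
+        local_updates += 1
+        if local_updates % trigger_parameter_sync_step == 0:
+            trainer.sync_parameters_with_rollouter()   # NCCL-based parameter synchronization
+```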
+
+
+
+## Usage
+
+### Parameter Description
+
+| super params | implication |
+|-----------------------------------------------|------------------------------------------------------------------------------------------------|
+| `trainer.nnodes` | Number of nodes for Trainer |
+| `trainer.n_gpus_per_node` | Number of GPUs per node for Trainer |
+| `rollout.nnodes` | Number of nodes for Rollouter |
+| `rollout.n_gpus_per_node` | Number of GPUs per node for Rollouter |
+| `data.train_batch_size` | In the fully async strategy, this value is not effective (default is 0) |
+| `data.gen_batch_size` | In the fully async strategy, uses streaming sample production logic (default is 1) |
+| `rollout.total_rollout_steps` | Total number of rollout samples |
+| `rollout.test_freq` | How many times Rollouter updates parameters before performing a validation |
+| `actor_rollout_ref.actor.ppo_mini_batch_size` | The ppo_mini_batch_size is a global num across all workers/gpus |
+| `async_training.require_batches` | Number of mini-batches (of size ppo_mini_batch_size) that FullyAsyncTrainer fetches at once |
+| `async_training.trigger_parameter_sync_step` | Indicates how many local updates FullyAsyncTrainer performs before a parameter synchronization |
+| `async_training.staleness_threshold` | Freshness control |
+| `async_training.partial_rollout` | Whether to perform partial_rollout |
+| `async_training.use_rollout_log_probs` | Use log_probs generated by rollout |
+| `async_training.compute_prox_log_prob` | Whether to compute log_prob using the training model's parameters during the training phase |
+| `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine for acceleration, default `True` |
+| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When using checkpoint_engine, whether to overlap broadcast and load_weights, default `False` |
+| `async_training.checkpoint_engine.device_buffer_size_M` | When using checkpoint_engine, the user-specified bucket size (MB), default `4096` |
+| `async_training.use_trainer_do_validate` | Whether to run validation on the trainer node, default `False` |
+
+**Further Explanation:**
+
+* `rollout.total_rollout_steps`
+
+ Compared to colocate, the quantity can be aligned by multiplying train_batch_size and step:
+ `rollout.total_rollout_steps = data.train_batch_size * step`.
+
+* `async_training.trigger_parameter_sync_step`
+
+ In the fully async strategy, it indicates how many local updates the Trainer performs (i.e., how many times it fetches
+ `require_batches * ppo_mini_batch_size` samples) before a parameter synchronization with Rollouter.
+ Between every two parameter synchronizations between Rollouter and Trainer, the Trainer will process
+  `trigger_parameter_sync_step * require_batches * ppo_mini_batch_size` samples.
+ To fairly compare speed with colocate, trigger_parameter_sync_step should be set to
+ `data.train_batch_size / (require_batches * ppo_mini_batch_size)`.
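+
+  For example, with the experiment settings used later in this document (`data.train_batch_size=512`,
+  `require_batches=4`, `ppo_mini_batch_size=32`), this gives `trigger_parameter_sync_step = 512 / (4 * 32) = 4`.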
+
+* `async_training.staleness_threshold`
+
+ In the fully async strategy, it indicates the maximum proportion of stale samples allowed to be used.
+
+ * staleness_threshold=0, indicates synchronous training.
+ Rollouter will generate a fixed number of samples between two parameter updates, the sample count is:
+ $$rollout\_num = (trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size)$$
+ * staleness_threshold>0, indicates asynchronous training, can be set to a decimal for more flexible asynchronous
+ calls.
+ Rollouter will generate at most the following number of samples between two parameter updates:
+ $$rollout\_num = (1+staleness\_threshold)*(trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size) - num\_staleness\_sample $$
+
+ num_staleness_sample represents the number of stale samples generated in excess during the last rollout.
+
+ Since it's a streaming system, rollout continues to generate and trainer continues to consume. If rollouter is slower,
+ trainer will trigger parameter synchronization earlier, and rollouter will not actually produce rollout_num samples.
+  When rollout is fast enough, setting staleness_threshold to 1 is basically equivalent to one_step_off_policy.
+ To avoid too many expired samples affecting training accuracy, it is recommended to set this value to less than 1.
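+
+  As a concrete example, with `trigger_parameter_sync_step=4`, `require_batches=4`, `ppo_mini_batch_size=32`, and
+  `staleness_threshold=0.5`, the Rollouter may generate at most `1.5 * 4 * 4 * 32 = 768` samples between two
+  parameter synchronizations (minus any stale samples carried over from the previous round), versus exactly `512`
+  samples when `staleness_threshold=0`.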
+
+* `async_training.partial_rollout`
+
+ partial_rollout only actually takes effect when staleness_threshold>0.
+
+* `async_training.use_rollout_log_probs`
+
+ In reinforcement learning algorithms, log_probs have implicit correlations with parameter versions and tokens. Due to
+ the settings of algorithms like PPO/GRPO/DAPO, when calculating importance sampling,
+ old_log_prob must use the log_probs corresponding to the rollout parameters and tokens to ensure algorithm
+ correctness. In the fully
+ async strategy, we default to old_log_prob being calculated by rollout rather than by trainer.
+
+* `async_training.require_batches`
+
+ In streaming training, require_batches should be set to 1, indicating that training is performed after producing
+ enough ppo_mini_batch_size samples.
+ In actual testing, we found that if fewer samples are issued at once, due to the order of data distribution, it can
+ cause training instability and longer response lengths.
+ Here, we additionally provide require_batches for streaming distribution and control the number of samples
+ participating in training at once.
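+
+  For example, `require_batches=4` with `ppo_mini_batch_size=32` means the Trainer waits for `4 * 32 = 128` samples
+  before each local update.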
+
+* `async_training.compute_prox_log_prob` (experimental)
+
+ During the training process, we observed that metrics and response lengths may become unstable in the later
+ stages of training. To mitigate this issue, we can use
+ the [Rollout Importance Sampling](https://verl.readthedocs.io/en/latest/advance/rollout_is.html)
+ technique for importance sampling. To utilize Rollout Importance Sampling, we need to compute log_prob using
+ the training engine, which requires enabling this switch.
+ Additionally, when compute_prox_log_prob and Rollout Importance Sampling are enabled under mode d
+ (async stream pipeline with partial rollout), our implementation approximates `Areal's Decoupled PPO`.
+
+* `async_training.checkpoint_engine.enable`
+
+ Enabling the checkpoint engine generally reduces synchronization time overhead by more than 60% compared to
+ the original per-tensor parameter synchronization method. However, assembling buckets incurs additional
+ temporary GPU memory overhead.
+
+* `async_training.checkpoint_engine.overlap_broadcast_and_consume`
+
+ Enabling pipeline between the broadcast and load_weights parameters will allocate additional GPU memory.
+ Since the main time consumption for parameter synchronization is not in the broadcast and load_weights phases,
+ but in the parameter generation phase (by megatron or FSDP), this option is off by default.
+
+* `async_training.checkpoint_engine.device_buffer_size_M`
+
+ It controls the size of the memory buffer used for synchronization when the checkpoint-engine is enabled.
+ The actual `bucket_size` = `max(device_buffer_size_M, maximum parameter tensor size)`.
+  * When `overlap_broadcast_and_consume` is enabled, the additional device memory overhead is `3 * bucket_size`
+    per trainer rank and `2 * bucket_size` per rollout rank.
+  * When `overlap_broadcast_and_consume` is disabled, the additional device memory overhead is `2 * bucket_size`
+    per trainer rank and `1 * bucket_size` per rollout rank.
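+
+  For example, with the default `device_buffer_size_M=4096` and no single parameter tensor larger than 4096 MB,
+  disabling `overlap_broadcast_and_consume` costs roughly `2 * 4 GB = 8 GB` of temporary device memory per trainer
+  rank and roughly `4 GB` per rollout rank.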
+
+* `async_training.use_trainer_do_validate`
+
+  It controls whether the trainer's `do_validate` method is used for validation.
+  If set to `True`, the trainer performs validation after each parameter update, which reduces the validation time
+  overhead and the trainer node's idle time.
+  If set to `False`, the trainer does not perform validation.
+
+### Supported Modes
+
+1. on policy pipeline:
+ 1. **trigger_parameter_sync_step=1, staleness_threshold=0**
+ 2. Rollouter produces `require_batches*ppo_mini_batch_size` samples at once, Trainer fetches these samples for
+ training, and after training completes, Trainer and Rollouter perform a parameter synchronization;
+ 3. During the rollout phase, if there are long-tail samples but few rollout samples, shorter samples cannot fill
+ idle resources, causing some resource waste.
+ 4. As shown in figure a;
+
+2. stream off policy pipeline:
+ 1. **trigger_parameter_sync_step>1, staleness_threshold=0**
+ 2. Synchronous streaming training will be performed. Rollouter produces
+ `require_batches*ppo_mini_batch_size*trigger_parameter_sync_step` samples at once, Trainer performs a local
+ training every time it fetches `require_batches*ppo_mini_batch_size` samples, and after training
+ trigger_parameter_sync_step times, Trainer and Rollouter perform a parameter synchronization;
+ 3. Compared to a, since more samples are generated at once, resource idleness will be lower.
+ 4. In one step training, there will be two periods of resource idleness: when fetching the first batch of samples,
+ train waits for `require_batches*ppo_mini_batch_size` samples to be produced, and during the last parameter
+ update, rollout waits for training to complete.
+ 5. As shown in figure b;
+
+3. async stream pipeline with stale samples:
+ 1. **trigger_parameter_sync_step>=1, staleness_threshold>0, partial_rollout=False**
+ 2. After each parameter update, Rollouter will plan to produce at most rollout_num samples (in practice, the number
+ of samples generated may be less than this value depending on rollout speed).
+ 3. If the rollout process is relatively fast, Rollouter will generate some additional samples num_stale_samples
+ before parameter synchronization for immediate use by Trainer after synchronization.
+ When triggering parameter synchronization, if Rollouter has ongoing tasks, it will wait for the tasks to complete
+ and not add new tasks;
+ 4. Compared to b, except for the first step training, subsequent training will not have the time to wait for the
+ first batch rollout to finish, but will have the time to wait for active tasks to finish.
+ 5. As shown in figure c;
+
+4. async stream pipeline with partial rollout:
+ 1. **trigger_parameter_sync_step>=1, staleness_threshold>0, partial_rollout=True**
+ 2. Compared to c, when triggering parameter synchronization, if Rollouter has samples being produced, it will
+ interrupt the rollout process and perform parameter synchronization. The interrupted samples will continue to be
+ generated after synchronization. This reduces the time to wait for active tasks to finish.
+ 3. As shown in figure d;
+
+
+
+### Key Metrics
+
+| metrics | implication |
+|------------------------------------------------|--------------------------------------------------------------------------------------------------------|
+| `trainer/idle_ratio` | Trainer idle rate |
+| `rollouter/idle_ratio` | Rollouter idle rate |
+| `fully_async/count/stale_samples_processed` | Total number of old samples used in training |
+| `fully_async/count/stale_trajectory_processed` | Total number of old trajectories used in training (one sample produces rollout.n trajectories) |
+| `fully_async/partial/total_partial_num` | Number of partial samples processed by Trainer between two trigger_parameter_sync_step |
+| `fully_async/partial/partial_ratio` | Ratio of partial samples processed by Trainer between two trigger_parameter_sync_step |
+| `fully_async/partial/max_partial_span` | Maximum parameter span of partial samples processed by Trainer between two trigger_parameter_sync_step |
+
+### Parameter Tuning Recommendations
+
+* Resource Allocation and Adjustment:
+ * Reasonable resource allocation is the prerequisite for achieving good training efficiency. The ideal resource
+ allocation should make the rollout time and train time close, thereby minimizing pipeline bubbles in the entire
+ training process,
+ avoiding resource idleness, and ensuring Trainer does not use old samples. In real training scenarios, resource
+ allocation can be adjusted based on the idle time of rollout and train during actual training,
+ which can be obtained from rollouter/idle_ratio and trainer/idle_ratio. If rollouter/idle_ratio is high and
+ trainer/idle_ratio is low,
+ Trainer resources should be increased and Rollouter resources should be reduced, and vice versa.
+
+* Key Parameters:
+ * staleness_threshold: Setting it too high will cause more old samples to be used, affecting model performance. It
+ is recommended to set it to less than 1.
+ * require_batches: The closer to 1, the closer to a pure streaming process, the smaller the training bubbles, and
+ the faster the acceleration effect that can be achieved in terms of speed, but it will affect the order of sample
+ processing;
+ * trigger_parameter_sync_step: The smaller the setting, the closer to on policy, but it will cause frequent
+ parameter synchronization. Long-tail samples waste resources that cannot be filled by short samples, resulting in
+ low resource utilization.
+ The larger the setting, the higher the computational efficiency, but the accuracy will be affected by off policy.
+ * rollout.test_freq: It will occupy Rollouter resources and is not recommended to be set too small.
+
+* Mode Selection: By adjusting different parameters, the Fully Async architecture supports optimization acceleration at
+ different levels, suitable for tasks in different scenarios.
+ * For small-scale tasks that need to ensure training stability and on-policy nature, and have low speed
+ requirements, the on policy pipeline mode (Mode 1) can be tried.
+ * For scenarios that need to improve training throughput but are sensitive to staleness, the stream off policy
+ pipeline mode can be tried. That is, by
+ setting trigger_parameter_sync_step>1 to improve training efficiency, but still maintaining the synchronization
+ mechanism (staleness_threshold=0) (Mode 2).
+ * For large-scale tasks with high training speed requirements and can tolerate a certain degree of off-policy and
+ staleness, setting staleness_threshold>
+ 0 and partial_rollout=True can improve training efficiency, using the async stream pipeline mode (Mode 3 or 4).
+
+### Quick Start
+
+```shell
+rollout_mode="async"
+rollout_name="vllm" # sglang or vllm
+if [ "$rollout_mode" = "async" ]; then
+ export VLLM_USE_V1=1
+ return_raw_chat="True"
+fi
+
+train_prompt_bsz=0
+gen_prompt_bsz=1
+n_resp_per_prompt=16
+train_prompt_mini_bsz=32
+use_dynamic_bsz=True
+total_rollout_steps=$(((512*400)))
+test_freq=10
+staleness_threshold=0
+trigger_parameter_sync_step=16
+partial_rollout=False
+
+
+python -m recipe.fully_async_policy.fully_async_main \
+    data.train_batch_size=${train_prompt_bsz} \
+    data.gen_batch_size=${gen_prompt_bsz} \
+    data.return_raw_chat=${return_raw_chat} \
+    actor_rollout_ref.rollout.n=${n_resp_per_prompt} \
+    actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
+    actor_rollout_ref.actor.strategy=fsdp2 \
+ critic.strategy=fsdp2 \
+ actor_rollout_ref.hybrid_engine=False \
+ actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
+ actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
+ actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
+ actor_rollout_ref.rollout.name=${rollout_name} \
+ actor_rollout_ref.rollout.mode=${rollout_mode} \
+ actor_rollout_ref.rollout.calculate_log_probs=True \
+ trainer.nnodes="${NNODES_TRAIN}" \
+ trainer.n_gpus_per_node="${NGPUS_PER_NODE}" \
+ rollout.nnodes="${NNODES_ROLLOUT}" \
+ rollout.n_gpus_per_node="${NGPUS_PER_NODE}" \
+ rollout.total_rollout_steps="${total_rollout_steps}" \
+ rollout.test_freq="${test_freq}" \
+ async_training.staleness_threshold="${staleness_threshold}" \
+ async_training.trigger_parameter_sync_step="${trigger_parameter_sync_step}" \
+ async_training.partial_rollout="${partial_rollout}"
+```
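+
+With the settings above (`staleness_threshold=0`, `trigger_parameter_sync_step=16`), the script runs in the
+`stream off policy pipeline` mode described earlier. To try the `async stream pipeline with partial rollout` mode
+instead, set, for example, `async_training.staleness_threshold=0.5`, `async_training.partial_rollout=True`, and a
+smaller `async_training.trigger_parameter_sync_step` such as 4.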
+
+## Experiments
+
+### Asynchronous Training on 7B Model
+
+We used Qwen2.5-Math-7B to verify the benefits of the fully async strategy with long responses at several resource scales.
+Using the `async stream pipeline with stale samples` strategy, we achieved about a 2x speedup on 32, 64, and 128 GPUs
+without significantly affecting experimental results.
+
+* Machine: H20
+* Model: Qwen2.5-Math-7B
+* Rollout length: max_response_length FSDP2: 28K tokens;
+* Algorithm: DAPO
+* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* Engine: vllm+FSDP2
+* rollout.n: 16
+* ppo_mini_batch_size: 32
+* test_freq: 20
+
+* colocate sync:
+ * step: 400
+ * train_batch_size: 512
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * require_batches: 4
+ * trigger_parameter_sync_step: 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | total time<br>400 step | acc/mean@1 |
+|:--------------------:|:---------------------:|:--------:|:--------:|:--------------:|:---------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-------------------------------:|
+| colocate sync | 32 | 790.10 | 357.41 | 107.71 | 269.80 | 13h 44m | 1d 3h 43m | 2d 9h 22m | 3d 17h 5m | max: 0.3313<br>last: 0.2448 |
+| fully_async_policy | 16:16 | 294.77 | 21.26 | \ | 313.81 | 7h 58m<br>(1.72x) | 16h 21m<br>(1.70x) | 1d 0h 53m<br>(2.31x) | 1d 9h 26m<br>(2.66x) | max: 0.3302<br>last: 0.2333 |
+| colocate sync | 64 | 365.28 | 150.72 | 70.26 | 133.41 | 10h 22m | 20h 45m | 1d 7h 6m | 1d 17h 32m | max: 0.3365<br>last: 0.2333 |
+| fully_async_policy | 32:32 | 189.26 | 28.46 | \ | 156.98 | 4h 57m<br>(2.09x) | 10h 14m<br>(2.03x) | 16h 58m<br>(1.83x) | 21h 40m<br>(1.92x) | max: 0.3677<br>last: 0.3406 |
+| colocate sync | 128 | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573<br>last: 0.2958 |
+| fully_async_policy | 64:64 | 150.63 | 33.14 | \ | 113.16 | 3h 13m<br>(2.67x) | 6h 46m<br>(2.65x) | 10h 53m<br>(2.67x) | 17h 22m<br>(2.35x) | max: 0.3521<br>last: 0.3094 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-colocate_async?nw=nwuserhouzg
+
+### 128-card 7B Asynchronous Mode Experiment
+
+We used Qwen2.5-Math-7B to verify the effects of various modes supported by fully async.
+We can see that the benefit brought by streaming is approximately 1.6x, and after combining staleness and
+partial_rollout, the benefit reaches 2.35x.
+
+| mode | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | total time<br>400 step | acc/mean@1 |
+|:----------------------------------------------------------------------------------------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------------:|
+| colocate sync | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573<br>last: 0.2958 |
+| `stream off policy pipeline`<br>(+fully async: trigger_parameter_sync_step=4,<br>require_batches=4) | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844<br>last: 0.2604 |
+| `async stream pipeline with stale samples`<br>(+staleness_threshold=0.5) | | | | | | | | | |
+| `async stream pipeline with partial rollout`<br>(+partial_rollout=True) | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521<br>last: 0.3094 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-stream_stale_partial?nw=nwuserhouzg
+
+### 128-card Stale Ablation Experiment
+
+Under the `async stream pipeline with partial rollout` mode, we verified the impact of staleness settings on training
+efficiency.
+We found that the larger the staleness, the more obvious the final gains.
+We also noticed that the times for staleness values of 0.3 and 0.5 are quite close, because as the training steps
+increase, the response length changes significantly, causing training instability.
+Further analysis and optimization are needed for this issue.
+
+| staleness_threshold | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | total time<br>400 step | acc/mean@1 |
+|:---------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:|
+| 0 | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844<br>last: 0.2604 |
+| 0.1 | 171.30 | 58.17 | \ | 109.12 | 3h 53m | 8h 37m | 14h 25m | 19h 59m | max: 0.3542<br>last: 0.2979 |
+| 0.3 | 146.11 | 38.88 | \ | 103.22 | 3h 18m | 6h 49m | 11h 40m | 17h 20m | max: 0.3469<br>last: 0.2865 |
+| 0.5 | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521<br>last: 0.3094 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-stream_stale_partial?nw=nwuserhouzg
+
+### 128-card 7B require_batches Ablation Experiment
+
+In multiple tests, we found that the number of samples issued each time in streaming affects the response length during
+training, which in turn affects training time. We verified the impact on results by modifying
+`async_training.require_batches`.
+
+| require_batches | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | acc/mean@1 |
+|:-----------------:|:--------:|:-------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:|
+| 1 | 203.47 | 30.88 | \ | 181.08 | 3h 31m | 8h 29m | 17h 36m | max: 0.349<br>last: 0.326 |
+| 2 | 158.72 | 26.32 | \ | 128.08 | 3h 35m | 7h 38m | 13h 57m | max: 0.351<br>last: 0.3406 |
+| 4 | 124.64 | 25.62 | \ | 95.06 | 3h 13m | 6h 46m | 10h 53m | max: 0.3521<br>last: 0.3521 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-ablation_require_batches?nw=nwuserhouzg
+
+### 30B Model Mode Experiment
+
+We achieved a 1.7x performance improvement with the `async stream pipeline with stale samples` strategy on the
+Qwen3-30B-A3B-Base model compared to the colocate setup. It is worth noting that this is far from the upper limit of
+performance gains achievable through asynchrony. Firstly, the comparative experiments used a maximum response length of
+only 8k, which is much shorter than the 20k sequence length in previous experiments, resulting in a less pronounced
+rollout tail effect. Secondly, we adopted a highly skewed resource allocation, with rollout using 96 GPUs and trainer
+using 32 GPUs, which is not an optimal configuration. During the experiments, we observed that the current verl
+implementation imposes certain constraints, such as requiring data to be evenly divisible by the number of GPUs, making
+resource adjustment less flexible. Additionally, as asynchronous training and deployment accelerate, the performance gap
+is gradually narrowing. Therefore, enabling more flexible resource allocation and dynamic resource adjustment in the
+future will be our next focus.
+
+* Machine: H20
+* Model: Qwen3-30B-A3B-Base
+* Rollout length: max_response_length : 8K tokens;
+* Algorithm: GRPO
+* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* Engine: vllm+Megatron
+* rollout.n: 16
+* ppo_mini_batch_size: 128
+* test_freq: 20
+
+* colocate sync:
+ * step:400
+ * train_batch_size: 512
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * trigger_parameter_sync_step: 512/128 = 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
+| Training Mode | Resource Allocation | Step | Gen | Old Log Prob | Ref | Update Actor | Total Time 100 Step | Total Time 200 Step | Total Time 300 Step | Total Time 400 Step | Acc/Mean@1 |
+|--------------------|---------------------|--------|--------|--------------|-------|--------------|---------------------|---------------------|---------------------|---------------------|-----------------------------|
+| Colocate Sync | 128 | 497.89 | 348.05 | 28.73 | 20.86 | 86.27 | 13h 36m | 1d 3h 48m | 1d 19h 4m | 2d 11h 39m | max: 0.3500<br>last: 0.3208 |
+| Fully Async Policy | 96:32 | 282.75 | 22.06 | \ | 50.05 | 206.63 | 6h 45m (2.01x) | 14h 48m (1.88x) | 1d 0h 9m (1.78x) | 1d 10h 41m (1.72x) | max: 0.3813<br>last: 0.3448 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-30B?nw=nwuserhouzg
+
+### checkpoint-engine Ablation Experiment
+We tested the single-step parameter synchronization time of the checkpoint-engine on three models: Qwen2.5-Math-7B, Qwen3-30B-A3B, and Qwen3-235B-A22B, using default checkpoint-engine configurations. All experiments were performed on H20 machines, and the Megatron engine was used for training.
+
+| model | trainer rank | rollout rank | checkpoint-engine | total sync time |
+|:-----------------:|:--------:|:-------:|:--------------:|:--------------:|
+| Qwen2.5-Math-7B | 4 | 4 | False | 0.12s |
+| Qwen2.5-Math-7B | 4 | 4 | True | 0.02s |
+| Qwen3-30B-A3B | 16 | 16 | False | 15.76s |
+| Qwen3-30B-A3B | 16 | 16 | True | 4.38s |
+| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
+| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |
+
+
+### use_trainer_do_validate Experiment
+We used Qwen2.5-Math-7B to verify the effect of `use_trainer_do_validate=True` on the training process.
+With this setting, validation time is roughly halved (about a 2x improvement) and trainer node idle time is
+reduced by about 40%.
+
+* Machine: H20
+* Model: Qwen2.5-Math-7B
+* Rollout length: max_response_length FSDP2: 10K tokens;
+* Algorithm: DAPO
+* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* Engine: vllm+FSDP2
+* rollout.n: 16
+* ppo_mini_batch_size: 32
+* test_freq: 10
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * require_batches: 4
+ * trigger_parameter_sync_step: 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time<br>50 step | acc/mean@2 |
+|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
+| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
+| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
+
+
+## Multi-Turn Tool Calling
+
+Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn
+tool-calling loop that supports partial_rollout for **fully_async_policy**.
+
+### Core Design
+
+`AsyncPartialToolAgentLoop` inherits from `ToolAgentLoop` and is adapted for the asynchronous training mode of
+`fully_async_policy`. When `partial_rollout=True`, the Rollouter interrupts ongoing generation tasks before
+synchronizing parameters with the Trainer. `AsyncPartialToolAgentLoop` is capable of:
+
+1. **Interrupting Tasks**: Responding to an interrupt signal to save the current state. Currently, interruptions occur
+ during the `GENERATING` process or after other states have completed.
+2. **Resuming Tasks**: Resuming execution from the saved state after parameter synchronization is complete, rather than
+ starting over.
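+
+A minimal conceptual sketch of this interrupt/resume behaviour is shown below. The state fields and the
+`generate_step` callable are hypothetical stand-ins used for illustration; they are not the actual
+`AsyncPartialToolAgentLoop` implementation.
+
+```python
+from dataclasses import dataclass, field
+
+@dataclass
+class PartialState:
+    messages: list = field(default_factory=list)  # completed turns (assistant text, tool results)
+    partial_text: str = ""                        # unfinished assistant turn saved at interrupt time
+
+def rollout_with_partial(generate_step, state, should_interrupt):
+    """Generate until the sample finishes; on interrupt, hand back the saved state instead."""
+    while True:
+        if should_interrupt():
+            return None, state                    # re-queued and resumed after parameter sync
+        chunk, done = generate_step(state.messages, state.partial_text)
+        state.partial_text += chunk
+        if done:
+            state.messages.append(state.partial_text)
+            state.partial_text = ""
+            return state.messages, None
+```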
+
+### How to Use
+
+RL training with multi-turn tool calling in `fully_async_policy` is similar to `recipe/retool`. It is enabled by
+specifying `multi_turn` configurations in the config file.
+
+1. **SFT Stage**: First, the model should undergo SFT to learn how to follow tool-calling format instructions.
+2. **Multi-turn Configuration**: In the `fully_async_policy` training configuration, set the following parameters:
+ ```yaml
+ actor_rollout_ref:
+ rollout:
+ multi_turn:
+ enable: True # AsyncPartialToolAgentLoop will be used by default in fully_async_policy mode
+ # Other multi_turn related configurations
+ ```
+3. **Async Parameters**: To improve efficiency, enable `partial_rollout` and `staleness_threshold` when using multi-turn
+ tool calling:
+ ```yaml
+ async_training:
+ partial_rollout: True
+ staleness_threshold: 0.5
+ # Other async parameters
+ ```
+4. **Example**: See `recipe/fully_async_policy/shell/dapo_7b_async_retool.sh`.
+
+### Experimental Results
+
+To validate the performance of `fully_async_policy` on multi-turn tool-calling tasks, we compared it with the standard
+`colocate` synchronous mode. Key parameter settings are as follows.
+
+* **SFT Model**: Based on `Qwen2.5-7B-Instruct`, trained for 6 epochs on the `ReTool-SFT` dataset
+* **RL Algorithm**: DAPO
+* **Dataset**:
+ * Train: `DAPO-Math-17k`
+ * Test: `aime_2025`
+* **Resource and Mode Comparison**:
+ * `colocate sync`: 32 H20 gpus
+ * `fully_async_policy`: 16 gpus for Trainer + 16 gpus for Rollouter
+* **Key Configurations**:
+ 1. **Tool Calling Configuration**:
+ * `multi_turn.enable: True`
+ * `multi_turn.max_user_turns: 16`
+ * `multi_turn.max_assistant_turns: 16`
+ * `multi_turn.tool_config_path: recipe/retool/sandbox_fusion_tool_config.yaml`
+ 2. **`colocate sync` Configuration**:
+ * `ppo_mini_batch_size: 16`
+ * `train_batch_size: 64`
+ 3. **`fully_async_policy` Configuration**:
+ * `ppo_mini_batch_size: 16`
+ * `trigger_parameter_sync_step: 4`
+ * `require_batches: 1`
+ * `staleness_threshold: 1`
+ * `partial_rollout: True`
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | aime_2025<br>acc/mean@30 |
+|:--------------------:|:---------------------:|:---------:|:---------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:-------------------------------:|
+| colocate | 32 | 375.47 | 228.03 | 35.19 | 111.84 | 9h 46m | 22h 28m | start: 0.1078<br>last: 0.2056 |
+| fully_async_policy | 16:16 | 221.36 | 40.59 | \ | 179.58 | 6h 19m<br>(1.55x) | 14h 4m<br>(1.60x) | start: 0.11<br>last: 0.2044 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-multiturn-tool?nw=nwuserhouzg
+
+## Future Plans
+
+* GRPO experiments
+* Megatron adaptation
+* SGLang integration
+* Transfer queue integration
+* Asynchronous parameter synchronization
+* AReaL asynchronous algorithm implementation
+* TPPO algorithm implementation
+* Multi-turn and Tool support
\ No newline at end of file
diff --git a/recipe/fully_async_policy/README_zh.md b/recipe/fully_async_policy/README_zh.md
new file mode 100644
index 00000000000..92af46dce59
--- /dev/null
+++ b/recipe/fully_async_policy/README_zh.md
@@ -0,0 +1,517 @@
+# Recipe: Fully Async Policy Trainer
+
+**Author:** `https://github.com/meituan-search`
+
+Last updated: 12/15/2025.
+
+本文档介绍了完全异步PPO训练系统,该系统实现了 Trainer 和 Rollouter 的完全解耦,支持异步样本生成和训练。
+在该系统下,我们使用128卡训练qwen2.5-7B模型取得了2.35x-2.67x的性能提升,同时效果没有显著受到影响。
+
+## Introduction
+
+### Background
+
+rollout和train分离架构相较于colocate的架构能够更加灵活地分配资源,设计更加灵活的训练逻辑,从而处理长尾等问题带来的GPU利用率低,训练效率低的问题。
+one_step_off_policy通过分离架构的设计,让rollout和train进行一轮异步的训练,缓解了rollout时间过长的问题,并在训练效率上取得了一些收益,
+但其强制使用一轮异步的数据,不够灵活,而且并不能完全去除长尾对训练效率带来的影响;在其他框架如AReaL、Magistral、StreamRL、AsyncFlow上,
+已经基于分离架构实现了异步训练、流式训练,并取得了收益;我们借鉴其方法,在verl上进行了实现。fully_async_policy支持异步、流式、partial
+rollout的训练,通过合理设置资源分配情况、参数同步频率等参数,fully_async_policy能够显著提高训练效率。
+
+> Magistral https://arxiv.org/abs/2506.10910
+>
+> AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language
+> Reasoning https://arxiv.org/abs/2505.24298
+>
+> StreamRL: Scalable, Heterogeneous, and Elastic RL for LLMs with Disaggregated Stream
+> Generation https://arxiv.org/abs/2504.15930
+>
+> AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training https://arxiv.org/abs/2507.01663
+>
+
+### 核心贡献
+
+* **资源隔离**:与使用hybrid_engine不同,Rollouter和Trainer使用分离的计算资源,需要分别指定所占用的资源。
+* **生成与训练并行**:Trainer在训练的同时,Rollouter在生成新的样本。
+* **多步异步**: 相比 one step off policy 支持0.x步到多步的异步设定,异步方案更加灵活。
+* **nccl参数同步**:基于nccl通信原语,参考[checkpoint-engine](https://github.com/MoonshotAI/checkpoint-engine)实现Rollouter与Trainer间的高效参数同步。
+* **Stream推理与训练**:Rollouter逐样本生成数据,同时数据传输以单个sample为最小传输单位。
+* **异步训练与新鲜度控制**:通过设置参数async_training.staleness_threshold,支持使用旧参数生成的样本进行训练。
+* **PartialRollout**: Rollouter推理过程支持partial rollout逻辑,在参数同步时,通过添加`sleep()`和`resume()`
+ 逻辑,保存进行中的rollout的样本,并在下一次rollout中继续使用,减少参数同步等待进行中的任务结束时间。
+
+目前支持使用模式为 megatron/fsdp+vllm。vllm必须使用基于AgentLoop的server模式。
+
+## 设计
+
+fully_async_policy的整体架构如下图所示,fully_async_policy主要由Rollouter、MessageQueue、Trainer、ParameterSynchronizer四部分组成。
+
+
+
+1. Rollouter逐样本生成序列,并将生成的sample放入MessageQueue中,生产的速度受新鲜度控制。
+2. MessageQueue用于暂存Rollouter生成的sample。
+3. Trainer逐样本从MessageQueue中获取,获取到`require_batches*ppo_mini_batch_size`
+ 数量的样本后,就会进行训练,训练async_training.trigger_parameter_sync_step轮后,触发与Rollouter的一次参数同步。
+4. ParameterSynchronizer 实现了基于NCCL的同步参数同步能力。
+
+当前方案对比base的收益来源,在于colocate情况下,rollout使用更多的资源无法解决长尾样本带来的空闲,
+当我们进行资源隔离后,rollout的时间和train的时间都可能相较于之前更长(因为使用的资源变少了),
+但是相互之间的耗时overlap,端到端的耗时反而有所缩减。
+
+
+
+## 使用方式
+
+### 参数说明
+
+| super params | implication |
+|------------------------------------------------------|-----------------------------------------------------------------|
+| `trainer.nnodes` | Trainer的node数量 |
+| `trainer.n_gpus_per_node` | Trainer每个node上gpu的数量 |
+| `rollout.nnodes` | Rollouter的node数量 |
+| `rollout.n_gpus_per_node` | Rollouter每个node上gpu的数量 |
+| `data.train_batch_size` | 在fully async策略中,该值不生效(默认设置为0) |
+| `data.gen_batch_size` | 在fully async策略中,使用流式的样本生产逻辑(默认设置为1) |
+| `rollout.total_rollout_steps` | 总的rollout的sample数量 |
+| `rollout.test_freq` | Rollouter每更新多少次参数,进行一次validation |
+| `actor_rollout_ref.actor.ppo_mini_batch_size` | The ppo_mini_batch_size is a global num across all workers/gpus |
+| `async_training.require_batches` | FullyAsyncTrainer一次性获取的ppo_mini_batch_size的数量 |
+| `async_training.trigger_parameter_sync_step` | 表示FullyAsyncTrainer进行多少次本地更新后,进行一次参数同步 |
+| `async_training.staleness_threshold` | 新鲜度控制 |
+| `async_training.partial_rollout` | 是否进行partial_rollout |
+| `async_training.use_rollout_log_probs` | 使用rollout产生的log_probs |
+| `async_training.compute_prox_log_prob`(experimental) | 是否在train阶段,使用train模型的参数计算token的 log_prob |
+| `async_training.checkpoint_engine.enable`| 是否开启checkpoint_engine模式的加速,默认值True |
+| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | 启动checkpoint_engine时,是否在参数同步时在broadcast和加载之间使用流水,默认值False|
+| `async_training.checkpoint_engine.device_buffer_size_M` | 启动checkpoint_engine时,组装的bucket的大小(MB),默认为4096 |
+| `async_training.use_trainer_do_validate` | 是否使用Trainer的do_validate方法进行validation,默认值False |
+
+**进一步的解释:**
+
+* `rollout.total_rollout_steps`
+
+ 与 colocate 相比,数量可以通过 train_batch_size 与 step 相乘对齐:
+ `rollout.total_rollout_steps = data.train_batch_size * step`。
+
+* `async_training.trigger_parameter_sync_step`
+
+ 在fully async策略中,表示Trainer进行多少次本地更新后(也就是获取多少次`require_batches * ppo_mini_batch_size`数量样本),
+ 与Rollouter之间进行一次参数同步。
+  每两次Rollouter和Trainer参数同步之间,Trainer将会处理`trigger_parameter_sync_step * require_batches * ppo_mini_batch_size`份sample。
+ 如果为了与colocate在公平的情况下对比速度,trigger_parameter_sync_step应该设置为 `data.train_batch_size / (
+ require_batches * ppo_mini_batch_size)`。
+
+* `async_training.staleness_threshold`
+
+ 在fully async策略中,表示最大允许使用的staleness样本的比例。
+
+ * staleness_threshold=0,表示同步训练。
+ Rollouter两次参数更新之间将会生成固定数量的样本,样本数为:
+ $$rollout\_num = (trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size)$$
+ * staleness_threshold>0,表示异步训练, 可以设置为小数,支持更灵活的异步调用。
+ Rollouter两次参数更新之间将会最多生成的样本数为:
+ $$rollout\_num = (1+staleness\_threshold)*(trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size) - num\_staleness\_sample $$
+
+ num_staleness_sample 表示上一次rollout多生成的陈旧样本数。
+
+ 由于是流式系统,rollout持续生成,trainer持续消费。如果rollouter较慢,trainer会更早触发参数同步,rollouter并不会实际生产rollout_num个样本。
+ 当rollout 足够快时,staleness_threshold设置为1,基本上等价于one_step_off policy。
+ 为了避免过期样本太多影响训练精度,建议该值设置小于1。
+
+* `async_training.partial_rollout`
+
+ partial_rollout只会在staleness_threshold>0时才实际上起作用。
+
+* `async_training.use_rollout_log_probs`
+
+ 在强化学习算法中,log_probs与参数版本,token都存在隐性的相关性。由于PPO/GRPO/DAPO等算法的设定,我们在计算重要性采样时,
+ 即 old_log_prob必须使用rollout参数及token所对应log_probs,才能保证算法的正确性。在fully
+  async策略中,我们默认old_log_prob是由rollout所计算的,而不是由trainer所计算。
+
+* `async_training.require_batches`
+
+ 在流式训练中,require_batches 应该设置为1,表示生产够ppo_mini_batch_size样本后,就进行训练。
+ 在实际测试中,我们发现,如果单次下发的样本较少,由于数据分发的顺序,会导致训练不稳定,response 长度变长。
+ 在这里,我们额外提供 require_batches 进行流式分发,单次参与训练的样本数量控制。
+
+* `async_training.compute_prox_log_prob` (experimental)
+
+ 我们在训练过程中,观测到随着训练的进行,训练后期指标和response长度可能会出现不稳定的情况,
+ 这里我们可以使用 [Rollout Importance Sampling](https://verl.readthedocs.io/en/latest/advance/rollout_is.html) 的技术进行
+  重要性采样,缓解这一问题。为了使用 `Rollout Importance Sampling`,我们需要使用训练引擎以当前的参数版本计算old_log_prob,因此需要打开此开关。
+ 此外,在 mode d (async stream pipeline with partial rollout) 的情况下开启 `compute_prox_log_prob` 以及
+ `Rollout Importance Sampling` 后,我们的实现已近似Areal的 `Decoupled PPO`。
+
+* `async_training.checkpoint_engine.enable`
+
+ 开启checkpoint engine后,相较于原始的逐tensor的参数同步方式,同步时间开销普遍可以降低60%以上。但是组装bucket会带来额外的临时显存开销。
+
+* `async_training.checkpoint_engine.overlap_broadcast_and_consume`
+
+ 开启参数broadcast和load_weights之间的流水后,会进一步额外申请更多显存。由于目前分析参数同步的主要耗时并非来自broadcast和load_weights阶段,而是在参数生成阶段(由megatron或FSDP),因此该开关默认关闭。
+
+* `async_training.checkpoint_engine.device_buffer_size_M`
+
+ 控制开启checkpoint engine后,用于同步的显存buffer大小。实际的`bucket_size` = `max(device_buffer_size_M, 最大参数tensor size)`
+ * 在开启`overlap_broadcast_and_consume`时,trainer节点的临时额外显存开销为 `3 * bucket_size`, rollout节点的临时额外显存开销为`2 * bucket_size`。
+ * 在关闭`overlap_broadcast_and_consume`时,trainer节点的临时额外显存开销为 `2 * bucket_size`, rollout节点的临时额外显存开销为`1 * bucket_size`。
+
+* `async_training.use_trainer_do_validate`
+
+ 控制是否使用trainer的`do_validate`方法进行validation。
+ 如果设置为True,trainer会在每次参数更新后,调用`do_validate`方法进行validation。
+ 如果设置为False,trainer不会调用`do_validate`方法。
+
+### 模式支持
+
+1. on policy pipeline:
+ 1. **trigger_parameter_sync_step=1,staleness_threshold=0**
+ 2. Rollouter一次生产`require_batches*ppo_mini_batch_size`
+ 的samples,Trainer获取这些samples后进行训练,训练完后Trainer和Rollouter之间进行一次参数同步;
+ 3. 在rollout阶段,如果存在长尾的样本,但是rollout样本数较少时,较短的样本无法填充到空闲的资源中,会造成一定的资源浪费。
+ 4. 如图a所示;
+
+2. stream off policy pipeline:
+ 1. **trigger_parameter_sync_step>1,staleness_threshold=0**
+ 2. 将会进行同步的流式训练,Rollouter一次生产`require_batches*ppo_mini_batch_size*trigger_parameter_sync_step`
+ 的samples,Trainer每获取`require_batches*ppo_mini_batch_size`
+ 就进行一次本地训练,训练trigger_parameter_sync_step次后,Trainer和Rollouter之间进行一次参数同步;
+ 3. 相较于a,由于一次生成的样本更多,资源的空闲会更低。
+ 4. 在一次step训练中,会存在两次资源闲置的时间,分别是在第一次获取样本时,train等待`require_batches*ppo_mini_batch_size`
+ 个样本生产,以及最后一次参数更新时,rollout等待训练完成。
+ 5. 如图b所示;
+
+3. async stream pipeline with staleness samples:
+    1. **trigger_parameter_sync_step>=1,staleness_threshold>0,partial_rollout=False**
+    2. Rollouter在每次参数更新后将计划最多生产rollout_num个样本(实际根据rollout速度,生成的样本可能会少于这个值)。
+ 3. 如果rollout过程比较快,Rollouter将会在参数同步前额外生成一部分样本num_stale_samples,用于参数同步后立即给Trainer使用。
+ 触发参数同步时,如果Rollouter有正在生产的任务,将会等待任务完成,同时不会添加新的任务;
+ 4. 相较于b,除第一次step训练外,后续的训练都不会有wait first batch rollout finish的时间,但是会有wait active task
+ finish的时间。
+ 5. 如图c所示;
+
+4. async stream pipeline with partial rollout:
+ 1. **trigger_parameter_sync_step>=1,staleness_threshold>0,partial_rollout=True**
+ 2. 相较于c,触发参数同步时,Rollouter如果有正在生产的sample,会打断rollout过程并进行参数同步,被中断的sample会在参数同步后继续生成。减少了wait
+ active task finish的时间。
+ 3. 如图d所示;
+
+
+
+### 关键指标
+
+| metrics | implication |
+|------------------------------------------------|-----------------------------------------------------------|
+| `trainer/idle_ratio` | Trainer闲置率 |
+| `rollouter/idle_ratio` | Rollouter闲置率 |
+| `fully_async/count/stale_samples_processed` | 训练使用的旧sample总数 |
+| `fully_async/count/stale_trajectory_processed` | 训练使用的旧trajectory总数(一个sample会生产rollout.n条trajectory) |
+| `fully_async/partial/total_partial_num` | 两次trigger_parameter_sync_step之间Trainer处理的partial样本数 |
+| `fully_async/partial/partial_ratio` | 两次trigger_parameter_sync_step之间Trainer处理的partial样本的比例 |
+| `fully_async/partial/max_partial_span` | 两次trigger_parameter_sync_step之间Trainer处理的partial样本的最大参数跨度 |
+
+### 调参建议
+
+* 资源分配与调整:
+ * 合理的资源分配是获得好的训练效率的前提。理想的资源分配情况应该是使得Rollout的时间和Train的时间接近,从而使得整个训练过程流水气泡最小,
+ 避免资源闲置,同时Trainer不会使用旧样本。在真实训练场景下,可以根据实际训练过程中rollout和train的空闲时间调整资源分配,
+ 可从rollouter/idle_ratio和trainer/idle_ratio获得,如果rollouter/idle_ratio较高trainer/idle_ratio较低,
+ 应该增多Trainer的资源减少Rollouter的资源,反之亦然。
+
+* 关键参数:
+ * staleness_threshold: 设置太大会导致较多的旧样本使用,影响模型效果,建议设置小于1。
+ * require_batches:越接近1,越接近纯流式过程,训练过程中bubble越小,能够在速度上获得更快的加速效果,但会对样本的处理顺序产生影响;
+ * trigger_parameter_sync_step: 设置的越小越接近on policy,但会导致频繁的参数同步,长尾样本浪费的资源无法被短样本填充,资源利用率低。
+ 设置的越大有更高的计算效率,但是精度上会受到off policy的影响。
+ * rollout.test_freq: 会占用Rollouter资源,不建议设置太小。
+
+* 模式选择:通过调整不同的参数,Fully Async架构支持不同程度上的优化加速,适用于不同场景的任务。
+ * 对于小规模任务,需要保证训练的稳定性和 on-policy 性,对速度要求不高的场景,可以尝试使用on policy pipeline的模式(模式1)。
+ * 对于需要提高训练吞吐量,但对 staleness 敏感的场景,可以尝试使用 stream off policy pipeline 的模式。即通过
+ 设置trigger_parameter_sync_step>1 ,提高 训练效率,但仍保持同步机制 (staleness_threshold=0 )(模式2)。
+ * 对于大规模任务,对训练速度有较高要求,且可以容忍一定 off-policy 程度、staleness的场景,可以设置staleness_threshold>
+ 0、partial_rollout=True提高训练效率,使用 async stream pipeline 模式(模式 3 或 4)。
+
+### 快速开始
+
+```shell
+rollout_mode="async"
+rollout_name="vllm" # sglang or vllm
+if [ "$rollout_mode" = "async" ]; then
+ export VLLM_USE_V1=1
+ return_raw_chat="True"
+fi
+
+train_prompt_bsz=0
+gen_prompt_bsz=1
+n_resp_per_prompt=16
+train_prompt_mini_bsz=32
+use_dynamic_bsz=True
+total_rollout_steps=$(((512*400)))
+test_freq=10
+staleness_threshold=0
+trigger_parameter_sync_step=16
+partial_rollout=False
+
+
+python -m recipe.fully_async_policy.fully_async_main \
+    data.train_batch_size=${train_prompt_bsz} \
+    data.gen_batch_size=${gen_prompt_bsz} \
+    data.return_raw_chat=${return_raw_chat} \
+    actor_rollout_ref.rollout.n=${n_resp_per_prompt} \
+    actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
+    actor_rollout_ref.actor.strategy=fsdp2 \
+ critic.strategy=fsdp2 \
+ actor_rollout_ref.hybrid_engine=False \
+ actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
+ actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
+ actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
+ actor_rollout_ref.rollout.name=${rollout_name} \
+ actor_rollout_ref.rollout.mode=${rollout_mode} \
+ actor_rollout_ref.rollout.calculate_log_probs=True \
+ trainer.nnodes="${NNODES_TRAIN}" \
+ trainer.n_gpus_per_node="${NGPUS_PER_NODE}" \
+ rollout.nnodes="${NNODES_ROLLOUT}" \
+ rollout.n_gpus_per_node="${NGPUS_PER_NODE}" \
+ rollout.total_rollout_steps="${total_rollout_steps}" \
+ rollout.test_freq="${test_freq}" \
+ async_training.staleness_threshold="${staleness_threshold}" \
+ async_training.trigger_parameter_sync_step="${trigger_parameter_sync_step}" \
+ async_training.partial_rollout="${partial_rollout}"
+```
+
+## 实验
+
+### 在7B模型上进行异步训练
+
+我们使用 Qwen2.5-Math-7B 验证 fully async 策略在长候选下,多种资源下的收益情况。
+使用`async stream pipeline with staleness samples` 策略,我们在32卡,64卡,128卡都取得2x左右的性能提升,同时没有显著影响实验效果。
+
+* 机器:H20
+* 模型:Qwen2.5-Math-7B
+* rollout长度:max_response_length FSDP2: 28K tokens;
+* 算法:DAPO
+* 数据集: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* engine: vllm+FSDP2
+* rollout.n: 16
+* ppo_mini_batch_size: 32
+* test_freq: 20
+
+* colocate sync:
+ * step: 400
+ * train_batch_size: 512
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * require_batches: 4
+ * trigger_parameter_sync_step: 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | total time<br>400 step | acc/mean@1 |
+|:--------------------:|:---------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-------------------------------:|
+| colocate sync | 32 | 790.10 | 357.41 | 107.71 | 269.80 | 13h 44m | 1d 3h 43m | 2d 9h 22m | 3d 17h 5m | max: 0.3313<br>last: 0.2448 |
+| fully_async_policy | 16:16 | 294.77 | 21.26 | \ | 313.81 | 7h 58m<br>(1.72x) | 16h 21m<br>(1.70x) | 1d 0h 53m<br>(2.31x) | 1d 9h 26m<br>(2.66x) | max: 0.3302<br>last: 0.2333 |
+| colocate sync | 64 | 365.28 | 150.72 | 70.26 | 133.41 | 10h 22m | 20h 45m | 1d 7h 6m | 1d 17h 32m | max: 0.3365<br>last: 0.2333 |
+| fully_async_policy | 32:32 | 189.26 | 28.46 | \ | 156.98 | 4h 57m<br>(2.09x) | 10h 14m<br>(2.03x) | 16h 58m<br>(1.83x) | 21h 40m<br>(1.92x) | max: 0.3677<br>last: 0.3406 |
+| colocate sync | 128 | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573<br>last: 0.2958 |
+| fully_async_policy | 64:64 | 150.63 | 33.14 | \ | 113.16 | 3h 13m<br>(2.67x) | 6h 46m<br>(2.65x) | 10h 53m<br>(2.67x) | 17h 22m<br>(2.35x) | max: 0.3521<br>last: 0.3094 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-colocate_async?nw=nwuserhouzg
+
+### 128卡 7B 异步模式实验
+
+我们使用 Qwen2.5-Math-7B 验证 fully async 所支持的各个模式的效果。
+我们可以看到 stream 带来的收益大约1.6x,叠加 staleness 和 partial_rollout 后,收益为2.35x。
+
+| mode | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | total time<br>400 step | acc/mean@1 |
+|:----------------------------------------------------------------------------------------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------------:|
+| colocate sync | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573<br>last: 0.2958 |
+| `stream off policy pipeline`<br>(+fully async: trigger_parameter_sync_step=4,<br>require_batches=4) | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844<br>last: 0.2604 |
+| `async stream pipeline with staleness samples`<br>(+staleness_threshold=0.5) | | | | | | | | | |
+| `async stream pipeline with partial rollout`<br>(+partial_rollout=True) | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521<br>last: 0.3094 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-stream_stale_partial?nw=nwuserhouzg
+
+### 128卡 stale 消融实验
+
+在 `async stream pipeline with partial rollout` 模式下,我们验证 staleness 的设置对于训练效率的影响。
+我们可以发现,staleness 越大,最终取得的收益越明显。
+同时我们也注意到 staleness 取 0.3 和 0.5 的时间比较接近,原因是随着训练步数的增加,response 长度变化较大,训练出现了不稳定的问题。
+后续还需要针对该问题进行进一步的分析和优化。
+
+| staleness_threshold | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | total time<br>400 step | acc/mean@1 |
+|:---------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:|
+| 0 | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844<br>last: 0.2604 |
+| 0.1 | 171.30 | 58.17 | \ | 109.12 | 3h 53m | 8h 37m | 14h 25m | 19h 59m | max: 0.3542<br>last: 0.2979 |
+| 0.3 | 146.11 | 38.88 | \ | 103.22 | 3h 18m | 6h 49m | 11h 40m | 17h 20m | max: 0.3469<br>last: 0.2865 |
+| 0.5 | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521<br>last: 0.3094 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-ablation_stale?nw=nwuserhouzg
+
+### 128卡 7B require_batches 消融实验
+
+在多次测试下,我们发现流式每次下发样本的数量会影响训练的response长度,进而影响训练时长,我们通过修改
+`async_training.require_batches` 验证其对结果的影响。
+
+| require_batches | step | gen | old_log_prob | update_actor | total time<br>100 step | total time<br>200 step | total time<br>300 step | acc/mean@1 |
+|:-----------------:|:--------:|:-------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:|
+| 1 | 203.47 | 30.88 | \ | 181.08 | 3h 31m | 8h 29m | 17h 36m | max: 0.349<br>last: 0.326 |
+| 2 | 158.72 | 26.32 | \ | 128.08 | 3h 35m | 7h 38m | 13h 57m | max: 0.351<br>last: 0.3406 |
+| 4 | 124.64 | 25.62 | \ | 95.06 | 3h 13m | 6h 46m | 10h 53m | max: 0.3521<br>last: 0.3521 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-ablation_require_batches?nw=nwuserhouzg
+
+### 30B Model Mode Experiment
+
+On the Qwen3-30B-A3B-Base model, the `async stream pipeline with staleness samples` strategy achieved a 1.7x performance
+improvement over the colocate scheme. It is worth noting that this is still far from the upper bound of what asynchronous training can deliver. First, the maximum response length used in this comparison is only
+8k, far below the 20k sequence length of the earlier experiments, so the long-tail effect of rollout is not pronounced. Second, we used a heavily skewed resource allocation: rollout uses
+96 GPUs while the trainer uses only 32, which is not the optimal configuration. During the experiments we also observed some limitations of the current verl implementation, such as the requirement that the data be divisible by the
+number of GPUs, which limits the flexibility of resource adjustment. In addition, as asynchronous training and deployment are further accelerated, the performance gap gradually narrows. Therefore, our future focus is on more flexible resource allocation and dynamic resource adjustment.
+
+* Machine: H20
+* Model: Qwen3-30B-A3B-Base
+* Rollout length: max_response_length: 8K tokens
+* Algorithm: GRPO
+* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* Engine: vllm+Megatron
+* rollout.n: 16
+* ppo_mini_batch_size: 128
+* test_freq: 20
+
+* colocate sync:
+ * step:400
+ * train_batch_size: 512
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * trigger_parameter_sync_step: 512/128 = 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
+| Training Mode | Resource Allocation | Step | Gen | Old Log Prob | Ref | Update Actor | Total Time 100 Step | Total Time 200 Step | Total Time 300 Step | Total Time 400 Step | Acc/Mean@1 |
+|---------------|---------------------|------|-----|--------------|-----|--------------|---------------------|---------------------|---------------------|---------------------|------------|
+| Colocate Sync | 128 | 497.89 | 348.05 | 28.73 | 20.86 | 86.27 | 13h 36m | 1d 3h 48m | 1d 19h 4m | 2d 11h 39m | max: 0.3500<br/>last: 0.3208 |
+| Fully Async Policy | 96:32 | 282.75 | 22.06 | \ | 50.05 | 206.63 | 6h 45m (2.01x) | 14h 48m (1.88x) | 1d 0h 9m (1.78x) | 1d 10h 41m (1.72x) | max: 0.3813<br/>last: 0.3448 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-30B?nw=nwuserhouzg
+
+### checkpoint-engine Parameter Synchronization Ablation Experiment
+
+We measured the single-step parameter synchronization time of checkpoint-engine on three models: Qwen2.5-Math-7B, Qwen3-30B-A3B, and Qwen3-235B-A22B, using the default parameter configuration in all cases. All experiments were run on H20 machines with the Megatron training engine. A sketch of the corresponding `checkpoint_engine` block with its defaults follows the table.
+
+| model | trainer rank | rollout rank | checkpoint-engine | total sync time |
+|:-----------------:|:--------:|:-------:|:--------------:|:--------------:|
+| Qwen2.5-Math-7B | 4 | 4 | False | 0.12s |
+| Qwen2.5-Math-7B | 4 | 4 | True | 0.02s |
+| Qwen3-30B-A3B | 16 | 16 | False | 15.76s |
+| Qwen3-30B-A3B | 16 | 16 | True | 4.38s |
+| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
+| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |
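+
+For reference, a minimal sketch of the `async_training.checkpoint_engine` block with its documented default values (only the keys listed in the configuration table of this recipe):
+
+```yaml
+async_training:
+  checkpoint_engine:
+    # Whether to use checkpoint_engine to accelerate parameter sync
+    enable: True
+    # Whether to overlap broadcast and load_weights during sync
+    overlap_broadcast_and_consume: False
+    # Bucket size (MB) used for assembling weights
+    device_buffer_size_M: 4096
+```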
+
+### use_trainer_do_validate Experiment
+
+We tested the impact of the `use_trainer_do_validate` parameter on the Qwen2.5-Math-7B model. The results show that setting `use_trainer_do_validate=True` reduces the validation time overhead and also reduces trainer node idle time. A config sketch for enabling it follows the setup list below.
+
+* Machine: H20
+* Model: Qwen2.5-Math-7B
+* Rollout length: max_response_length: 10K tokens (FSDP2)
+* Algorithm: DAPO
+* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet
+* Engine: vllm+FSDP2
+* rollout.n: 16
+* ppo_mini_batch_size: 32
+* test_freq: 10
+
+* fully_async_policy
+ * total_rollout_steps: 512*400
+ * require_batches: 4
+ * trigger_parameter_sync_step: 4
+ * staleness_threshold: 0.5
+ * partial_rollout: True
+
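+A minimal sketch of the `async_training` block used for the fully_async_policy row in this experiment (keys and values taken from the setup above; remaining options keep their defaults):
+
+```yaml
+async_training:
+  # Let the trainer node take part in validation after each parameter sync
+  use_trainer_do_validate: True
+  require_batches: 4
+  trigger_parameter_sync_step: 4
+  staleness_threshold: 0.5
+  partial_rollout: True
+```
+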
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time<br/>50 step | acc/mean@2 |
+|:-------------:|:-------------------:|:----:|:---:|:------------:|:------------:|:-------------:|:----------------------:|:----------:|
+| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h 9m | 22.6 |
+| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h 2m | 21.0 |
+
+
+## Multi-Turn Tool Calling
+
+Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop** for **fully_async_policy**,
+a multi-turn tool-calling loop that supports partial rollout.
+
+### Core Design
+
+`AsyncPartialToolAgentLoop` inherits from `ToolAgentLoop`; its core is adapting to the asynchronous training mode of `fully_async_policy`. When
+`partial_rollout=True`, the Rollouter interrupts in-flight generation tasks before synchronizing parameters with the Trainer. `AsyncPartialToolAgentLoop` can:
+
+1. **Interrupt tasks**: respond to the interrupt signal and save the current generation state. Currently, the interruption happens during the GENERATING phase, or after other states have finished;
+2. **Resume tasks**: after parameter synchronization completes, resume from the saved state and continue execution instead of starting over (see the sketch after this list).
+
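+A minimal, hypothetical sketch of this interrupt/resume pattern (the class and method names below are illustrative only and are not verl's actual `AsyncPartialToolAgentLoop` API):
+
+```python
+import asyncio
+from dataclasses import dataclass, field
+
+
+@dataclass
+class GenerationState:
+    """The saved state is the checkpoint: the prompt plus everything generated so far."""
+
+    prompt_ids: list[int]
+    response_ids: list[int] = field(default_factory=list)
+
+
+class PartialLoop:
+    """Toy agent loop that can be interrupted and later resumed from its saved state."""
+
+    def __init__(self, state: GenerationState):
+        self.state = state
+        self._interrupted = asyncio.Event()
+
+    def interrupt(self) -> None:
+        # Called by the rollouter right before a parameter sync.
+        self._interrupted.set()
+
+    async def run(self, max_tokens: int) -> GenerationState:
+        # Resume from whatever is already in state.response_ids.
+        while len(self.state.response_ids) < max_tokens:
+            if self._interrupted.is_set():
+                return self.state  # hand back the partial generation as-is
+            await asyncio.sleep(0.01)  # stand-in for one decode / tool-call step
+            self.state.response_ids.append(0)
+        return self.state
+
+
+async def demo() -> None:
+    loop = PartialLoop(GenerationState(prompt_ids=[1, 2, 3]))
+    task = asyncio.create_task(loop.run(max_tokens=8))
+    await asyncio.sleep(0.03)          # let generation make some progress
+    loop.interrupt()                   # a parameter sync is about to start
+    partial = await task               # the partial response is preserved in its state
+    await PartialLoop(partial).run(8)  # after the sync: continue, do not restart
+
+
+asyncio.run(demo())
+```
+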
+### Usage
+
+Multi-turn tool-calling RL training with `fully_async_policy` is similar to `recipe/retool`: it is enabled by specifying the `multi_turn` related settings in the configuration file.
+
+1. **SFT stage**: first, run SFT on the model so that it can follow the tool-calling format instructions.
+2. **Enable in config**: in the `fully_async_policy` training configuration, set the following parameters:
+   ```yaml
+   actor_rollout_ref:
+     rollout:
+       multi_turn:
+         enable: True # AsyncPartialToolAgentLoop is used by default in fully_async_policy mode
+         # other multi_turn related configs
+   ```
+3. **Configure async parameters**: for better efficiency, enable `partial_rollout` and `staleness_threshold` alongside multi-turn tool calling:
+   ```yaml
+   async_training:
+     partial_rollout: True
+     staleness_threshold: 0.5
+     # other async parameters
+   ```
+4. **Example**: see `recipe/fully_async_policy/shell/dapo_7b_async_retool.sh`
+
+### Experimental Results
+
+To verify the performance of `fully_async_policy` on multi-turn tool-calling tasks, we compared it against the standard `colocate` synchronous mode. The experimental setup is as follows.
+
+* **SFT model**: based on `Qwen2.5-7B-Instruct`, trained for 6 epochs on the `ReTool-SFT` dataset;
+* **RL algorithm**: DAPO
+* **Datasets**:
+  * Training set: `DAPO-Math-17k`
+  * Test set: `aime_2025`
+* **Resources and modes compared**:
+  * `colocate sync`: 32x H20 GPUs
+  * `fully_async_policy`: 16-GPU Trainer + 16-GPU Rollouter
+* **Key configuration** (consolidated into a yaml sketch after this list):
+  1. **Tool-calling configuration**:
+     * `multi_turn.enable: True`
+     * `multi_turn.max_user_turns: 16`
+     * `multi_turn.max_assistant_turns: 16`
+     * `multi_turn.tool_config_path: recipe/retool/sandbox_fusion_tool_config.yaml`
+  2. **`colocate sync` configuration**:
+     * `ppo_mini_batch_size: 16`
+     * `train_batch_size: 64`
+  3. **`fully_async_policy` configuration**:
+     * `ppo_mini_batch_size: 16`
+     * `trigger_parameter_sync_step: 4`
+     * `require_batches: 1`
+     * `staleness_threshold: 1`
+     * `partial_rollout: True`
+
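+A hedged consolidation of the `fully_async_policy` settings above into yaml form (the `multi_turn` nesting follows the usage example earlier in this section; the exact location of `ppo_mini_batch_size` in the config tree is an assumption here):
+
+```yaml
+actor_rollout_ref:
+  actor:
+    ppo_mini_batch_size: 16  # assumed path for this key
+  rollout:
+    multi_turn:
+      enable: True
+      max_user_turns: 16
+      max_assistant_turns: 16
+      tool_config_path: recipe/retool/sandbox_fusion_tool_config.yaml
+async_training:
+  trigger_parameter_sync_step: 4
+  require_batches: 1
+  staleness_threshold: 1
+  partial_rollout: True
+```
+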
+| training mode | Resource allocation | step | gen | old_log_prob | update_actor | total time<br/>100 step | total time<br/>200 step | aime_2025<br/>acc/mean@30 |
+|:------------------:|:-------------------:|:----:|:---:|:------------:|:------------:|:-----------------------:|:-----------------------:|:-------------------------:|
+| colocate | 32 | 375.47 | 228.03 | 35.19 | 111.84 | 9h 46m | 22h 28m | start: 0.1078<br/>last: 0.2056 |
+| fully_async_policy | 16:16 | 221.36 | 40.59 | \ | 179.58 | 6h 19m (1.55x) | 14h 4m (1.60x) | start: 0.11<br/>last: 0.2044 |
+
+> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-multiturn-tool?nw=nwuserhouzg
+
+## Future Plans
+
+* GRPO experiments
+* Megatron adaptation
+* SGLang integration
+* Transfer queue integration
+* Asynchronous parameter synchronization
+* AReaL-style asynchronous algorithm implementation
+* TPPO algorithm implementation
+* Multi-turn and tool support
\ No newline at end of file
diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py
index e5d210ecf0a..2ca64e64e71 100644
--- a/verl/experimental/agent_loop/agent_loop.py
+++ b/verl/experimental/agent_loop/agent_loop.py
@@ -923,7 +923,7 @@ def _init_agent_loop_workers(self):
node_id = node_ids[i % len(node_ids)]
self.agent_loop_workers.append(
self.agent_loop_workers_class.options(
- name=f"agent_loop_worker_{i}",
+ name=f"agent_loop_worker_{i}" + f"_{uuid4().hex[:8]}",
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=node_id, soft=True
),
diff --git a/verl/experimental/fully_async_policy/README.md b/verl/experimental/fully_async_policy/README.md
index 051e57586b8..54228f81933 100644
--- a/verl/experimental/fully_async_policy/README.md
+++ b/verl/experimental/fully_async_policy/README.md
@@ -105,6 +105,7 @@ but the overlap in their time consumption reduces the end-to-end time consumptio
| `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine for accelerating, default `True` |
| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When use checkpoint_engine, whether to overlap broadcast and load_weights, default `False` |
| `async_training.checkpoint_engine.device_buffer_size_M` | When use checkpoint_engine, the user-specific bucket size (MB), default `4096` |
+| `async_training.use_trainer_do_validate` | Whether to use the trainer node to run validation, default `False` |
**Further Explanation:**
@@ -194,6 +195,13 @@ but the overlap in their time consumption reduces the end-to-end time consumptio
- When disable `overlap_broadcast_and_consume`, the additional device memory overhead of
trainer rank is `2 * bucket_size`and rollout rank is `1 * bucket_size`。
+- `async_training.use_trainer_do_validate`
+
+  Controls whether the trainer's `do_validate` method is used for validation.
+  If set to True, the trainer performs validation after each parameter update, which reduces the validation time
+  overhead and the trainer node's idle time.
+  If set to False, the trainer does not perform validation.
+
### Supported Modes
1. on policy pipeline:
@@ -477,6 +485,38 @@ We tested the single-step parameter synchronization time of the checkpoint-engin
| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |
+
+### use_trainer_do_validate Experiment
+
+We used Qwen2.5-Math-7B to test the effect of `use_trainer_do_validate=True` on the training process. Enabling it
+reduced validation time by roughly 2x and cut trainer node idle time by about 40%.
+
+- Machine: H20
+- Model: Qwen2.5-Math-7B
+- Rollout length: max_response_length: 10K tokens (FSDP2)
+- Algorithm: DAPO
+- Dataset:
+ - TRAIN_FILE: dapo-math-17k.parquet
+ - TEST_FILE: aime-2024.parquet
+- Engine: vllm+FSDP2
+- rollout.n: 16
+- ppo_mini_batch_size: 32
+- test_freq: 10
+
+- fully_async_policy
+ - total_rollout_steps: 512*400
+ - require_batches: 4
+ - trigger_parameter_sync_step: 4
+ - staleness_threshold: 0.5
+ - partial_rollout: True
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time<br/>50 step | acc/mean@2 |
+|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
+| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
+| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
+| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | |
+
## Multi-Turn Tool Calling
Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn
diff --git a/verl/experimental/fully_async_policy/README_zh.md b/verl/experimental/fully_async_policy/README_zh.md
index a5d7f17bc1d..812e5eb8426 100644
--- a/verl/experimental/fully_async_policy/README_zh.md
+++ b/verl/experimental/fully_async_policy/README_zh.md
@@ -82,7 +82,7 @@ fully_async_policy 的整体架构如下图所示,fully_async_policy 主要由
| `async_training.checkpoint_engine.enable` | 是否开启 checkpoint_engine 模式的加速,默认值 True |
| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | 启动 checkpoint_engine 时,是否在参数同步时在 broadcast 和加载之间使用流水,默认值 False |
| `async_training.checkpoint_engine.device_buffer_size_M` | 启动 checkpoint_engine 时,组装的 bucket 的大小(MB),默认为 4096 |
-
+| `async_training.use_trainer_do_validate` | 是否使用 Trainer 的 do_validate 方法进行 validation,默认值 False |
**进一步的解释:**
- `rollout.total_rollout_steps`
@@ -155,6 +155,12 @@ require_batches * ppo_mini_batch_size)`。
- 在开启`overlap_broadcast_and_consume`时,trainer 节点的临时额外显存开销为 `3 * bucket_size`, rollout 节点的临时额外显存开销为`2 * bucket_size`。
- 在关闭`overlap_broadcast_and_consume`时,trainer 节点的临时额外显存开销为 `2 * bucket_size`, rollout 节点的临时额外显存开销为`1 * bucket_size`。
+- `async_training.use_trainer_do_validate`
+
+ 控制是否使用trainer的 `do_validate` 方法进行 validation 。
+ 如果设置为 True,trainer 会在每次参数更新后,调用 `do_validate` 方法进行 validation。
+ 如果设置为 False,trainer 不会调用 `do_validate` 方法。
+
### 模式支持
1. on policy pipeline:
@@ -407,6 +413,36 @@ GPU 数量整除,这使得资源调整的灵活性受到影响。此外,随
| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |
+### use_trainer_do_validate 实验测试
+
+我们在Qwen2.5-Math-7B模型上测试了 `use_trainer_do_validate` 参数的影响。这个结果展示使用 `use_trainer_do_validate=True` 可以减少验证时间开销,并且训练器节点的空闲时间也减少了。
+
+- Machine: H20
+- Model: Qwen2.5-Math-7B
+- Rollout length: max_response_length FSDP2: 10K tokens;
+- Algorithm: DAPO
+- Dataset:
+ - TRAIN_FILE: dapo-math-17k.parquet
+ - TEST_FILE: aime-2024.parquet
+- Engine: vllm+FSDP2
+- rollout.n: 16
+- ppo_mini_batch_size: 32
+- test_freq: 10
+
+- fully_async_policy
+ - total_rollout_steps: 512*400
+ - require_batches: 4
+ - trigger_parameter_sync_step: 4
+ - staleness_threshold: 0.5
+ - partial_rollout: True
+
+| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time<br/>50 step | acc/mean@2 |
+|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
+| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
+| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
+| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | |
+
+
## 多轮工具调用
参考 **recipe/retool** 和 **ToolAgentLoop**,我们为 **fully_async_policy** 实现了支持 partial rollout 的多轮工具调用循环 \*
diff --git a/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml b/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml
index c0252390e54..85b8307ee0c 100644
--- a/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml
+++ b/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml
@@ -27,6 +27,10 @@ async_training:
# compute_prox_log_prob
compute_prox_log_prob: False
+ # whether to use trainer do_validate
+ use_trainer_do_validate: False
+
+
# checkpoint_engine config for accelerating parameter synchronization between rollouter and trainer
checkpoint_engine:
# Whether to use checkpoint_engine
diff --git a/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml b/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml
index 21562de54e7..c5692b4a931 100644
--- a/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml
+++ b/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml
@@ -27,6 +27,10 @@ async_training:
# compute_prox_log_prob
compute_prox_log_prob: False
+ # whether to use trainer do_validate
+ use_trainer_do_validate: False
+
+
# checkpoint_engine config for accelerating parameter synchronization between rollouter and trainer
checkpoint_engine:
# Whether to use checkpoint_engine
diff --git a/verl/experimental/fully_async_policy/fsdp_workers.py b/verl/experimental/fully_async_policy/fsdp_workers.py
index c23ca4db3f2..96ab922b4fc 100644
--- a/verl/experimental/fully_async_policy/fsdp_workers.py
+++ b/verl/experimental/fully_async_policy/fsdp_workers.py
@@ -89,7 +89,7 @@ def sync_rollout_weights(self, sync_group_name="actor_rollout"):
if self._is_actor and self._is_offload_param:
load_fsdp_model_to_gpu(self.actor_module_fsdp)
params = self._get_actor_params() if self._is_actor else None
- if self._is_rollout:
+ if self._is_rollout and (not self._is_actor):
inference_model = get_inference_model(self.rollout)
from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader
@@ -107,7 +107,7 @@ def sync_rollout_weights(self, sync_group_name="actor_rollout"):
from ray.util.collective import collective
collective.broadcast(tensor, src_rank=0, group_name=sync_group_name)
- if self._is_rollout:
+ if self._is_rollout and (not self._is_actor):
inference_model.load_weights([(key, tensor)])
if self._is_actor and self._is_offload_param:
@@ -159,7 +159,7 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"):
update_start_time = time.time()
inference_model = None
- if self._is_rollout:
+ if self._is_rollout and (not self._is_actor):
inference_model = get_inference_model(self.rollout)
from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader
diff --git a/verl/experimental/fully_async_policy/fully_async_main.py b/verl/experimental/fully_async_policy/fully_async_main.py
index 31465286fe3..753e5a82e8d 100644
--- a/verl/experimental/fully_async_policy/fully_async_main.py
+++ b/verl/experimental/fully_async_policy/fully_async_main.py
@@ -45,7 +45,7 @@ def create_resource_pool_manager(config, roles: list) -> ResourcePoolManager:
mapping = {}
# Actor/Critic resource pool
- if any(role in roles for role in [Role.Actor, Role.Critic, Role.RefPolicy, Role.RewardModel]):
+ if any(role in roles for role in [Role.Actor, Role.ActorRollout, Role.Critic, Role.RefPolicy, Role.RewardModel]):
assert config.trainer.n_gpus_per_node > 0, "config.trainer.n_gpus_per_node must be greater than 0"
assert config.trainer.nnodes > 0, "config.trainer.nnodes must be greater than 0"
@@ -53,7 +53,7 @@ def create_resource_pool_manager(config, roles: list) -> ResourcePoolManager:
resource_pool_spec["trainer_pool"] = trainer_pool
# Map training-related roles to the same resource pool
- for role in [Role.Actor, Role.Critic, Role.RefPolicy, Role.RewardModel]:
+ for role in [Role.Actor, Role.ActorRollout, Role.Critic, Role.RefPolicy, Role.RewardModel]:
if role in roles:
mapping[role] = "trainer_pool"
@@ -104,8 +104,9 @@ def create_role_worker_mapping(config):
else:
raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}")
+ train_role = Role.ActorRollout if config.async_training.use_trainer_do_validate else Role.Actor
role_worker_mapping = {
- Role.Actor: ray.remote(DetachActorWorker),
+ train_role: ray.remote(DetachActorWorker),
Role.Rollout: ray.remote(DetachAsyncRolloutWorker),
Role.Critic: ray.remote(CriticWorker),
}
@@ -207,7 +208,13 @@ def _initialize_components(self, config) -> None:
# param_version resume from ckpt or default 0
param_version = ray.get(self.components["trainer"].load_checkpoint.remote())
ray.get(self.components["rollouter"].load_checkpoint.remote())
- ray.get(param_synchronizer.sync_weights.remote(version=param_version, validate=val_before_train))
+ ray.get(
+ param_synchronizer.sync_weights.remote(
+ version=param_version,
+ validate=val_before_train,
+ use_trainer_do_validate=config.async_training.use_trainer_do_validate,
+ )
+ )
ray.get(param_synchronizer.wait_last_valid.remote())
self.components["param_synchronizer"] = param_synchronizer
diff --git a/verl/experimental/fully_async_policy/fully_async_rollouter.py b/verl/experimental/fully_async_policy/fully_async_rollouter.py
index 59ddd54bef3..8dc3b3265a0 100644
--- a/verl/experimental/fully_async_policy/fully_async_rollouter.py
+++ b/verl/experimental/fully_async_policy/fully_async_rollouter.py
@@ -102,6 +102,17 @@ def __init__(
train_sampler = create_rl_sampler(config.data, train_dataset)
self._validate_config()
+ if self.config.async_training.use_trainer_do_validate:
+ rollout_gpus = config.rollout.nnodes * config.rollout.n_gpus_per_node
+ train_gpus = config.trainer.nnodes * config.trainer.n_gpus_per_node
+ total_gpus = rollout_gpus + train_gpus
+ print(f"[FullyAsyncRollouter] split before val_dataset total len: {len(val_dataset)}")
+ split_dataset = val_dataset.split(total_gpus)
+ rollout_val_dataset0 = split_dataset[:rollout_gpus]
+ from torch.utils.data import ConcatDataset
+
+ val_dataset = ConcatDataset(rollout_val_dataset0)
+ print(f"[FullyAsyncRollouter] split after val_dataset total len: {len(val_dataset)}")
print(f"[FullyAsyncRollouter] Rollouter _create_dataloader...\n{train_dataset}\n{val_dataset}")
self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler)
@@ -207,7 +218,9 @@ def get_max_queue_size(self):
def get_total_train_steps(self):
return self.total_train_steps
- async def update_param_version(self, version: int, validate: bool = False, global_steps: int = 0):
+ async def update_param_version(
+ self, version: int, validate: bool = False, global_steps: int = 0, use_trainer_do_validate: bool = False
+ ):
"""Update current parameter version"""
async with self.lock:
old_version = self.current_param_version
@@ -240,7 +253,7 @@ async def update_param_version(self, version: int, validate: bool = False, globa
and self.current_param_version > 0 # don't test here in the initial parameter sync
) or (validate and self.val_reward_fn is not None):
with marked_timer("rollouter/validate_time", timing_raw, color="green"):
- val_metrics: dict = self._validate()
+ val_metrics: dict = self._validate(use_trainer_do_validate)
data = ValidateMetrics(
timing_raw=timing_raw, metrics=val_metrics, global_steps=global_steps, param_version=version
)
diff --git a/verl/experimental/fully_async_policy/fully_async_trainer.py b/verl/experimental/fully_async_policy/fully_async_trainer.py
index e4bda8af341..c7fd22a1b09 100644
--- a/verl/experimental/fully_async_policy/fully_async_trainer.py
+++ b/verl/experimental/fully_async_policy/fully_async_trainer.py
@@ -104,6 +104,8 @@ def __init__(
self.progress_bar = None
self.trigger_parameter_sync_step = config.async_training.trigger_parameter_sync_step
self.last_ckpt_version = 0
+ self.train_val_metrics = None
+ self.train_role = Role.ActorRollout if config.async_training.use_trainer_do_validate else Role.Actor
# required_samples use ppo_mini_batch_size*require_batches as the minimum number of samples.
self.require_batches = config.async_training.require_batches
@@ -115,6 +117,37 @@ def __init__(
)
self.metrics_aggregator = MetricsAggregator(total_gpus=total_gpus)
+ # use trainer to do validation
+ if self.config.async_training.use_trainer_do_validate:
+ from verl.trainer.main_ppo import create_rl_dataset
+ from verl.utils.dataset.rl_dataset import collate_fn
+
+ val_dataset = create_rl_dataset(config.data.val_files, config.data, tokenizer, processor)
+ rollout_gpus = config.rollout.nnodes * config.rollout.n_gpus_per_node
+ print(f"[FullyAsyncTrainer] split before val_dataset total len: {len(val_dataset)}")
+ split_dataset = val_dataset.split(total_gpus)
+ rollout_val_dataset0 = split_dataset[rollout_gpus:]
+ from torch.utils.data import ConcatDataset
+
+ val_dataset = ConcatDataset(rollout_val_dataset0)
+ print(f"[FullyAsyncTrainer] split after val_dataset total len: {len(val_dataset)}")
+ self.val_dataset = val_dataset
+ # update val_dataloader
+ val_batch_size = self.config.data.val_batch_size # Prefer config value if set
+ if val_batch_size is None:
+ val_batch_size = len(val_dataset)
+ from torchdata.stateful_dataloader import StatefulDataLoader
+
+ print(f"[FullyAsyncTrainer] create val_dataloader with batch_size: {val_batch_size}")
+ self.val_dataloader = StatefulDataLoader(
+ dataset=val_dataset,
+ batch_size=val_batch_size,
+ num_workers=self.config.data["dataloader_num_workers"],
+ shuffle=self.config.data.get("validation_shuffle", True),
+ drop_last=False,
+ collate_fn=collate_fn,
+ )
+
def set_message_queue_client(self, message_queue_client: MessageQueueClient):
"""Set message queue client"""
self.message_queue_client = message_queue_client
@@ -192,7 +225,7 @@ def _get_samples_from_queue(self) -> tuple[None, None] | tuple[int, Any]:
def _create_actor_rollout_classes(self):
# create actor
- for role in [Role.Actor]:
+ for role in [self.train_role]:
resource_pool = self.resource_pool_manager.get_resource_pool(role)
role_cls = RayClassWithInitArgs(
cls=self.role_worker_mapping[role],
@@ -214,14 +247,41 @@ def _init_models(self):
self.rm_wg = self.all_wg[str(Role.RewardModel)]
self.rm_wg.init_model()
- self.actor_wg = self.all_wg[str(Role.Actor)]
+ self.actor_wg = self.all_wg[str(self.train_role)]
self.actor_wg.init_model()
self.actor_rollout_wg = self.actor_wg # to be compatible with the functions that not be modified
- def _init_async_rollout_manager(self):
- pass
+ async def init_workers(self):
+ """Initialize distributed training workers using Ray backend.
+ Creates:
+ 1. Ray resource pools from configuration
+ 2. Worker groups for each role (actor, critic, etc.)
+ """
+ # self._init_async_objects()
+ self._init_resource_pools()
+ self._create_worker_classes()
+ self._init_worker_groups()
+ self._init_models()
+ await self._init_async_rollout_manager()
+
+ async def _init_async_rollout_manager(self):
+ # use async rollout do validate
+ if self.config.async_training.use_trainer_do_validate:
+ print(f"[FullyAsyncTrainer] use_trainer_do_validate: {self.config.async_training.use_trainer_do_validate}")
+ assert self.config.actor_rollout_ref.rollout.mode == "async"
+ self.async_rollout_mode = True
+ print("[FullyAsyncTrainer] Init async rollout manager")
+ from recipe.fully_async_policy.agent_loop import FullyAsyncAgentLoopManager
+
+ self.async_rollout_manager = await FullyAsyncAgentLoopManager.create(
+ config=self.config, worker_group=self.actor_rollout_wg
+ )
+ print("[FullyAsyncTrainer] async_rollout_manager sleep")
+ await self.async_rollout_manager.sleep()
+ else:
+ print("[FullyAsyncTrainer] Skip async rollout manager (use_trainer_do_validate=False)")
- def fit(self):
+ async def fit(self):
"""
The training loop of PPO.
The driver process only need to call the compute functions of the worker group through RPC
@@ -277,7 +337,7 @@ def fit(self):
f"trigger_parameter_sync_step: {self.trigger_parameter_sync_step} "
f"{time_str}"
)
- self._trigger_parameter_sync_after_step(global_steps=self.global_steps)
+ await self._trigger_parameter_sync_after_step(global_steps=self.global_steps)
self._log_validation_data()
self._check_save_checkpoint(timing_raw)
self.global_steps += 1
@@ -288,7 +348,7 @@ def fit(self):
self._log_validation_data()
# 2. perform addtional parameter_sync and validate if trainer already updated
if self.current_param_version % self.config.rollout.test_freq != 0 or self.local_trigger_step > 1:
- self._trigger_parameter_sync_after_step(validate=True, global_steps=self.global_steps)
+ await self._trigger_parameter_sync_after_step(validate=True, global_steps=self.global_steps)
ray.get(self.param_synchronizer.wait_last_valid.remote())
self._log_validation_data()
self.progress_bar.close()
@@ -459,7 +519,7 @@ def _collect_metrics_from_samples(self, batch, metrics):
if key.startswith("fully_async") or key.startswith("timing_s"):
metrics[key] = value
- def _trigger_parameter_sync_after_step(self, validate: bool = False, global_steps: int = None):
+ async def _trigger_parameter_sync_after_step(self, validate: bool = False, global_steps: int = None):
"""
Trigger parameter synchronization after training step
This ensures rollouter always uses the latest trained parameters
@@ -482,9 +542,25 @@ def _trigger_parameter_sync_after_step(self, validate: bool = False, global_step
with marked_timer("timing_s/param_sync", timing_param_sync):
ray.get(
self.param_synchronizer.sync_weights.remote(
- self.current_param_version, validate=validate, global_steps=global_steps
+ self.current_param_version,
+ validate=validate,
+ global_steps=global_steps,
+ use_trainer_do_validate=self.config.async_training.use_trainer_do_validate,
)
)
+
+ # do trainer validate
+ do_validate_param = (
+ self.config.rollout.test_freq > 0
+ and self.current_param_version % self.config.rollout.test_freq == 0
+ and self.current_param_version > 0
+ )
+ print(f"do_validate_param: {do_validate_param}")
+ if do_validate_param and self.reward_fn is not None and self.config.async_training.use_trainer_do_validate:
+ print(f"[FullyAsyncTrainer] validate param version: {self.current_param_version}")
+ await self._validate_process()
+ else:
+ self.train_val_metrics = None
self.logger.log(data=timing_param_sync, step=self.current_param_version)
def _log_validation_data(self):
@@ -496,10 +572,38 @@ def _log_validation_data(self):
return
val_metrics: ValidateMetrics = ray.cloudpickle.loads(val_data)
- if val_metrics.metrics:
- self.logger.log(data=val_metrics.metrics, step=val_metrics.param_version)
- pprint(
- f"[FullyAsyncTrainer] parameter version: {val_metrics.param_version} "
- f"Validation metrics: {val_metrics.metrics}"
- )
+ if self.train_val_metrics and self.config.async_training.use_trainer_do_validate:
+ # merge info
+ timing_param_sync = {}
+ with marked_timer("timing_s/merge_val", timing_param_sync):
+ new_metrics = self._merge_validation_results(self.train_val_metrics, val_metrics.metrics)
+ if new_metrics:
+ self.logger.log(data=new_metrics, step=val_metrics.param_version)
+ pprint(
+ f"[FullyAsyncTrainer] parameter version: {val_metrics.param_version} "
+ f"Validation metrics: {new_metrics}, timing_param_sync: {timing_param_sync['timing_s/merge_val']}"
+ )
+ self.logger.log(data=val_metrics.timing_raw, step=val_metrics.param_version)
+ else:
+ if val_metrics.metrics:
+ self.logger.log(data=val_metrics.metrics, step=val_metrics.param_version)
+ pprint(
+ f"[FullyAsyncTrainer] parameter version: {val_metrics.param_version} "
+ f"Validation metrics: {val_metrics.metrics}"
+ )
self.logger.log(data=val_metrics.timing_raw, step=val_metrics.param_version)
+
+ async def _validate_process(self):
+ if self.config.async_training.use_trainer_do_validate:
+ print("[FullyAsyncTrainer] _validate_process")
+ from verl.utils.profiler import marked_timer
+
+ timing_raw = {}
+ await self.async_rollout_manager.wake_up()
+ with marked_timer("trainer/validate_time", timing_raw):
+ self.train_val_metrics = self._validate(True)
+ await self.async_rollout_manager.sleep()
+ print(f"[FullyAsyncTrainer] validate timing_raw validate: {timing_raw['trainer/validate_time']}")
+ else:
+ self.train_val_metrics = None
+ print("[FullyAsyncTrainer] _validate_process without async_rollout_manager")
diff --git a/verl/experimental/fully_async_policy/param_sync.py b/verl/experimental/fully_async_policy/param_sync.py
index a31ee31212c..b795000be04 100644
--- a/verl/experimental/fully_async_policy/param_sync.py
+++ b/verl/experimental/fully_async_policy/param_sync.py
@@ -45,6 +45,7 @@ def __init__(self, config, trainer, rollouter, mq):
self.sync_group_name = "actor_rollout"
self.wait_last_update = None
self.wait_last_resume = None
+ self.validate_task = None
# Statistics
self.current_version = 0
@@ -94,7 +95,7 @@ def _init_actor_rollout_checkpoint_engine(self):
)
)
- def sync_weights(self, version, validate=False, global_steps=0):
+ def sync_weights(self, version, validate=False, global_steps=0, use_trainer_do_validate=False):
"""Sync weights between trainer and rollouter, and update parameter version"""
start_time = time.time()
@@ -119,8 +120,18 @@ def sync_weights(self, version, validate=False, global_steps=0):
f"[ParameterSynchronizer] sync_weights success. cost {end_time - start_time:.2f} seconds, "
f"pause:{pause_time - start_time:.2f}s, sync:{end_time - pause_time:.2f}s"
)
+
+ # async train do validate
+ print(f"[ParameterSynchronizer] validate: {validate}, use_trainer_do_validate: {use_trainer_do_validate}")
+ if validate and use_trainer_do_validate:
+ print("[ParameterSynchronizer] use trainer to do validate")
+ self.validate_task = self.trainer._validate_process.remote()
+ else:
+ self.validate_task = None
# Async Update rollout version & validation
- self.wait_last_update = self.rollouter.update_param_version.remote(version, validate, global_steps)
+ self.wait_last_update = self.rollouter.update_param_version.remote(
+ version, validate, global_steps, use_trainer_do_validate
+ )
self.wait_last_resume = self.rollouter.resume.remote(self.wait_last_update)
def wait_last_valid(self):
@@ -130,6 +141,8 @@ def wait_last_valid(self):
ray.get(self.wait_last_update)
if self.wait_last_resume:
ray.get(self.wait_last_resume)
+ if self.validate_task:
+ ray.get(self.validate_task)
print(f"[ParameterSynchronizer] Wait last validate cost: {time.time() - start_time:.2f} seconds")
def rollouter_save_checkpoint(self, local_global_step_folder: str):
diff --git a/verl/trainer/ppo/metric_utils.py b/verl/trainer/ppo/metric_utils.py
index 876d6907de4..6ed921a9da0 100644
--- a/verl/trainer/ppo/metric_utils.py
+++ b/verl/trainer/ppo/metric_utils.py
@@ -333,14 +333,28 @@ def bootstrap_metric(
[(3.0, 0.5), (4.5, 0.3)] # Example values
"""
np.random.seed(seed)
+ data_np = np.array(data, dtype=object)
+ n_data = len(data_np)
- bootstrap_metric_lsts = [[] for _ in range(len(reduce_fns))]
- for _ in range(n_bootstrap):
- bootstrap_idxs = np.random.choice(len(data), size=subset_size, replace=True)
- bootstrap_data = [data[i] for i in bootstrap_idxs]
- for i, reduce_fn in enumerate(reduce_fns):
- bootstrap_metric_lsts[i].append(reduce_fn(bootstrap_data))
- return [(np.mean(lst), np.std(lst)) for lst in bootstrap_metric_lsts]
+ # generate bootstrap indices, shape: (n_bootstrap, subset_size)
+ bootstrap_idxs = np.random.choice(n_data, size=(n_bootstrap, subset_size), replace=True)
+
+ # pre-allocate result array, shape: (n_fns, n_bootstrap)
+ n_fns = len(reduce_fns)
+ metric_results = np.empty((n_fns, n_bootstrap), dtype=np.float64)
+
+ # compute metric results for each bootstrap sample
+ for fn_idx, reduce_fn in enumerate(reduce_fns):
+ # bootstrap sample and compute metric
+ for boot_idx in range(n_bootstrap):
+ sample = data_np[bootstrap_idxs[boot_idx]]
+ metric_results[fn_idx, boot_idx] = reduce_fn(sample)
+
+ # compute mean and std for each metric function
+ result = [
+ (float(np.mean(metric_results[fn_idx])), float(np.std(metric_results[fn_idx]))) for fn_idx in range(n_fns)
+ ]
+ return result
def calc_maj_val(data: list[dict[str, Any]], vote_key: str, val_key: str) -> float:
@@ -431,47 +445,88 @@ def process_validation_metrics(
for var_name, var_vals in infos_dict.items():
var2vals[var_name].append(var_vals[sample_idx])
- # Calculate metrics for each group
- data_src2uid2var2metric = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
+ np_mean = np.mean
+ np_std = np.std
+ reduce_fns_best_worst = [np.max, np.min]
+ n_bootstrap = 1000
+
+ # 2. cache ns list
+ def gen_ns(n_resps: int) -> list[int]:
+ if n_resps <= 1:
+ return []
+ ns = []
+ n = 2
+ while n < n_resps:
+ ns.append(n)
+ n *= 2
+ ns.append(n_resps)
+ return ns
+
+ ns_cache = {}
+
+ # 3. cache metric results
+ data_src2uid2var2metric = {}
+
+ # 4. flatten loop
for data_source, uid2var2vals in data_src2uid2var2vals.items():
+ # create uid dict
+ uid_dict = data_src2uid2var2metric.setdefault(data_source, {})
+
for uid, var2vals in uid2var2vals.items():
+ pred_vals = var2vals.get("pred")
+ has_pred = pred_vals is not None
+ var_dict = uid_dict.setdefault(uid, {})
+
for var_name, var_vals in var2vals.items():
- if isinstance(var_vals[0], str):
+ # skip empty or string values
+ if not var_vals or isinstance(var_vals[0], str):
continue
- metric = {}
+ # compute mean and std
n_resps = len(var_vals)
- metric[f"mean@{n_resps}"] = np.mean(var_vals)
+ metric = {f"mean@{n_resps}": float(np_mean(var_vals))}
if n_resps > 1:
- metric[f"std@{n_resps}"] = np.std(var_vals)
+ metric[f"std@{n_resps}"] = float(np_std(var_vals))
- ns = []
- n = 2
- while n < n_resps:
- ns.append(n)
- n *= 2
- ns.append(n_resps)
+ # cache ns list
+ if n_resps not in ns_cache:
+ ns_cache[n_resps] = gen_ns(n_resps)
+ ns = ns_cache[n_resps]
+ # compute best/worst metrics
for n in ns:
- [(bon_mean, bon_std), (won_mean, won_std)] = bootstrap_metric(
- data=var_vals, subset_size=n, reduce_fns=[np.max, np.min], seed=seed
+ # compute best/worst metrics
+ (bon_mean, bon_std), (won_mean, won_std) = bootstrap_metric(
+ data=var_vals,
+ subset_size=n,
+ reduce_fns=reduce_fns_best_worst,
+ n_bootstrap=n_bootstrap,
+ seed=seed,
)
- metric[f"best@{n}/mean"], metric[f"best@{n}/std"] = bon_mean, bon_std
- metric[f"worst@{n}/mean"], metric[f"worst@{n}/std"] = won_mean, won_std
- if var2vals.get("pred", None) is not None:
+ metric[f"best@{n}/mean"] = bon_mean
+ metric[f"best@{n}/std"] = bon_std
+ metric[f"worst@{n}/mean"] = won_mean
+ metric[f"worst@{n}/std"] = won_std
+
+ # compute maj metrics
+ if has_pred:
+ # create vote_data
vote_data = [
- {"val": val, "pred": pred} for val, pred in zip(var_vals, var2vals["pred"], strict=True)
+ {"val": val, "pred": pred} for val, pred in zip(var_vals, pred_vals, strict=True)
]
+ # compute maj metrics
[(maj_n_mean, maj_n_std)] = bootstrap_metric(
data=vote_data,
subset_size=n,
reduce_fns=[partial(calc_maj_val, vote_key="pred", val_key="val")],
+ n_bootstrap=n_bootstrap,
seed=seed,
)
- metric[f"maj@{n}/mean"], metric[f"maj@{n}/std"] = maj_n_mean, maj_n_std
+ metric[f"maj@{n}/mean"] = maj_n_mean
+ metric[f"maj@{n}/std"] = maj_n_std
- data_src2uid2var2metric[data_source][uid][var_name] = metric
+ var_dict[var_name] = metric
# Aggregate metrics across uids
data_src2var2metric2uid_vals = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
@@ -486,5 +541,4 @@ def process_validation_metrics(
for var_name, metric2uid_vals in var2metric2uid_vals.items():
for metric_name, uid_vals in metric2uid_vals.items():
data_src2var2metric2val[data_source][var_name][metric_name] = np.mean(uid_vals)
-
return data_src2var2metric2val
diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py
index 4ee4511d77e..a29e0e4dc79 100644
--- a/verl/trainer/ppo/ray_trainer.py
+++ b/verl/trainer/ppo/ray_trainer.py
@@ -605,7 +605,7 @@ def _get_gen_batch(self, batch: DataProto) -> DataProto:
return gen_batch
- def _validate(self):
+ def _validate(self, merged: bool = False):
data_source_lst = []
reward_extra_infos_dict: dict[str, list] = defaultdict(list)
@@ -721,8 +721,18 @@ def _validate(self):
for key_info, lst in reward_extra_infos_dict.items():
assert len(lst) == 0 or len(lst) == len(sample_scores), f"{key_info}: {len(lst)=}, {len(sample_scores)=}"
+ if merged:
+ print("_merge_validation_results validate result will be merged")
+ return {
+ "data_sources": data_source_lst,
+ "sample_uids": sample_uids,
+ "sample_turns": sample_turns,
+ "reward_extra_infos_dict": reward_extra_infos_dict,
+ }
data_sources = np.concatenate(data_source_lst, axis=0)
+ return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns)
+ def _val_metrics_update(self, data_sources, sample_uids, reward_extra_infos_dict, sample_turns):
data_src2var2metric2val = process_validation_metrics(data_sources, sample_uids, reward_extra_infos_dict)
metric_dict = {}
for data_source, var2metric2val in data_src2var2metric2val.items():
@@ -749,6 +759,30 @@ def _validate(self):
return metric_dict
+ def _merge_validation_results(self, result_a, result_b):
+ if result_a is None and result_b is None:
+ return {}
+ if result_a is None:
+ result_a = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}}
+ if result_b is None:
+ result_b = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}}
+
+ if not result_a.get("data_sources") and not result_b.get("data_sources"):
+ return {}
+
+ data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0)
+ sample_uids = result_a["sample_uids"] + result_b["sample_uids"]
+ sample_turns = result_a["sample_turns"] + result_b["sample_turns"]
+
+ reward_extra_infos_dict = {}
+ all_keys = set(result_a["reward_extra_infos_dict"].keys()) | set(result_b["reward_extra_infos_dict"].keys())
+ for key in all_keys:
+ list_a = result_a["reward_extra_infos_dict"].get(key, [])
+ list_b = result_b["reward_extra_infos_dict"].get(key, [])
+ reward_extra_infos_dict[key] = list_a + list_b
+
+ return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns)
+
def init_workers(self):
"""Initialize distributed training workers using Ray backend.
diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py
index bd8ba3f64bb..f773061b193 100644
--- a/verl/utils/dataset/rl_dataset.py
+++ b/verl/utils/dataset/rl_dataset.py
@@ -390,6 +390,57 @@ async def process_vision_info(
images, videos = process_vision_info(messages, image_patch_size=image_patch_size, return_video_metadata=True)
return images, videos
+ def split(self, num_splits: int):
+ """
+ split the dataset into num_splits sub-datasets
+ Args:
+ num_splits: specified number of splits
+ Returns:
+ List[RLHFDataset]: list of RLHFDataset splits
+ Raises:
+ ValueError: if num_splits is not a positive integer
+ """
+ if not isinstance(num_splits, int) or num_splits <= 0:
+ raise ValueError(f"num_splits must be a positive integer, got {num_splits}")
+
+ if not hasattr(self, "dataframe"):
+ raise AttributeError(
+ "dataframe not found in RLHFDataset\n"
+ "reason: _read_files_and_tokenize() not called or Parquet file loading failed"
+ )
+ if self.dataframe is None:
+            raise ValueError("RLHFDataset dataframe is None!")
+
+ total_samples = len(self.dataframe)
+ print(f"total_samples: {total_samples}")
+ if total_samples == 0:
+ raise ValueError("Cannot split an empty dataset")
+ if total_samples % num_splits != 0:
+ raise ValueError(f"Cannot split dataset size {total_samples} into {num_splits} splits")
+ split_size = total_samples // num_splits
+ splits = []
+
+ for i in range(num_splits):
+ start_idx = i * split_size
+ end_idx = (i + 1) * split_size if i < num_splits - 1 else total_samples
+
+ split_dataframe = self.dataframe.select(range(start_idx, end_idx))
+
+ split_dataset = RLHFDataset(
+ data_files=self.data_files,
+ tokenizer=self.tokenizer,
+ config=self.config,
+ processor=self.processor,
+ max_samples=self.max_samples,
+ )
+ split_dataset.dataframe = split_dataframe
+ split_dataset.serialize_dataset = self.serialize_dataset
+ split_dataset.original_data_files = self.original_data_files
+
+ splits.append(split_dataset)
+
+ return splits
+
def get_dataset_class(data_config: DictConfig):
"""Get RLHF dataset class.
diff --git a/verl/workers/rollout/vllm_rollout/vllm_async_server.py b/verl/workers/rollout/vllm_rollout/vllm_async_server.py
index 3285e61038d..af1725141fe 100644
--- a/verl/workers/rollout/vllm_rollout/vllm_async_server.py
+++ b/verl/workers/rollout/vllm_rollout/vllm_async_server.py
@@ -20,6 +20,7 @@
from concurrent.futures import Future
from pprint import pprint
from typing import Any, Callable, Optional
+from uuid import uuid4
import cloudpickle as pickle
import numpy as np
@@ -734,6 +735,7 @@ async def launch_servers(self):
if not self.is_reward_model
else f"vllm_server_reward_{self.replica_rank}_{node_rank}"
)
+ name = name + f"_{uuid4().hex[:8]}"
server = self.server_class.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=node_id,