37 changes: 37 additions & 0 deletions docs/advance/fully_async.md
@@ -105,6 +105,7 @@ but the overlap in their time consumption reduces the end-to-end time consumption
| `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine to accelerate parameter synchronization, default `True` |
| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When checkpoint_engine is enabled, whether to overlap broadcast and load_weights, default `False` |
| `async_training.checkpoint_engine.device_buffer_size_M` | When checkpoint_engine is enabled, the user-specified bucket size (MB), default `4096` |
| `async_training.use_trainer_do_validate` | Whether to use the trainer nodes to run validation, default `False` |

**Further Explanation:**

@@ -194,6 +195,13 @@ but the overlap in their time consumption reduces the end-to-end time consumption
- When `overlap_broadcast_and_consume` is disabled, the additional device memory overhead is
`2 * bucket_size` on the trainer rank and `1 * bucket_size` on the rollout rank.

* `async_training.use_trainer_do_validate`

It controls whether the trainer's `do_validate` method is used for validation.
If set to `True`, the trainer nodes run validation after each parameter update, which reduces the validation time
overhead and the trainer node idle time (a sketch of the idea follows below).
If set to `False`, the trainer does not take part in validation.
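
To make the idea concrete, here is a small, self-contained sketch of how a validation set can be partitioned in
proportion to the rollout and trainer GPU counts so that both node groups validate their own shard in parallel.
This is an illustration only, not verl's actual implementation; the real split is done inside `FullyAsyncRollouter`
with the dataset's own split helper (see the `fully_async_rollouter.py` change later in this PR).

```python
# Illustration only: partition a validation set between rollout and trainer
# GPUs in proportion to their counts, so both node groups validate in parallel.
from torch.utils.data import Dataset, Subset


class ToyValDataset(Dataset):
    """A stand-in validation dataset with `n` dummy samples."""

    def __init__(self, n: int):
        self.items = list(range(n))

    def __len__(self) -> int:
        return len(self.items)

    def __getitem__(self, idx: int):
        return self.items[idx]


def split_val_dataset(val_dataset: Dataset, rollout_gpus: int, trainer_gpus: int):
    total_gpus = rollout_gpus + trainer_gpus
    # Proportional cut: rollout nodes keep rollout_gpus/total_gpus of the data,
    # trainer nodes evaluate the rest, so both groups finish at about the same time.
    cut = len(val_dataset) * rollout_gpus // total_gpus
    rollout_part = Subset(val_dataset, range(0, cut))
    trainer_part = Subset(val_dataset, range(cut, len(val_dataset)))
    return rollout_part, trainer_part


if __name__ == "__main__":
    rollout_part, trainer_part = split_val_dataset(ToyValDataset(30), rollout_gpus=8, trainer_gpus=8)
    print(len(rollout_part), len(trainer_part))  # 15 15 for an 8:8 allocation
```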

### Supported Modes

1. on policy pipeline:
@@ -477,6 +485,35 @@ We tested the single-step parameter synchronization time of the checkpoint-engine
| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |

### use_trainer_do_validate Experiment

We tested the effect of `use_trainer_do_validate=True` on the training process with Qwen2.5-Math-7B. The results show
that enabling it reduces the validation time overhead and the trainer node idle time: validation time improved by
roughly 2x and the trainer node idle time dropped by about 40% (a quick check against the table is worked out below).

* Machine: H20
* Model: Qwen2.5-Math-7B
* Rollout length: max_response_length (FSDP2): 10K tokens
* Algorithm: DAPO
* Dataset:
  * TRAIN_FILE: dapo-math-17k.parquet
  * TEST_FILE: aime-2024.parquet
* Engine: vllm+FSDP2
* rollout.n: 16
* ppo_mini_batch_size: 32
* test_freq: 10

* fully_async_policy
* total_rollout_steps: 512*400
* require_batches: 4
* trigger_parameter_sync_step: 4
* staleness_threshold: 0.5
* partial_rollout: True

| training mode | resource allocation | step (s) | gen (s) | old_log_prob (s) | update_actor (s) | validate time (s) | total time<br>(50 steps) | acc/mean@2 |
|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
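
As a quick check on the "about 2x" figure above, the two validation-time entries in the table differ by roughly 2.1x:

```python
# Validation times taken from the table above (seconds per validation run).
colocate_sync_validate = 205.080
fully_async_validate = 95.699
print(round(colocate_sync_validate / fully_async_validate, 2))  # 2.14
```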


## Multi-Turn Tool Calling

Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn
1 change: 0 additions & 1 deletion recipe
Submodule recipe deleted from 21892b
597 changes: 597 additions & 0 deletions recipe/fully_async_policy/README.md

Large diffs are not rendered by default.

517 changes: 517 additions & 0 deletions recipe/fully_async_policy/README_zh.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion verl/experimental/agent_loop/agent_loop.py
@@ -923,7 +923,7 @@ def _init_agent_loop_workers(self):
node_id = node_ids[i % len(node_ids)]
self.agent_loop_workers.append(
self.agent_loop_workers_class.options(
name=f"agent_loop_worker_{i}",
name=f"agent_loop_worker_{i}_{uuid4().hex[:8]}",
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=node_id, soft=True
),
40 changes: 40 additions & 0 deletions verl/experimental/fully_async_policy/README.md
@@ -105,6 +105,7 @@ but the overlap in their time consumption reduces the end-to-end time consumption
| `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine to accelerate parameter synchronization, default `True` |
| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When checkpoint_engine is enabled, whether to overlap broadcast and load_weights, default `False` |
| `async_training.checkpoint_engine.device_buffer_size_M` | When checkpoint_engine is enabled, the user-specified bucket size (MB), default `4096` |
| `async_training.use_trainer_do_validate` | Whether to use the trainer nodes to run validation, default `False` |

**Further Explanation:**

@@ -194,6 +195,13 @@ but the overlap in their time consumption reduces the end-to-end time consumption
- When `overlap_broadcast_and_consume` is disabled, the additional device memory overhead is
`2 * bucket_size` on the trainer rank and `1 * bucket_size` on the rollout rank.

- `async_training.use_trainer_do_validate`

It controls whether the trainer's `do_validate` method is used for validation.
If set to `True`, the trainer nodes run validation after each parameter update, which reduces the validation time
overhead and the trainer node idle time (a minimal sketch of the role selection follows below).
If set to `False`, the trainer does not take part in validation.
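
A minimal, runnable sketch of how the flag steers worker-role selection, adapted from the `fully_async_main.py`
change later in this PR. verl's actual `Role` enum and worker classes are stubbed out here so the control flow can
be read in isolation:

```python
# Sketch only: when use_trainer_do_validate is on, the trainer worker is
# registered under the ActorRollout role, so a rollout-capable worker is placed
# on the trainer nodes and can take part in validation; otherwise it stays a
# plain Actor. Adapted from create_role_worker_mapping in this PR.
from enum import Enum, auto


class Role(Enum):  # stand-in for verl's Role enum
    Actor = auto()
    ActorRollout = auto()
    Rollout = auto()
    Critic = auto()


def create_role_worker_mapping(use_trainer_do_validate: bool) -> dict:
    train_role = Role.ActorRollout if use_trainer_do_validate else Role.Actor
    return {
        train_role: "DetachActorWorker",           # trainer-side worker
        Role.Rollout: "DetachAsyncRolloutWorker",  # rollout-side worker
        Role.Critic: "CriticWorker",
    }


print(create_role_worker_mapping(True))   # trainer registered as ActorRollout
print(create_role_worker_mapping(False))  # trainer registered as plain Actor
```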

### Supported Modes

1. on policy pipeline:
@@ -477,6 +485,38 @@ We tested the single-step parameter synchronization time of the checkpoint-engine
| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |


### use_trainer_do_validate Experiment

We tested the effect of `use_trainer_do_validate=True` on the training process with Qwen2.5-Math-7B. The results show
that enabling it reduces the validation time overhead and the trainer node idle time: validation time improved by
roughly 2x and the trainer node idle time dropped by about 40%.

- Machine: H20
- Model: Qwen2.5-Math-7B
- Rollout length: max_response_length (FSDP2): 10K tokens
- Algorithm: DAPO
- Dataset:
- TRAIN_FILE: dapo-math-17k.parquet
- TEST_FILE: aime-2024.parquet
- Engine: vllm+FSDP2
- rollout.n: 16
- ppo_mini_batch_size: 32
- test_freq: 10

- fully_async_policy
- total_rollout_steps: 512*400
- require_batches: 4
- trigger_parameter_sync_step: 4
- staleness_threshold: 0.5
- partial_rollout: True

| training mode | resource allocation | step (s) | gen (s) | old_log_prob (s) | update_actor (s) | validate time (s) | total time<br>(50 steps) | acc/mean@2 |
|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | |

## Multi-Turn Tool Calling

Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn
38 changes: 37 additions & 1 deletion verl/experimental/fully_async_policy/README_zh.md
@@ -82,7 +82,7 @@ The overall architecture of fully_async_policy is shown in the figure below; fully_async_policy mainly consists of
| `async_training.checkpoint_engine.enable` | Whether to enable checkpoint_engine acceleration, default `True` |
| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When checkpoint_engine is enabled, whether to pipeline broadcast and weight loading during parameter synchronization, default `False` |
| `async_training.checkpoint_engine.device_buffer_size_M` | When checkpoint_engine is enabled, the size (MB) of the assembled bucket, default `4096` |
| `async_training.use_trainer_do_validate` | Whether to use the trainer's `do_validate` method for validation, default `False` |

**Further Explanation:**

- `rollout.total_rollout_steps`
@@ -155,6 +155,12 @@ require_batches * ppo_mini_batch_size)`.
- When `overlap_broadcast_and_consume` is enabled, the temporary extra device memory overhead is `3 * bucket_size` on the trainer nodes and `2 * bucket_size` on the rollout nodes.
- When `overlap_broadcast_and_consume` is disabled, the temporary extra device memory overhead is `2 * bucket_size` on the trainer nodes and `1 * bucket_size` on the rollout nodes.

- `async_training.use_trainer_do_validate`

Controls whether the trainer's `do_validate` method is used for validation.
If set to True, the trainer calls `do_validate` to run validation after each parameter update.
If set to False, the trainer does not call `do_validate`.

### Supported Modes

1. on policy pipeline:
@@ -407,6 +413,36 @@ divisible by the number of GPUs, which limits the flexibility of resource adjustment. In addition, as
| Qwen3-235B-A22B | 64 | 64 | False | 58.57s |
| Qwen3-235B-A22B | 64 | 64 | True | 23.70s |

### use_trainer_do_validate Experiment

We tested the impact of the `use_trainer_do_validate` flag on the Qwen2.5-Math-7B model. The results show that `use_trainer_do_validate=True` reduces the validation time overhead and also reduces the trainer node idle time.

- Machine: H20
- Model: Qwen2.5-Math-7B
- Rollout length: max_response_length (FSDP2): 10K tokens
- Algorithm: DAPO
- Dataset:
- TRAIN_FILE: dapo-math-17k.parquet
- TEST_FILE: aime-2024.parquet
- Engine: vllm+FSDP2
- rollout.n: 16
- ppo_mini_batch_size: 32
- test_freq: 10

- fully_async_policy
- total_rollout_steps: 512*400
- require_batches: 4
- trigger_parameter_sync_step: 4
- staleness_threshold: 0.5
- partial_rollout: True

| training mode | resource allocation | step (s) | gen (s) | old_log_prob (s) | update_actor (s) | validate time (s) | total time<br>(50 steps) | acc/mean@2 |
|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|
| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 |
| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 |
| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | |


## Multi-Turn Tool Calling

Referencing **recipe/retool** and **ToolAgentLoop**, we implemented a multi-turn tool-calling loop with partial rollout support for **fully_async_policy** \*
Original file line number Diff line number Diff line change
@@ -27,6 +27,10 @@ async_training:
# compute_prox_log_prob
compute_prox_log_prob: False

# whether to use trainer do_validate
use_trainer_do_validate: False


# checkpoint_engine config for accelerating parameter synchronization between rollouter and trainer
checkpoint_engine:
# Whether to use checkpoint_engine
Original file line number Diff line number Diff line change
@@ -27,6 +27,10 @@ async_training:
# compute_prox_log_prob
compute_prox_log_prob: False

# whether to use trainer do_validate
use_trainer_do_validate: False


# checkpoint_engine config for accelerating parameter synchronization between rollouter and trainer
checkpoint_engine:
# Whether to use checkpoint_engine
6 changes: 3 additions & 3 deletions verl/experimental/fully_async_policy/fsdp_workers.py
@@ -89,7 +89,7 @@ def sync_rollout_weights(self, sync_group_name="actor_rollout"):
if self._is_actor and self._is_offload_param:
load_fsdp_model_to_gpu(self.actor_module_fsdp)
params = self._get_actor_params() if self._is_actor else None
if self._is_rollout:
if self._is_rollout and (not self._is_actor):
inference_model = get_inference_model(self.rollout)

from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader
@@ -107,7 +107,7 @@ def sync_rollout_weights(self, sync_group_name="actor_rollout"):
from ray.util.collective import collective

collective.broadcast(tensor, src_rank=0, group_name=sync_group_name)
if self._is_rollout:
if self._is_rollout and (not self._is_actor):
inference_model.load_weights([(key, tensor)])

if self._is_actor and self._is_offload_param:
@@ -159,7 +159,7 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"):
update_start_time = time.time()

inference_model = None
if self._is_rollout:
if self._is_rollout and (not self._is_actor):
inference_model = get_inference_model(self.rollout)
from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader

15 changes: 11 additions & 4 deletions verl/experimental/fully_async_policy/fully_async_main.py
@@ -45,15 +45,15 @@ def create_resource_pool_manager(config, roles: list) -> ResourcePoolManager:
mapping = {}

# Actor/Critic resource pool
if any(role in roles for role in [Role.Actor, Role.Critic, Role.RefPolicy, Role.RewardModel]):
if any(role in roles for role in [Role.Actor, Role.ActorRollout, Role.Critic, Role.RefPolicy, Role.RewardModel]):
assert config.trainer.n_gpus_per_node > 0, "config.trainer.n_gpus_per_node must be greater than 0"
assert config.trainer.nnodes > 0, "config.trainer.nnodes must be greater than 0"

trainer_pool = [config.trainer.n_gpus_per_node] * config.trainer.nnodes
resource_pool_spec["trainer_pool"] = trainer_pool

# Map training-related roles to the same resource pool
for role in [Role.Actor, Role.Critic, Role.RefPolicy, Role.RewardModel]:
for role in [Role.Actor, Role.ActorRollout, Role.Critic, Role.RefPolicy, Role.RewardModel]:
if role in roles:
mapping[role] = "trainer_pool"

@@ -104,8 +104,9 @@ def create_role_worker_mapping(config):
else:
raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}")

train_role = Role.ActorRollout if config.async_training.use_trainer_do_validate else Role.Actor
role_worker_mapping = {
Role.Actor: ray.remote(DetachActorWorker),
train_role: ray.remote(DetachActorWorker),
Role.Rollout: ray.remote(DetachAsyncRolloutWorker),
Role.Critic: ray.remote(CriticWorker),
}
@@ -207,7 +208,13 @@ def _initialize_components(self, config) -> None:
# param_version resume from ckpt or default 0
param_version = ray.get(self.components["trainer"].load_checkpoint.remote())
ray.get(self.components["rollouter"].load_checkpoint.remote())
ray.get(param_synchronizer.sync_weights.remote(version=param_version, validate=val_before_train))
ray.get(
param_synchronizer.sync_weights.remote(
version=param_version,
validate=val_before_train,
use_trainer_do_validate=config.async_training.use_trainer_do_validate,
)
)
ray.get(param_synchronizer.wait_last_valid.remote())

self.components["param_synchronizer"] = param_synchronizer
17 changes: 15 additions & 2 deletions verl/experimental/fully_async_policy/fully_async_rollouter.py
@@ -102,6 +102,17 @@ def __init__(
train_sampler = create_rl_sampler(config.data, train_dataset)

self._validate_config()
if self.config.async_training.use_trainer_do_validate:
rollout_gpus = config.rollout.nnodes * config.rollout.n_gpus_per_node
train_gpus = config.trainer.nnodes * config.trainer.n_gpus_per_node
total_gpus = rollout_gpus + train_gpus
print(f"[FullyAsyncRollouter] split before val_dataset total len: {len(val_dataset)}")
split_dataset = val_dataset.split(total_gpus)
rollout_val_dataset0 = split_dataset[:rollout_gpus]
from torch.utils.data import ConcatDataset

val_dataset = ConcatDataset(rollout_val_dataset0)
print(f"[FullyAsyncRollouter] split after val_dataset total len: {len(val_dataset)}")
print(f"[FullyAsyncRollouter] Rollouter _create_dataloader...\n{train_dataset}\n{val_dataset}")

self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler)
@@ -207,7 +218,9 @@ def get_max_queue_size(self):
def get_total_train_steps(self):
return self.total_train_steps

async def update_param_version(self, version: int, validate: bool = False, global_steps: int = 0):
async def update_param_version(
self, version: int, validate: bool = False, global_steps: int = 0, use_trainer_do_validate: bool = False
):
"""Update current parameter version"""
async with self.lock:
old_version = self.current_param_version
@@ -240,7 +253,7 @@ async def update_param_version(self, version: int, validate: bool = False, global_steps: int = 0):
and self.current_param_version > 0 # don't test here in the initial parameter sync
) or (validate and self.val_reward_fn is not None):
with marked_timer("rollouter/validate_time", timing_raw, color="green"):
val_metrics: dict = self._validate()
val_metrics: dict = self._validate(use_trainer_do_validate)
data = ValidateMetrics(
timing_raw=timing_raw, metrics=val_metrics, global_steps=global_steps, param_version=version
)