From bee08c83610286b46421227dd49aad8e31cf666b Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Fri, 26 Dec 2025 20:27:18 +0800 Subject: [PATCH 01/11] feat: add validate process on trainer node when use_trainer_do_validate=true --- verl/experimental/agent_loop/agent_loop.py | 2 +- .../fully_async_ppo_megatron_trainer.yaml | 4 + .../config/fully_async_ppo_trainer.yaml | 4 + .../fully_async_policy/fsdp_workers.py | 6 +- .../fully_async_policy/fully_async_main.py | 15 ++- .../fully_async_rollouter.py | 17 ++- .../fully_async_policy/fully_async_trainer.py | 119 +++++++++++++++--- .../fully_async_policy/param_sync.py | 17 ++- verl/trainer/ppo/ray_trainer.py | 26 +++- verl/utils/dataset/rl_dataset.py | 53 ++++++++ .../rollout/vllm_rollout/vllm_async_server.py | 1 + 11 files changed, 236 insertions(+), 28 deletions(-) diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py index e5d210ecf0a..09c47f7f888 100644 --- a/verl/experimental/agent_loop/agent_loop.py +++ b/verl/experimental/agent_loop/agent_loop.py @@ -923,7 +923,7 @@ def _init_agent_loop_workers(self): node_id = node_ids[i % len(node_ids)] self.agent_loop_workers.append( self.agent_loop_workers_class.options( - name=f"agent_loop_worker_{i}", + name=f"agent_loop_worker_{i}" + f"_{os.getpid()}", scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, soft=True ), diff --git a/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml b/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml index c0252390e54..85b8307ee0c 100644 --- a/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml +++ b/verl/experimental/fully_async_policy/config/fully_async_ppo_megatron_trainer.yaml @@ -27,6 +27,10 @@ async_training: # compute_prox_log_prob compute_prox_log_prob: False + # whether to use trainer do_validate + use_trainer_do_validate: False + + # checkpoint_engine config for accelerating parameter synchronization between rollouter and trainer checkpoint_engine: # Whether to use checkpoint_engine diff --git a/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml b/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml index 21562de54e7..c5692b4a931 100644 --- a/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml +++ b/verl/experimental/fully_async_policy/config/fully_async_ppo_trainer.yaml @@ -27,6 +27,10 @@ async_training: # compute_prox_log_prob compute_prox_log_prob: False + # whether to use trainer do_validate + use_trainer_do_validate: False + + # checkpoint_engine config for accelerating parameter synchronization between rollouter and trainer checkpoint_engine: # Whether to use checkpoint_engine diff --git a/verl/experimental/fully_async_policy/fsdp_workers.py b/verl/experimental/fully_async_policy/fsdp_workers.py index c23ca4db3f2..96ab922b4fc 100644 --- a/verl/experimental/fully_async_policy/fsdp_workers.py +++ b/verl/experimental/fully_async_policy/fsdp_workers.py @@ -89,7 +89,7 @@ def sync_rollout_weights(self, sync_group_name="actor_rollout"): if self._is_actor and self._is_offload_param: load_fsdp_model_to_gpu(self.actor_module_fsdp) params = self._get_actor_params() if self._is_actor else None - if self._is_rollout: + if self._is_rollout and (not self._is_actor): inference_model = get_inference_model(self.rollout) from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader @@ -107,7 +107,7 @@ def 
sync_rollout_weights(self, sync_group_name="actor_rollout"): from ray.util.collective import collective collective.broadcast(tensor, src_rank=0, group_name=sync_group_name) - if self._is_rollout: + if self._is_rollout and (not self._is_actor): inference_model.load_weights([(key, tensor)]) if self._is_actor and self._is_offload_param: @@ -159,7 +159,7 @@ def sync_rollout_weights_by_checkpoint(self, sync_group_name="actor_rollout"): update_start_time = time.time() inference_model = None - if self._is_rollout: + if self._is_rollout and (not self._is_actor): inference_model = get_inference_model(self.rollout) from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader diff --git a/verl/experimental/fully_async_policy/fully_async_main.py b/verl/experimental/fully_async_policy/fully_async_main.py index 31465286fe3..753e5a82e8d 100644 --- a/verl/experimental/fully_async_policy/fully_async_main.py +++ b/verl/experimental/fully_async_policy/fully_async_main.py @@ -45,7 +45,7 @@ def create_resource_pool_manager(config, roles: list) -> ResourcePoolManager: mapping = {} # Actor/Critic resource pool - if any(role in roles for role in [Role.Actor, Role.Critic, Role.RefPolicy, Role.RewardModel]): + if any(role in roles for role in [Role.Actor, Role.ActorRollout, Role.Critic, Role.RefPolicy, Role.RewardModel]): assert config.trainer.n_gpus_per_node > 0, "config.trainer.n_gpus_per_node must be greater than 0" assert config.trainer.nnodes > 0, "config.trainer.nnodes must be greater than 0" @@ -53,7 +53,7 @@ def create_resource_pool_manager(config, roles: list) -> ResourcePoolManager: resource_pool_spec["trainer_pool"] = trainer_pool # Map training-related roles to the same resource pool - for role in [Role.Actor, Role.Critic, Role.RefPolicy, Role.RewardModel]: + for role in [Role.Actor, Role.ActorRollout, Role.Critic, Role.RefPolicy, Role.RewardModel]: if role in roles: mapping[role] = "trainer_pool" @@ -104,8 +104,9 @@ def create_role_worker_mapping(config): else: raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}") + train_role = Role.ActorRollout if config.async_training.use_trainer_do_validate else Role.Actor role_worker_mapping = { - Role.Actor: ray.remote(DetachActorWorker), + train_role: ray.remote(DetachActorWorker), Role.Rollout: ray.remote(DetachAsyncRolloutWorker), Role.Critic: ray.remote(CriticWorker), } @@ -207,7 +208,13 @@ def _initialize_components(self, config) -> None: # param_version resume from ckpt or default 0 param_version = ray.get(self.components["trainer"].load_checkpoint.remote()) ray.get(self.components["rollouter"].load_checkpoint.remote()) - ray.get(param_synchronizer.sync_weights.remote(version=param_version, validate=val_before_train)) + ray.get( + param_synchronizer.sync_weights.remote( + version=param_version, + validate=val_before_train, + use_trainer_do_validate=config.async_training.use_trainer_do_validate, + ) + ) ray.get(param_synchronizer.wait_last_valid.remote()) self.components["param_synchronizer"] = param_synchronizer diff --git a/verl/experimental/fully_async_policy/fully_async_rollouter.py b/verl/experimental/fully_async_policy/fully_async_rollouter.py index 59ddd54bef3..8dc3b3265a0 100644 --- a/verl/experimental/fully_async_policy/fully_async_rollouter.py +++ b/verl/experimental/fully_async_policy/fully_async_rollouter.py @@ -102,6 +102,17 @@ def __init__( train_sampler = create_rl_sampler(config.data, train_dataset) self._validate_config() + if self.config.async_training.use_trainer_do_validate: + rollout_gpus = 
config.rollout.nnodes * config.rollout.n_gpus_per_node + train_gpus = config.trainer.nnodes * config.trainer.n_gpus_per_node + total_gpus = rollout_gpus + train_gpus + print(f"[FullyAsyncRollouter] split before val_dataset total len: {len(val_dataset)}") + split_dataset = val_dataset.split(total_gpus) + rollout_val_dataset0 = split_dataset[:rollout_gpus] + from torch.utils.data import ConcatDataset + + val_dataset = ConcatDataset(rollout_val_dataset0) + print(f"[FullyAsyncRollouter] split after val_dataset total len: {len(val_dataset)}") print(f"[FullyAsyncRollouter] Rollouter _create_dataloader...\n{train_dataset}\n{val_dataset}") self._create_dataloader(train_dataset, val_dataset, collate_fn, train_sampler) @@ -207,7 +218,9 @@ def get_max_queue_size(self): def get_total_train_steps(self): return self.total_train_steps - async def update_param_version(self, version: int, validate: bool = False, global_steps: int = 0): + async def update_param_version( + self, version: int, validate: bool = False, global_steps: int = 0, use_trainer_do_validate: bool = False + ): """Update current parameter version""" async with self.lock: old_version = self.current_param_version @@ -240,7 +253,7 @@ async def update_param_version(self, version: int, validate: bool = False, globa and self.current_param_version > 0 # don't test here in the initial parameter sync ) or (validate and self.val_reward_fn is not None): with marked_timer("rollouter/validate_time", timing_raw, color="green"): - val_metrics: dict = self._validate() + val_metrics: dict = self._validate(use_trainer_do_validate) data = ValidateMetrics( timing_raw=timing_raw, metrics=val_metrics, global_steps=global_steps, param_version=version ) diff --git a/verl/experimental/fully_async_policy/fully_async_trainer.py b/verl/experimental/fully_async_policy/fully_async_trainer.py index e4bda8af341..03c116b5f32 100644 --- a/verl/experimental/fully_async_policy/fully_async_trainer.py +++ b/verl/experimental/fully_async_policy/fully_async_trainer.py @@ -104,6 +104,8 @@ def __init__( self.progress_bar = None self.trigger_parameter_sync_step = config.async_training.trigger_parameter_sync_step self.last_ckpt_version = 0 + self.train_val_metrics = None + self.train_role = Role.ActorRollout if config.async_training.use_trainer_do_validate else Role.Actor # required_samples use ppo_mini_batch_size*require_batches as the minimum number of samples. 
self.require_batches = config.async_training.require_batches @@ -115,6 +117,37 @@ def __init__( ) self.metrics_aggregator = MetricsAggregator(total_gpus=total_gpus) + # use trainer to do validation + if self.config.async_training.use_trainer_do_validate: + from verl.trainer.main_ppo import create_rl_dataset + from verl.utils.dataset.rl_dataset import collate_fn + + val_dataset = create_rl_dataset(config.data.val_files, config.data, tokenizer, processor) + rollout_gpus = config.rollout.nnodes * config.rollout.n_gpus_per_node + print(f"[FullyAsyncTrainer] split before val_dataset total len: {len(val_dataset)}") + split_dataset = val_dataset.split(total_gpus) + rollout_val_dataset0 = split_dataset[rollout_gpus:] + from torch.utils.data import ConcatDataset + + val_dataset = ConcatDataset(rollout_val_dataset0) + print(f"[FullyAsyncTrainer] split after val_dataset total len: {len(val_dataset)}") + self.val_dataset = val_dataset + # update val_dataloader + val_batch_size = self.config.data.val_batch_size # Prefer config value if set + if val_batch_size is None: + val_batch_size = len(val_dataset) + from torchdata.stateful_dataloader import StatefulDataLoader + + print(f"[FullyAsyncTrainer] create val_dataloader with batch_size: {val_batch_size}") + self.val_dataloader = StatefulDataLoader( + dataset=val_dataset, + batch_size=val_batch_size, + num_workers=self.config.data["dataloader_num_workers"], + shuffle=self.config.data.get("validation_shuffle", True), + drop_last=False, + collate_fn=collate_fn, + ) + def set_message_queue_client(self, message_queue_client: MessageQueueClient): """Set message queue client""" self.message_queue_client = message_queue_client @@ -192,7 +225,7 @@ def _get_samples_from_queue(self) -> tuple[None, None] | tuple[int, Any]: def _create_actor_rollout_classes(self): # create actor - for role in [Role.Actor]: + for role in [self.train_role]: resource_pool = self.resource_pool_manager.get_resource_pool(role) role_cls = RayClassWithInitArgs( cls=self.role_worker_mapping[role], @@ -214,14 +247,41 @@ def _init_models(self): self.rm_wg = self.all_wg[str(Role.RewardModel)] self.rm_wg.init_model() - self.actor_wg = self.all_wg[str(Role.Actor)] + self.actor_wg = self.all_wg[str(self.train_role)] self.actor_wg.init_model() self.actor_rollout_wg = self.actor_wg # to be compatible with the functions that not be modified - def _init_async_rollout_manager(self): - pass + async def init_workers(self): + """Initialize distributed training workers using Ray backend. + Creates: + 1. Ray resource pools from configuration + 2. Worker groups for each role (actor, critic, etc.) 
+ """ + # self._init_async_objects() + self._init_resource_pools() + self._create_worker_classes() + self._init_worker_groups() + self._init_models() + await self._init_async_rollout_manager() + + async def _init_async_rollout_manager(self): + # use async rollout do validate + if self.config.async_training.use_trainer_do_validate: + print(f"[FullyAsyncTrainer] use_trainer_do_validate: {self.config.async_training.use_trainer_do_validate}") + assert self.config.actor_rollout_ref.rollout.mode == "async" + self.async_rollout_mode = True + print("[FullyAsyncTrainer] Init async rollout manager") + from recipe.fully_async_policy.agent_loop import FullyAsyncAgentLoopManager + + self.async_rollout_manager = await FullyAsyncAgentLoopManager.create( + config=self.config, worker_group=self.actor_rollout_wg + ) + print("[FullyAsyncTrainer] async_rollout_manager sleep") + await self.async_rollout_manager.sleep() + else: + print("[FullyAsyncTrainer] Skip async rollout manager (use_trainer_do_validate=False)") - def fit(self): + async def fit(self): """ The training loop of PPO. The driver process only need to call the compute functions of the worker group through RPC @@ -277,7 +337,7 @@ def fit(self): f"trigger_parameter_sync_step: {self.trigger_parameter_sync_step} " f"{time_str}" ) - self._trigger_parameter_sync_after_step(global_steps=self.global_steps) + await self._trigger_parameter_sync_after_step(global_steps=self.global_steps) self._log_validation_data() self._check_save_checkpoint(timing_raw) self.global_steps += 1 @@ -288,7 +348,7 @@ def fit(self): self._log_validation_data() # 2. perform addtional parameter_sync and validate if trainer already updated if self.current_param_version % self.config.rollout.test_freq != 0 or self.local_trigger_step > 1: - self._trigger_parameter_sync_after_step(validate=True, global_steps=self.global_steps) + await self._trigger_parameter_sync_after_step(validate=True, global_steps=self.global_steps) ray.get(self.param_synchronizer.wait_last_valid.remote()) self._log_validation_data() self.progress_bar.close() @@ -459,7 +519,7 @@ def _collect_metrics_from_samples(self, batch, metrics): if key.startswith("fully_async") or key.startswith("timing_s"): metrics[key] = value - def _trigger_parameter_sync_after_step(self, validate: bool = False, global_steps: int = None): + async def _trigger_parameter_sync_after_step(self, validate: bool = False, global_steps: int = None): """ Trigger parameter synchronization after training step This ensures rollouter always uses the latest trained parameters @@ -482,9 +542,25 @@ def _trigger_parameter_sync_after_step(self, validate: bool = False, global_step with marked_timer("timing_s/param_sync", timing_param_sync): ray.get( self.param_synchronizer.sync_weights.remote( - self.current_param_version, validate=validate, global_steps=global_steps + self.current_param_version, + validate=validate, + global_steps=global_steps, + use_trainer_do_validate=self.config.async_training.use_trainer_do_validate, ) ) + + # do trainer validate + do_validate_param = ( + self.config.rollout.test_freq > 0 + and self.current_param_version % self.config.rollout.test_freq == 0 + and self.current_param_version > 0 + ) + print(f"do_validate_param: {do_validate_param}") + if do_validate_param and self.reward_fn is not None and self.config.async_training.use_trainer_do_validate: + print(f"[FullyAsyncTrainer] validate param version: {self.current_param_version}") + await self._validate_process() + else: + self.train_val_metrics = None 
self.logger.log(data=timing_param_sync, step=self.current_param_version) def _log_validation_data(self): @@ -496,10 +572,23 @@ def _log_validation_data(self): return val_metrics: ValidateMetrics = ray.cloudpickle.loads(val_data) - if val_metrics.metrics: - self.logger.log(data=val_metrics.metrics, step=val_metrics.param_version) - pprint( - f"[FullyAsyncTrainer] parameter version: {val_metrics.param_version} " - f"Validation metrics: {val_metrics.metrics}" - ) + if self.train_val_metrics and self.config.async_training.use_trainer_do_validate: + # merge info + timing_param_sync = {} + with marked_timer("timing_s/merge_val", timing_param_sync): + new_metrics = self._merge_validation_results(self.train_val_metrics, val_metrics.metrics) + if new_metrics: + self.logger.log(data=new_metrics, step=val_metrics.param_version) + pprint( + f"[FullyAsyncTrainer] parameter version: {val_metrics.param_version} " + f"Validation metrics: {new_metrics}, timing_param_sync: {timing_param_sync['timing_s/merge_val']}" + ) + self.logger.log(data=val_metrics.timing_raw, step=val_metrics.param_version) + else: + if val_metrics.metrics: + self.logger.log(data=val_metrics.metrics, step=val_metrics.param_version) + pprint( + f"[FullyAsyncTrainer] parameter version: {val_metrics.param_version} " + f"Validation metrics: {val_metrics.metrics}" + ) self.logger.log(data=val_metrics.timing_raw, step=val_metrics.param_version) diff --git a/verl/experimental/fully_async_policy/param_sync.py b/verl/experimental/fully_async_policy/param_sync.py index a31ee31212c..b795000be04 100644 --- a/verl/experimental/fully_async_policy/param_sync.py +++ b/verl/experimental/fully_async_policy/param_sync.py @@ -45,6 +45,7 @@ def __init__(self, config, trainer, rollouter, mq): self.sync_group_name = "actor_rollout" self.wait_last_update = None self.wait_last_resume = None + self.validate_task = None # Statistics self.current_version = 0 @@ -94,7 +95,7 @@ def _init_actor_rollout_checkpoint_engine(self): ) ) - def sync_weights(self, version, validate=False, global_steps=0): + def sync_weights(self, version, validate=False, global_steps=0, use_trainer_do_validate=False): """Sync weights between trainer and rollouter, and update parameter version""" start_time = time.time() @@ -119,8 +120,18 @@ def sync_weights(self, version, validate=False, global_steps=0): f"[ParameterSynchronizer] sync_weights success. 
cost {end_time - start_time:.2f} seconds, " f"pause:{pause_time - start_time:.2f}s, sync:{end_time - pause_time:.2f}s" ) + + # async train do validate + print(f"[ParameterSynchronizer] validate: {validate}, use_trainer_do_validate: {use_trainer_do_validate}") + if validate and use_trainer_do_validate: + print("[ParameterSynchronizer] use trainer to do validate") + self.validate_task = self.trainer._validate_process.remote() + else: + self.validate_task = None # Async Update rollout version & validation - self.wait_last_update = self.rollouter.update_param_version.remote(version, validate, global_steps) + self.wait_last_update = self.rollouter.update_param_version.remote( + version, validate, global_steps, use_trainer_do_validate + ) self.wait_last_resume = self.rollouter.resume.remote(self.wait_last_update) def wait_last_valid(self): @@ -130,6 +141,8 @@ def wait_last_valid(self): ray.get(self.wait_last_update) if self.wait_last_resume: ray.get(self.wait_last_resume) + if self.validate_task: + ray.get(self.validate_task) print(f"[ParameterSynchronizer] Wait last validate cost: {time.time() - start_time:.2f} seconds") def rollouter_save_checkpoint(self, local_global_step_folder: str): diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py index 4ee4511d77e..d2497aba3b9 100644 --- a/verl/trainer/ppo/ray_trainer.py +++ b/verl/trainer/ppo/ray_trainer.py @@ -605,7 +605,7 @@ def _get_gen_batch(self, batch: DataProto) -> DataProto: return gen_batch - def _validate(self): + def _validate(self, merged: bool = False): data_source_lst = [] reward_extra_infos_dict: dict[str, list] = defaultdict(list) @@ -721,8 +721,18 @@ def _validate(self): for key_info, lst in reward_extra_infos_dict.items(): assert len(lst) == 0 or len(lst) == len(sample_scores), f"{key_info}: {len(lst)=}, {len(sample_scores)=}" + if merged: + print("_merge_validation_results validate result will be merged") + return { + "data_sources": data_source_lst, + "sample_uids": sample_uids, + "sample_turns": sample_turns, + "reward_extra_infos_dict": reward_extra_infos_dict, + } data_sources = np.concatenate(data_source_lst, axis=0) + return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) + def _val_metrics_update(self, data_sources, sample_uids, reward_extra_infos_dict, sample_turns): data_src2var2metric2val = process_validation_metrics(data_sources, sample_uids, reward_extra_infos_dict) metric_dict = {} for data_source, var2metric2val in data_src2var2metric2val.items(): @@ -749,6 +759,20 @@ def _validate(self): return metric_dict + def _merge_validation_results(self, result_a, result_b): + if result_a is None and result_b is None: + raise NotImplementedError("result_a and result_b are None, this is not expected") + data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0) + sample_uids = result_a["sample_uids"] + result_b["sample_uids"] + sample_turns = result_a["sample_turns"] + result_b["sample_turns"] + reward_extra_infos_dict = {} + for key in result_a["reward_extra_infos_dict"].keys(): + reward_extra_infos_dict[key] = ( + result_a["reward_extra_infos_dict"][key] + result_b["reward_extra_infos_dict"][key] + ) + + return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) + def init_workers(self): """Initialize distributed training workers using Ray backend. 
diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index bd8ba3f64bb..8e03b546012 100644 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -418,3 +418,56 @@ def get_dataset_class(data_config: DictConfig): print(f"Using dataset class: {dataset_cls.__name__}") return dataset_cls + return self.__dict__.copy() + + def split(self, num_splits: int): + """ + split the dataset into num_splits sub-datasets + Args: + num_splits: specified number of splits + Returns: + List[RLHFDataset]: list of RLHFDataset splits + Raises: + ValueError: if num_splits is not a positive integer + """ + if not isinstance(num_splits, int) or num_splits <= 0: + raise ValueError(f"num_splits must be a positive integer, got {num_splits}") + + if not hasattr(self, "dataframe"): + raise AttributeError( + "dataframe not found in RLHFDataset\n" + "reason: _read_files_and_tokenize() not called or Parquet file loading failed" + ) + if self.dataframe is None: + raise ValueError("RLHFDataset dataframe 为 None!") + + total_samples = len(self.dataframe) + print(f"total_samples: {total_samples}") + if total_samples == 0 or total_samples % num_splits != 0: + raise ValueError( + f"Cannot split empty dataset or dataset size {total_samples} is not divisible by num_splits {num_splits}" + ) + + split_size = total_samples // num_splits + splits = [] + + for i in range(num_splits): + start_idx = i * split_size + end_idx = (i + 1) * split_size if i < num_splits - 1 else total_samples + + split_dataframe = self.dataframe.select(range(start_idx, end_idx)) + + split_dataset = RLHFDataset( + data_files=self.data_files, + tokenizer=self.tokenizer, + config=self.config, + processor=self.processor, + max_samples=self.max_samples, + ) + split_dataset.dataframe = split_dataframe + split_dataset.serialize_dataset = self.serialize_dataset + split_dataset.original_data_files = self.original_data_files + + splits.append(split_dataset) + + return splits diff --git a/verl/workers/rollout/vllm_rollout/vllm_async_server.py b/verl/workers/rollout/vllm_rollout/vllm_async_server.py index 3285e61038d..3ff6a41c38f 100644 --- a/verl/workers/rollout/vllm_rollout/vllm_async_server.py +++ b/verl/workers/rollout/vllm_rollout/vllm_async_server.py @@ -734,6 +734,7 @@ async def launch_servers(self): if not self.is_reward_model else f"vllm_server_reward_{self.replica_rank}_{node_rank}" ) + name = name + f"_{os.getpid()}" server = self.server_class.options( scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, From 9335b79abf64981aaffbe98041d83055064c721e Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Fri, 26 Dec 2025 20:39:06 +0800 Subject: [PATCH 02/11] Refactor usability tests. 
Co-authored-by: Shangwei-Li --- verl/utils/dataset/rl_dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index 8e03b546012..477d37ba9c0 100644 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -445,7 +445,8 @@ def split(self, num_splits: int): print(f"total_samples: {total_samples}") if total_samples == 0 or total_samples % num_splits != 0: raise ValueError( - f"Cannot split empty dataset or dataset size {total_samples} is not divisible by num_splits {num_splits}" + f"Cannot split empty dataset or dataset size {total_samples}" + f" is not divisible by num_splits {num_splits}" ) split_size = total_samples // num_splits From 59b9a17e40f132fcde8e1fa31a35e2bc3dea7bf4 Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Fri, 26 Dec 2025 20:57:49 +0800 Subject: [PATCH 03/11] fix: fix format according to suggestion --- verl/experimental/agent_loop/agent_loop.py | 2 +- verl/trainer/ppo/ray_trainer.py | 35 ++++++++++++------- verl/utils/dataset/rl_dataset.py | 8 ++--- .../rollout/vllm_rollout/vllm_async_server.py | 2 +- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py index 09c47f7f888..75ffcffa626 100644 --- a/verl/experimental/agent_loop/agent_loop.py +++ b/verl/experimental/agent_loop/agent_loop.py @@ -923,7 +923,7 @@ def _init_agent_loop_workers(self): node_id = node_ids[i % len(node_ids)] self.agent_loop_workers.append( self.agent_loop_workers_class.options( - name=f"agent_loop_worker_{i}" + f"_{os.getpid()}", + name=f"agent_loop_worker_{i}" + f"_{uuid.uuid4().hex[:8]}", scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, soft=True ), diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py index d2497aba3b9..2768e287466 100644 --- a/verl/trainer/ppo/ray_trainer.py +++ b/verl/trainer/ppo/ray_trainer.py @@ -760,19 +760,28 @@ def _val_metrics_update(self, data_sources, sample_uids, reward_extra_infos_dict return metric_dict def _merge_validation_results(self, result_a, result_b): - if result_a is None and result_b is None: - raise NotImplementedError("result_a and result_b are None, this is not expected") - data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0) - sample_uids = result_a["sample_uids"] + result_b["sample_uids"] - sample_turns = result_a["sample_turns"] + result_b["sample_turns"] - reward_extra_infos_dict = {} - for key in result_a["reward_extra_infos_dict"].keys(): - reward_extra_infos_dict[key] = ( - result_a["reward_extra_infos_dict"][key] + result_b["reward_extra_infos_dict"][key] - ) - - return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) - + if result_a is None and result_b is None: + return {} + if result_a is None: + result_a = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}} + if result_b is None: + result_b = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}} + + if not result_a.get("data_sources") and not result_b.get("data_sources"): + return {} + + data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0) + sample_uids = result_a["sample_uids"] + result_b["sample_uids"] + sample_turns = result_a["sample_turns"] + result_b["sample_turns"] + + reward_extra_infos_dict = {} + all_keys = 
set(result_a["reward_extra_infos_dict"].keys()) | set(result_b["reward_extra_infos_dict"].keys()) + for key in all_keys: + list_a = result_a["reward_extra_infos_dict"].get(key, []) + list_b = result_b["reward_extra_infos_dict"].get(key, []) + reward_extra_infos_dict[key] = list_a + list_b + + return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) def init_workers(self): """Initialize distributed training workers using Ray backend. diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index 477d37ba9c0..4c3707260af 100644 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -443,12 +443,12 @@ def split(self, num_splits: int): total_samples = len(self.dataframe) print(f"total_samples: {total_samples}") - if total_samples == 0 or total_samples % num_splits != 0: + if total_samples == 0: + raise ValueError("Cannot split an empty dataset") + if total_samples % num_splits != 0: raise ValueError( - f"Cannot split empty dataset or dataset size {total_samples}" - f" is not divisible by num_splits {num_splits}" + f"Cannot split dataset size {total_samples} into {num_splits} splits" ) - split_size = total_samples // num_splits splits = [] diff --git a/verl/workers/rollout/vllm_rollout/vllm_async_server.py b/verl/workers/rollout/vllm_rollout/vllm_async_server.py index 3ff6a41c38f..70928144018 100644 --- a/verl/workers/rollout/vllm_rollout/vllm_async_server.py +++ b/verl/workers/rollout/vllm_rollout/vllm_async_server.py @@ -734,7 +734,7 @@ async def launch_servers(self): if not self.is_reward_model else f"vllm_server_reward_{self.replica_rank}_{node_rank}" ) - name = name + f"_{os.getpid()}" + name = name + f"_{uuid.uuid4().hex[:8]}" server = self.server_class.options( scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, From 564588211c4d8a1cf0cc9a89b03c5099603717b7 Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Fri, 26 Dec 2025 21:10:52 +0800 Subject: [PATCH 04/11] fix: fix format --- verl/trainer/ppo/ray_trainer.py | 44 ++++++++++++++++----------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py index 2768e287466..c14761e838e 100644 --- a/verl/trainer/ppo/ray_trainer.py +++ b/verl/trainer/ppo/ray_trainer.py @@ -760,28 +760,28 @@ def _val_metrics_update(self, data_sources, sample_uids, reward_extra_infos_dict return metric_dict def _merge_validation_results(self, result_a, result_b): - if result_a is None and result_b is None: - return {} - if result_a is None: - result_a = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}} - if result_b is None: - result_b = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}} - - if not result_a.get("data_sources") and not result_b.get("data_sources"): - return {} - - data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0) - sample_uids = result_a["sample_uids"] + result_b["sample_uids"] - sample_turns = result_a["sample_turns"] + result_b["sample_turns"] - - reward_extra_infos_dict = {} - all_keys = set(result_a["reward_extra_infos_dict"].keys()) | set(result_b["reward_extra_infos_dict"].keys()) - for key in all_keys: - list_a = result_a["reward_extra_infos_dict"].get(key, []) - list_b = result_b["reward_extra_infos_dict"].get(key, []) - reward_extra_infos_dict[key] = list_a + list_b - - return 
self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) + if result_a is None and result_b is None: + return {} + if result_a is None: + result_a = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}} + if result_b is None: + result_b = {"data_sources": [], "sample_uids": [], "sample_turns": [], "reward_extra_infos_dict": {}} + + if not result_a.get("data_sources") and not result_b.get("data_sources"): + return {} + + data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0) + sample_uids = result_a["sample_uids"] + result_b["sample_uids"] + sample_turns = result_a["sample_turns"] + result_b["sample_turns"] + + reward_extra_infos_dict = {} + all_keys = set(result_a["reward_extra_infos_dict"].keys()) | set(result_b["reward_extra_infos_dict"].keys()) + for key in all_keys: + list_a = result_a["reward_extra_infos_dict"].get(key, []) + list_b = result_b["reward_extra_infos_dict"].get(key, []) + reward_extra_infos_dict[key] = list_a + list_b + + return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) def init_workers(self): """Initialize distributed training workers using Ray backend. From 8e4b7aa919ab5e6d1ce67b325c98c5528c3d9ea4 Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Fri, 26 Dec 2025 22:29:06 +0800 Subject: [PATCH 05/11] fix :fix uuid not define --- verl/experimental/agent_loop/agent_loop.py | 2 +- verl/workers/rollout/vllm_rollout/vllm_async_server.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/verl/experimental/agent_loop/agent_loop.py b/verl/experimental/agent_loop/agent_loop.py index 75ffcffa626..2ca64e64e71 100644 --- a/verl/experimental/agent_loop/agent_loop.py +++ b/verl/experimental/agent_loop/agent_loop.py @@ -923,7 +923,7 @@ def _init_agent_loop_workers(self): node_id = node_ids[i % len(node_ids)] self.agent_loop_workers.append( self.agent_loop_workers_class.options( - name=f"agent_loop_worker_{i}" + f"_{uuid.uuid4().hex[:8]}", + name=f"agent_loop_worker_{i}" + f"_{uuid4().hex[:8]}", scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, soft=True ), diff --git a/verl/workers/rollout/vllm_rollout/vllm_async_server.py b/verl/workers/rollout/vllm_rollout/vllm_async_server.py index 70928144018..317b7ccda30 100644 --- a/verl/workers/rollout/vllm_rollout/vllm_async_server.py +++ b/verl/workers/rollout/vllm_rollout/vllm_async_server.py @@ -17,6 +17,7 @@ import json import logging import os +import uuid from concurrent.futures import Future from pprint import pprint from typing import Any, Callable, Optional From 759fd05b53d8c869999bc9a06e69889f7e55ece3 Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Fri, 26 Dec 2025 23:36:55 +0800 Subject: [PATCH 06/11] fix: fix uuid4 not def --- .../fully_async_policy/fully_async_trainer.py | 14 ++++++++++++++ .../rollout/vllm_rollout/vllm_async_server.py | 4 ++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/verl/experimental/fully_async_policy/fully_async_trainer.py b/verl/experimental/fully_async_policy/fully_async_trainer.py index 03c116b5f32..b9cfe6f416d 100644 --- a/verl/experimental/fully_async_policy/fully_async_trainer.py +++ b/verl/experimental/fully_async_policy/fully_async_trainer.py @@ -592,3 +592,17 @@ def _log_validation_data(self): f"Validation metrics: {val_metrics.metrics}" ) self.logger.log(data=val_metrics.timing_raw, step=val_metrics.param_version) + + async def _validate_process(self): + if 
self.config.async_training.use_trainer_do_validate: + print(f"[FullyAsyncTrainer] _validate_process") + from verl.utils.profiler import marked_timer + timing_raw = {} + await self.async_rollout_manager.wake_up() + with marked_timer("trainer/validate_time", timing_raw): + self.train_val_metrics = self._validate(True) + await self.async_rollout_manager.sleep() + print(f"[FullyAsyncTrainer] validate timing_raw validate: {timing_raw['trainer/validate_time']}") + else: + self.train_val_metrics = None + print(f"[FullyAsyncTrainer] _validate_process without async_rollout_manager") diff --git a/verl/workers/rollout/vllm_rollout/vllm_async_server.py b/verl/workers/rollout/vllm_rollout/vllm_async_server.py index 317b7ccda30..058191a855a 100644 --- a/verl/workers/rollout/vllm_rollout/vllm_async_server.py +++ b/verl/workers/rollout/vllm_rollout/vllm_async_server.py @@ -17,7 +17,7 @@ import json import logging import os -import uuid +from uuid import uuid4 from concurrent.futures import Future from pprint import pprint from typing import Any, Callable, Optional @@ -735,7 +735,7 @@ async def launch_servers(self): if not self.is_reward_model else f"vllm_server_reward_{self.replica_rank}_{node_rank}" ) - name = name + f"_{uuid.uuid4().hex[:8]}" + name = name + f"_{uuid4().hex[:8]}" server = self.server_class.options( scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy( node_id=node_id, From d5587a97f4dbee503ad71b62ce2dceddc97157ab Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Mon, 29 Dec 2025 20:28:14 +0800 Subject: [PATCH 07/11] feat: optimized bootstrap_metric compute --- .../fully_async_policy/fully_async_trainer.py | 5 +- verl/trainer/ppo/metric_utils.py | 110 +++++++++++++----- verl/trainer/ppo/ray_trainer.py | 3 +- verl/utils/dataset/rl_dataset.py | 4 +- .../rollout/vllm_rollout/vllm_async_server.py | 2 +- 5 files changed, 89 insertions(+), 35 deletions(-) diff --git a/verl/experimental/fully_async_policy/fully_async_trainer.py b/verl/experimental/fully_async_policy/fully_async_trainer.py index b9cfe6f416d..c7fd22a1b09 100644 --- a/verl/experimental/fully_async_policy/fully_async_trainer.py +++ b/verl/experimental/fully_async_policy/fully_async_trainer.py @@ -595,8 +595,9 @@ def _log_validation_data(self): async def _validate_process(self): if self.config.async_training.use_trainer_do_validate: - print(f"[FullyAsyncTrainer] _validate_process") + print("[FullyAsyncTrainer] _validate_process") from verl.utils.profiler import marked_timer + timing_raw = {} await self.async_rollout_manager.wake_up() with marked_timer("trainer/validate_time", timing_raw): @@ -605,4 +606,4 @@ async def _validate_process(self): print(f"[FullyAsyncTrainer] validate timing_raw validate: {timing_raw['trainer/validate_time']}") else: self.train_val_metrics = None - print(f"[FullyAsyncTrainer] _validate_process without async_rollout_manager") + print("[FullyAsyncTrainer] _validate_process without async_rollout_manager") diff --git a/verl/trainer/ppo/metric_utils.py b/verl/trainer/ppo/metric_utils.py index 876d6907de4..6ed921a9da0 100644 --- a/verl/trainer/ppo/metric_utils.py +++ b/verl/trainer/ppo/metric_utils.py @@ -333,14 +333,28 @@ def bootstrap_metric( [(3.0, 0.5), (4.5, 0.3)] # Example values """ np.random.seed(seed) + data_np = np.array(data, dtype=object) + n_data = len(data_np) - bootstrap_metric_lsts = [[] for _ in range(len(reduce_fns))] - for _ in range(n_bootstrap): - bootstrap_idxs = np.random.choice(len(data), size=subset_size, replace=True) - bootstrap_data = [data[i] for i in 
bootstrap_idxs] - for i, reduce_fn in enumerate(reduce_fns): - bootstrap_metric_lsts[i].append(reduce_fn(bootstrap_data)) - return [(np.mean(lst), np.std(lst)) for lst in bootstrap_metric_lsts] + # generate bootstrap indices, shape: (n_bootstrap, subset_size) + bootstrap_idxs = np.random.choice(n_data, size=(n_bootstrap, subset_size), replace=True) + + # pre-allocate result array, shape: (n_fns, n_bootstrap) + n_fns = len(reduce_fns) + metric_results = np.empty((n_fns, n_bootstrap), dtype=np.float64) + + # compute metric results for each bootstrap sample + for fn_idx, reduce_fn in enumerate(reduce_fns): + # bootstrap sample and compute metric + for boot_idx in range(n_bootstrap): + sample = data_np[bootstrap_idxs[boot_idx]] + metric_results[fn_idx, boot_idx] = reduce_fn(sample) + + # compute mean and std for each metric function + result = [ + (float(np.mean(metric_results[fn_idx])), float(np.std(metric_results[fn_idx]))) for fn_idx in range(n_fns) + ] + return result def calc_maj_val(data: list[dict[str, Any]], vote_key: str, val_key: str) -> float: @@ -431,47 +445,88 @@ def process_validation_metrics( for var_name, var_vals in infos_dict.items(): var2vals[var_name].append(var_vals[sample_idx]) - # Calculate metrics for each group - data_src2uid2var2metric = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) + np_mean = np.mean + np_std = np.std + reduce_fns_best_worst = [np.max, np.min] + n_bootstrap = 1000 + + # 2. cache ns list + def gen_ns(n_resps: int) -> list[int]: + if n_resps <= 1: + return [] + ns = [] + n = 2 + while n < n_resps: + ns.append(n) + n *= 2 + ns.append(n_resps) + return ns + + ns_cache = {} + + # 3. cache metric results + data_src2uid2var2metric = {} + + # 4. flatten loop for data_source, uid2var2vals in data_src2uid2var2vals.items(): + # create uid dict + uid_dict = data_src2uid2var2metric.setdefault(data_source, {}) + for uid, var2vals in uid2var2vals.items(): + pred_vals = var2vals.get("pred") + has_pred = pred_vals is not None + var_dict = uid_dict.setdefault(uid, {}) + for var_name, var_vals in var2vals.items(): - if isinstance(var_vals[0], str): + # skip empty or string values + if not var_vals or isinstance(var_vals[0], str): continue - metric = {} + # compute mean and std n_resps = len(var_vals) - metric[f"mean@{n_resps}"] = np.mean(var_vals) + metric = {f"mean@{n_resps}": float(np_mean(var_vals))} if n_resps > 1: - metric[f"std@{n_resps}"] = np.std(var_vals) + metric[f"std@{n_resps}"] = float(np_std(var_vals)) - ns = [] - n = 2 - while n < n_resps: - ns.append(n) - n *= 2 - ns.append(n_resps) + # cache ns list + if n_resps not in ns_cache: + ns_cache[n_resps] = gen_ns(n_resps) + ns = ns_cache[n_resps] + # compute best/worst metrics for n in ns: - [(bon_mean, bon_std), (won_mean, won_std)] = bootstrap_metric( - data=var_vals, subset_size=n, reduce_fns=[np.max, np.min], seed=seed + # compute best/worst metrics + (bon_mean, bon_std), (won_mean, won_std) = bootstrap_metric( + data=var_vals, + subset_size=n, + reduce_fns=reduce_fns_best_worst, + n_bootstrap=n_bootstrap, + seed=seed, ) - metric[f"best@{n}/mean"], metric[f"best@{n}/std"] = bon_mean, bon_std - metric[f"worst@{n}/mean"], metric[f"worst@{n}/std"] = won_mean, won_std - if var2vals.get("pred", None) is not None: + metric[f"best@{n}/mean"] = bon_mean + metric[f"best@{n}/std"] = bon_std + metric[f"worst@{n}/mean"] = won_mean + metric[f"worst@{n}/std"] = won_std + + # compute maj metrics + if has_pred: + # create vote_data vote_data = [ - {"val": val, "pred": pred} for val, pred in zip(var_vals, 
var2vals["pred"], strict=True) + {"val": val, "pred": pred} for val, pred in zip(var_vals, pred_vals, strict=True) ] + # compute maj metrics [(maj_n_mean, maj_n_std)] = bootstrap_metric( data=vote_data, subset_size=n, reduce_fns=[partial(calc_maj_val, vote_key="pred", val_key="val")], + n_bootstrap=n_bootstrap, seed=seed, ) - metric[f"maj@{n}/mean"], metric[f"maj@{n}/std"] = maj_n_mean, maj_n_std + metric[f"maj@{n}/mean"] = maj_n_mean + metric[f"maj@{n}/std"] = maj_n_std - data_src2uid2var2metric[data_source][uid][var_name] = metric + var_dict[var_name] = metric # Aggregate metrics across uids data_src2var2metric2uid_vals = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) @@ -486,5 +541,4 @@ def process_validation_metrics( for var_name, metric2uid_vals in var2metric2uid_vals.items(): for metric_name, uid_vals in metric2uid_vals.items(): data_src2var2metric2val[data_source][var_name][metric_name] = np.mean(uid_vals) - return data_src2var2metric2val diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py index c14761e838e..a29e0e4dc79 100644 --- a/verl/trainer/ppo/ray_trainer.py +++ b/verl/trainer/ppo/ray_trainer.py @@ -773,7 +773,7 @@ def _merge_validation_results(self, result_a, result_b): data_sources = np.concatenate(result_a["data_sources"] + result_b["data_sources"], axis=0) sample_uids = result_a["sample_uids"] + result_b["sample_uids"] sample_turns = result_a["sample_turns"] + result_b["sample_turns"] - + reward_extra_infos_dict = {} all_keys = set(result_a["reward_extra_infos_dict"].keys()) | set(result_b["reward_extra_infos_dict"].keys()) for key in all_keys: @@ -782,6 +782,7 @@ def _merge_validation_results(self, result_a, result_b): reward_extra_infos_dict[key] = list_a + list_b return self._val_metrics_update(data_sources, sample_uids, reward_extra_infos_dict, sample_turns) + def init_workers(self): """Initialize distributed training workers using Ray backend. 
diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index 4c3707260af..b333fe4ad7f 100644 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -446,9 +446,7 @@ def split(self, num_splits: int): if total_samples == 0: raise ValueError("Cannot split an empty dataset") if total_samples % num_splits != 0: - raise ValueError( - f"Cannot split dataset size {total_samples} into {num_splits} splits" - ) + raise ValueError(f"Cannot split dataset size {total_samples} into {num_splits} splits") split_size = total_samples // num_splits splits = [] diff --git a/verl/workers/rollout/vllm_rollout/vllm_async_server.py b/verl/workers/rollout/vllm_rollout/vllm_async_server.py index 058191a855a..af1725141fe 100644 --- a/verl/workers/rollout/vllm_rollout/vllm_async_server.py +++ b/verl/workers/rollout/vllm_rollout/vllm_async_server.py @@ -17,10 +17,10 @@ import json import logging import os -from uuid import uuid4 from concurrent.futures import Future from pprint import pprint from typing import Any, Callable, Optional +from uuid import uuid4 import cloudpickle as pickle import numpy as np From d27bf7550e370312f108a2de7336ddac6dd5cb22 Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Tue, 30 Dec 2025 18:11:58 +0800 Subject: [PATCH 08/11] fix: add some info in docs --- docs/advance/fully_async.md | 38 ++ recipe | 1 - recipe/fully_async_policy/README.md | 598 ++++++++++++++++++ recipe/fully_async_policy/README_zh.md | 518 +++++++++++++++ .../experimental/fully_async_policy/README.md | 40 ++ .../fully_async_policy/README_zh.md | 38 +- 6 files changed, 1231 insertions(+), 2 deletions(-) delete mode 160000 recipe create mode 100644 recipe/fully_async_policy/README.md create mode 100644 recipe/fully_async_policy/README_zh.md diff --git a/docs/advance/fully_async.md b/docs/advance/fully_async.md index 051e57586b8..eef92a0a89e 100644 --- a/docs/advance/fully_async.md +++ b/docs/advance/fully_async.md @@ -105,6 +105,7 @@ but the overlap in their time consumption reduces the end-to-end time consumptio | `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine for accelerating, default `True` | | `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When use checkpoint_engine, whether to overlap broadcast and load_weights, default `False` | | `async_training.checkpoint_engine.device_buffer_size_M` | When use checkpoint_engine, the user-specific bucket size (MB), default `4096` | +| `async_training.use_trainer_do_validate` | Whether use trainer node to do validate process, default `False`| **Further Explanation:** @@ -194,6 +195,13 @@ but the overlap in their time consumption reduces the end-to-end time consumptio - When disable `overlap_broadcast_and_consume`, the additional device memory overhead of trainer rank is `2 * bucket_size`and rollout rank is `1 * bucket_size`。 +* `async_training.use_trainer_do_validate` + + It controls whether to use the trainer's `do_validate` method for validation. + If set to True, the trainer will perform validation after each parameter update. It can reduce the validation time + overhead and trainer node idle time. + If set to False, the trainer will not perform validation. + ### Supported Modes 1. 
on policy pipeline: @@ -477,6 +485,36 @@ We tested the single-step parameter synchronization time of the checkpoint-engin | Qwen3-235B-A22B | 64 | 64 | False | 58.57s | | Qwen3-235B-A22B | 64 | 64 | True | 23.70s | +### use_trainer_do_validate Experiment + +We tested the effect of setting `use_trainer_do_validate=True` on the training process. The results show that setting +this parameter to True can reduce the validation time overhead and trainer node idle time. +We used Qwen2.5-Math-7B to verify the benefits of `use_trainer_do_validate=True` on the training process, we achieved about 2x performance improvement on validation time, and the trainer node idle time is reduced by about 40%. + +* Machine: H20 +* Model: Qwen2.5-Math-7B +* Rollout length: max_response_length FSDP2: 10K tokens; +* Algorithm: DAPO +* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* Engine: vllm+FSDP2 +* rollout.n: 16 +* ppo_mini_batch_size: 32 +* test_freq: 10 + +* fully_async_policy + * total_rollout_steps: 512*400 + * require_batches: 4 + * trigger_parameter_sync_step: 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time
50 step | acc/mean@2 | +|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| +| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | +| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | +| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | + + ## Multi-Turn Tool Calling Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn diff --git a/recipe b/recipe deleted file mode 160000 index 21892b92769..00000000000 --- a/recipe +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 21892b9276936efab5375c3f6b8415e472ef7118 diff --git a/recipe/fully_async_policy/README.md b/recipe/fully_async_policy/README.md new file mode 100644 index 00000000000..389668c2242 --- /dev/null +++ b/recipe/fully_async_policy/README.md @@ -0,0 +1,598 @@ +# Recipe: Fully Async Policy Trainer + +**Author:** `https://github.com/meituan-search` + +Last updated: 12/25/2025. + +This document introduces a fully asynchronous PPO training system that completely decouples the Trainer and Rollouter, +supporting asynchronous sample generation and training. +Under this system, we achieved a 2.35x-2.67x performance improvement when training the Qwen2.5-7B model with 128 GPUs, +without significantly affecting the results. + +## Introduction + +### Background + +The separated rollout and train architecture, compared to the colocate architecture, can allocate resources more +flexibly and design more flexible training logic, thereby addressing issues such as low GPU utilization and training +efficiency caused by long-tail problems. +The one_step_off_policy alleviates the problem of long rollout times and achieves some gains in training efficiency by +designing a separated architecture and performing asynchronous training between rollout and train for one round. +However, it forcibly uses data from one round of asynchronous training, which is not flexible enough and cannot +completely eliminate the impact of long-tail on training efficiency. +In other frameworks such as AReaL, Magistral, StreamRL, and AsyncFlow, asynchronous training and streaming training have +been implemented based on the separated architecture and have achieved gains. +We borrow from their methods and implemented them in VERL. The fully_async_policy supports asynchronous, streaming, and +partial +rollout training. +By reasonably setting parameters such as resource allocation and parameter synchronization frequency, fully_async_policy +can significantly improve training efficiency. + +> Magistral https://arxiv.org/abs/2506.10910 +> +> AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language +> Reasoning https://arxiv.org/abs/2505.24298 +> +> StreamRL: Scalable, Heterogeneous, and Elastic RL for LLMs with Disaggregated Stream +> Generation https://arxiv.org/abs/2504.15930 +> +> AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training https://arxiv.org/abs/2507.01663 +> + +### Core Contributions + +* **Resource Isolation**: Unlike using hybrid_engine, Rollouter and Trainer use separate computing resources and need to + specify the resources they occupy separately. +* **Parallel Generation and Training**: While the Trainer is training, the Rollouter is generating new samples. 
+* **Multi-step Asynchronous**: Compared to one step off policy, it supports asynchronous settings from 0.x steps to + multiple steps, making the asynchronous solution more flexible. +* **NCCL Parameter Synchronization**: Based on the nccl communication primitive, refer to [checkpoint-engine](https://github.com/MoonshotAI/checkpoint-engine) to + achieve efficient parameter synchronization between Rollouter and Trainer. +* **Stream Inference and Training**: Rollouter generates data sample by sample, and data transmission uses a single + sample as the minimum transmission unit. +* **Asynchronous Training and Freshness Control**: By setting the parameter async_training.staleness_threshold, it + supports training with samples generated by old parameters. +* **PartialRollout**: The Rollouter's inference process supports partial rollout logic. During parameter + synchronization, by adding `sleep() and resume()` logic, it + saves samples from ongoing rollouts and continues using them in the next rollout, reducing the time spent waiting for + ongoing tasks to finish during parameter synchronization. + +Currently, the supported usage mode is megatron/fsdp+vllm. vllm must use the server mode based on AgentLoop. + +## Design + +The overall architecture of fully_async_policy is shown in the figure below. fully_async_policy mainly consists of four +parts: Rollouter, MessageQueue, Trainer, and ParameterSynchronizer. + +![fully_async_policy_structure]( +https://github.com/ArronHZG/verl-community/blob/recipe/async_policy/docs/fully_async_policy_structure.svg?raw=true) + +1. Rollouter generates sequences sample by sample and puts the generated samples into the MessageQueue, with the + production speed controlled by freshness. +2. MessageQueue is used to temporarily store samples generated by Rollouter. +3. Trainer fetches samples from MessageQueue sample by sample. After fetching `require_batches*ppo_mini_batch_size` + samples, it will perform training. After training for async_training.trigger_parameter_sync_step rounds, it triggers + a parameter synchronization with Rollouter. +4. ParameterSynchronizer implements the NCCL synchronous parameter synchronization capability. + +The source of benefits compared to the base scheme lies in the fact that in the colocate case, using more resources for +rollout cannot solve the idleness caused by long-tail samples. +After we perform resource isolation, the time for rollout and train may be longer than before (because fewer resources +are used), +but the overlap in their time consumption reduces the end-to-end time consumption. 
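
To make the producer/consumer overlap described above concrete, here is a minimal, self-contained asyncio sketch. It is not the verl implementation: the function names (`rollouter`, `trainer`), the toy constants, and the use of a plain `asyncio.Queue` plus a semaphore as the staleness budget are all illustrative assumptions. It only shows the shape of the pipeline: the Rollouter streams samples one by one into a queue, the Trainer consumes `require_batches * ppo_mini_batch_size` samples per local update, triggers a parameter sync every `trigger_parameter_sync_step` local updates, and the staleness budget caps how far generation may run ahead of training.

```python
import asyncio
import random

# Toy constants, chosen only for illustration.
PPO_MINI_BATCH_SIZE = 4
REQUIRE_BATCHES = 1
TRIGGER_PARAMETER_SYNC_STEP = 2
STALENESS_THRESHOLD = 0.5
TOTAL_ROLLOUT_STEPS = 32

# Samples consumed between two parameter synchronizations.
SAMPLES_PER_SYNC = TRIGGER_PARAMETER_SYNC_STEP * REQUIRE_BATCHES * PPO_MINI_BATCH_SIZE


async def rollouter(queue: asyncio.Queue, budget: asyncio.Semaphore):
    """Streams samples one by one; the semaphore caps how far it may run ahead."""
    for sample_id in range(TOTAL_ROLLOUT_STEPS):
        await budget.acquire()                           # blocks once the staleness budget is used up
        await asyncio.sleep(random.uniform(0.01, 0.05))  # stand-in for generation latency
        await queue.put({"id": sample_id})
    await queue.put(None)                                # sentinel: no more samples


async def trainer(queue: asyncio.Queue, budget: asyncio.Semaphore):
    """Consumes mini-batches and periodically 'synchronizes' parameters."""
    local_step, batch = 0, []
    while True:
        sample = await queue.get()
        if sample is None:
            break
        batch.append(sample)
        if len(batch) == REQUIRE_BATCHES * PPO_MINI_BATCH_SIZE:
            await asyncio.sleep(0.05)                    # stand-in for one local update
            local_step += 1
            batch = []
            if local_step % TRIGGER_PARAMETER_SYNC_STEP == 0:
                print(f"parameter sync after local step {local_step}")
                # Refill the producer's budget for the next window of samples.
                for _ in range(SAMPLES_PER_SYNC):
                    budget.release()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    # Initial budget: one sync window plus the staleness allowance.
    initial_budget = int((1 + STALENESS_THRESHOLD) * SAMPLES_PER_SYNC)
    budget = asyncio.Semaphore(initial_budget)
    await asyncio.gather(rollouter(queue, budget), trainer(queue, budget))


if __name__ == "__main__":
    asyncio.run(main())
```

In the real system the queue corresponds to the MessageQueue actor and the sync point to the NCCL-based ParameterSynchronizer; the sketch only shows how the two loops overlap in time. The figure below illustrates the resulting timeline.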
+ +![fully_async_policy_revenue]( +https://github.com/ArronHZG/verl-community/blob/recipe/async_policy/docs/fully_async_policy_revenue.svg?raw=true) + +## Usage + +### Parameter Description + +| super params | implication | +|-----------------------------------------------|------------------------------------------------------------------------------------------------| +| `trainer.nnodes` | Number of nodes for Trainer | +| `trainer.n_gpus_per_node` | Number of GPUs per node for Trainer | +| `rollout.nnodes` | Number of nodes for Rollouter | +| `rollout.n_gpus_per_node` | Number of GPUs per node for Rollouter | +| `data.train_batch_size` | In the fully async strategy, this value is not effective (default is 0) | +| `data.gen_batch_size` | In the fully async strategy, uses streaming sample production logic (default is 1) | +| `rollout.total_rollout_steps` | Total number of rollout samples | +| `rollout.test_freq` | How many times Rollouter updates parameters before performing a validation | +| `actor_rollout_ref.actor.ppo_mini_batch_size` | The ppo_mini_batch_size is a global num across all workers/gpus | +| `async_training.require_batches` | Number of ppo_mini_batch_size that FullyAsyncTrainer fetches at once | +| `async_training.trigger_parameter_sync_step` | Indicates how many local updates FullyAsyncTrainer performs before a parameter synchronization | +| `async_training.staleness_threshold` | Freshness control | +| `async_training.partial_rollout` | Whether to perform partial_rollout | +| `async_training.use_rollout_log_probs` | Use log_probs generated by rollout | +| `async_training.compute_prox_log_prob` | Whether to compute log_prob using the training model's parameters during the training phase. | | +| `async_training.checkpoint_engine.enable`| Whether to use checkpoint_engine for accelerating, default `True`| +| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When use checkpoint_engine, whether to overlap broadcast and load_weights, default `False`| +| `async_training.checkpoint_engine.device_buffer_size_M` | When use checkpoint_engine, the user-specific bucket size (MB), default `4096`| +| `async_training.use_trainer_do_validate` | Whether use trainer node to do validate process, default `False`| + +**Further Explanation:** + +* `rollout.total_rollout_steps` + + Compared to colocate, the quantity can be aligned by multiplying train_batch_size and step: + `rollout.total_rollout_steps = data.train_batch_size * step`. + +* `async_training.trigger_parameter_sync_step` + + In the fully async strategy, it indicates how many local updates the Trainer performs (i.e., how many times it fetches + `require_batches * ppo_mini_batch_size` samples) before a parameter synchronization with Rollouter. + Between every two parameter synchronizations between Rollouter and Trainer, the Trainer will process + `trigger_parameter_sync_step* require_batches*ppo_mini_batch_size` samples. + To fairly compare speed with colocate, trigger_parameter_sync_step should be set to + `data.train_batch_size / (require_batches * ppo_mini_batch_size)`. + +* `async_training.staleness_threshold` + + In the fully async strategy, it indicates the maximum proportion of stale samples allowed to be used. + + * staleness_threshold=0, indicates synchronous training. 
+    The Rollouter generates a fixed number of samples between two parameter updates:
+    $$rollout\_num = (trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size)$$
+  * staleness_threshold>0 indicates asynchronous training; it can be set to a fractional value for more flexible
+    asynchrony.
+    The Rollouter generates at most the following number of samples between two parameter updates:
+    $$rollout\_num = (1+staleness\_threshold)*(trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size) - num\_staleness\_sample$$
+
+  num_staleness_sample is the number of stale samples generated in excess during the previous rollout round.
+
+  Since this is a streaming system, rollout keeps producing while the trainer keeps consuming. If the rollouter is
+  slower, the trainer triggers parameter synchronization earlier and the rollouter never actually produces
+  rollout_num samples.
+  When rollout is fast enough, setting staleness_threshold to 1 is roughly equivalent to the one-step-off policy.
+  To avoid too many stale samples hurting training accuracy, it is recommended to keep this value below 1.
+
+* `async_training.partial_rollout`
+
+  partial_rollout only takes effect when staleness_threshold>0.
+
+* `async_training.use_rollout_log_probs`
+
+  In reinforcement learning algorithms, log_probs are implicitly tied to the parameter version and the sampled tokens.
+  Because of how PPO/GRPO/DAPO compute importance sampling, old_log_prob must be the log_probs that correspond to the
+  rollout parameters and tokens to keep the algorithm correct. In the fully async strategy, we therefore default to
+  old_log_prob being computed by rollout rather than by the trainer.
+
+* `async_training.require_batches`
+
+  In streaming training, require_batches should be set to 1, meaning that training starts as soon as
+  ppo_mini_batch_size samples have been produced.
+  In practice, we found that issuing too few samples at once can, due to the order of data distribution, cause
+  training instability and longer responses.
+  We therefore expose require_batches to control how many mini-batches are distributed and trained on at once in the
+  streaming pipeline.
+
+* `async_training.compute_prox_log_prob` (experimental)
+
+  During training, we observed that metrics and response lengths may become unstable in the later stages. To mitigate
+  this, [Rollout Importance Sampling](https://verl.readthedocs.io/en/latest/advance/rollout_is.html) can be applied.
+  It requires computing log_prob with the training engine, which is what this switch enables.
+  Additionally, when compute_prox_log_prob and Rollout Importance Sampling are enabled under mode d
+  (async stream pipeline with partial rollout), our implementation approximates AReaL's `Decoupled PPO`.
+
+* `async_training.checkpoint_engine.enable`
+
+  Enabling the checkpoint engine generally reduces synchronization time by more than 60% compared to the original
+  per-tensor parameter synchronization. However, assembling buckets incurs additional temporary GPU memory overhead.
+
+* `async_training.checkpoint_engine.overlap_broadcast_and_consume`
+
+  Enabling the pipeline between parameter broadcast and load_weights allocates additional GPU memory.
+  Since the main cost of parameter synchronization is not the broadcast and load_weights phases but the parameter
+  generation phase (in Megatron or FSDP), this option is off by default.
+
+* `async_training.checkpoint_engine.device_buffer_size_M`
+
+  It controls the size of the device memory buffer used for synchronization when the checkpoint engine is enabled.
+  The actual `bucket_size` = `max(device_buffer_size_M, maximum parameter tensor size)`.
+  * When `overlap_broadcast_and_consume` is enabled, the additional device memory overhead is `3 * bucket_size` per
+    trainer rank and `2 * bucket_size` per rollout rank.
+  * When `overlap_broadcast_and_consume` is disabled, the additional device memory overhead is `2 * bucket_size` per
+    trainer rank and `1 * bucket_size` per rollout rank.
+
+* `async_training.use_trainer_do_validate`
+
+  It controls whether validation is run with the trainer's `do_validate` method.
+  If set to True, the trainer performs validation after each parameter update, which reduces the validation time
+  overhead and the trainer's idle time.
+  If set to False, the trainer does not perform validation.
+
+### Supported Modes
+
+1. on policy pipeline:
+   1. **trigger_parameter_sync_step=1, staleness_threshold=0**
+   2. The Rollouter produces `require_batches*ppo_mini_batch_size` samples at once, the Trainer fetches these samples
+      and trains on them, and after training completes, the Trainer and Rollouter perform a parameter synchronization;
+   3. During the rollout phase, if there are long-tail samples but few rollout samples overall, shorter samples cannot
+      fill the idle resources, causing some resource waste.
+   4. As shown in figure a;
+
+2. stream off policy pipeline:
+   1. **trigger_parameter_sync_step>1, staleness_threshold=0**
+   2. Synchronous streaming training is performed. The Rollouter produces
+      `require_batches*ppo_mini_batch_size*trigger_parameter_sync_step` samples at once, the Trainer performs a local
+      update every time it fetches `require_batches*ppo_mini_batch_size` samples, and after
+      trigger_parameter_sync_step such updates, the Trainer and Rollouter perform a parameter synchronization;
+   3. Compared to a, more samples are generated at once, so resource idleness is lower.
+   4. Within one training step there are two periods of resource idleness: while fetching the first batch, the trainer
+      waits for `require_batches*ppo_mini_batch_size` samples to be produced, and during the last parameter update,
+      rollout waits for training to complete.
+   5. As shown in figure b;
+
+3. async stream pipeline with stale samples:
+   1. **trigger_parameter_sync_step>=1, staleness_threshold>0, partial_rollout=False**
+   2. After each parameter update, the Rollouter plans to produce at most rollout_num samples (in practice, depending
+      on rollout speed, fewer samples may be generated).
+   3. If the rollout process is relatively fast, the Rollouter generates some additional samples (num_stale_samples)
+      before parameter synchronization, for immediate use by the Trainer after synchronization.
+      When parameter synchronization is triggered, if the Rollouter has ongoing tasks, it waits for them to complete
+      and does not add new tasks;
+   4. Compared to b, except for the first training step, subsequent steps no longer wait for the first batch of
+      rollouts to finish, but they do wait for active tasks to finish.
+   5. As shown in figure c;
+
+4. async stream pipeline with partial rollout:
+   1. 
**trigger_parameter_sync_step>=1, staleness_threshold>0, partial_rollout=True** + 2. Compared to c, when triggering parameter synchronization, if Rollouter has samples being produced, it will + interrupt the rollout process and perform parameter synchronization. The interrupted samples will continue to be + generated after synchronization. This reduces the time to wait for active tasks to finish. + 3. As shown in figure d; + +![fully_async_policy_mode]( +https://github.com/ArronHZG/verl-community/blob/recipe/async_policy/docs/fully_async_policy_mode.svg?raw=true) + +### Key Metrics + +| metrics | implication | +|------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| `trainer/idle_ratio` | Trainer idle rate | +| `rollouter/idle_ratio` | Rollouter idle rate | +| `fully_async/count/stale_samples_processed` | Total number of old samples used in training | +| `fully_async/count/stale_trajectory_processed` | Total number of old trajectories used in training (one sample produces rollout.n trajectories) | +| `fully_async/partial/total_partial_num` | Number of partial samples processed by Trainer between two trigger_parameter_sync_step | +| `fully_async/partial/partial_ratio` | Ratio of partial samples processed by Trainer between two trigger_parameter_sync_step | +| `fully_async/partial/max_partial_span` | Maximum parameter span of partial samples processed by Trainer between two trigger_parameter_sync_step | + +### Parameter Tuning Recommendations + +* Resource Allocation and Adjustment: + * Reasonable resource allocation is the prerequisite for achieving good training efficiency. The ideal resource + allocation should make the rollout time and train time close, thereby minimizing pipeline bubbles in the entire + training process, + avoiding resource idleness, and ensuring Trainer does not use old samples. In real training scenarios, resource + allocation can be adjusted based on the idle time of rollout and train during actual training, + which can be obtained from rollouter/idle_ratio and trainer/idle_ratio. If rollouter/idle_ratio is high and + trainer/idle_ratio is low, + Trainer resources should be increased and Rollouter resources should be reduced, and vice versa. + +* Key Parameters: + * staleness_threshold: Setting it too high will cause more old samples to be used, affecting model performance. It + is recommended to set it to less than 1. + * require_batches: The closer to 1, the closer to a pure streaming process, the smaller the training bubbles, and + the faster the acceleration effect that can be achieved in terms of speed, but it will affect the order of sample + processing; + * trigger_parameter_sync_step: The smaller the setting, the closer to on policy, but it will cause frequent + parameter synchronization. Long-tail samples waste resources that cannot be filled by short samples, resulting in + low resource utilization. + The larger the setting, the higher the computational efficiency, but the accuracy will be affected by off policy. + * rollout.test_freq: It will occupy Rollouter resources and is not recommended to be set too small. + +* Mode Selection: By adjusting different parameters, the Fully Async architecture supports optimization acceleration at + different levels, suitable for tasks in different scenarios. 
+ * For small-scale tasks that need to ensure training stability and on-policy nature, and have low speed + requirements, the on policy pipeline mode (Mode 1) can be tried. + * For scenarios that need to improve training throughput but are sensitive to staleness, the stream off policy + pipeline mode can be tried. That is, by + setting trigger_parameter_sync_step>1 to improve training efficiency, but still maintaining the synchronization + mechanism (staleness_threshold=0) (Mode 2). + * For large-scale tasks with high training speed requirements and can tolerate a certain degree of off-policy and + staleness, setting staleness_threshold> + 0 and partial_rollout=True can improve training efficiency, using the async stream pipeline mode (Mode 3 or 4). + +### Quick Start + +```shell +rollout_mode="async" +rollout_name="vllm" # sglang or vllm +if [ "$rollout_mode" = "async" ]; then + export VLLM_USE_V1=1 + return_raw_chat="True" +fi + +train_prompt_bsz=0 +gen_prompt_bsz=1 +n_resp_per_prompt=16 +train_prompt_mini_bsz=32 +total_rollout_steps=$(((512*400))) +test_freq=10 +staleness_threshold=0 +trigger_parameter_sync_step=16 +partial_rollout=False + + +python -m recipe.fully_async_policy.fully_async_main \ + train_batch_size=${train_prompt_bsz} \ + data.gen_batch_size=${gen_prompt_bsz} \ + data.return_raw_chat=${return_raw_chat} \ + actor_rollout_ref.rollout.n=${n_resp_per_prompt} \ + actor_rollout_ref.actor.strategy=fsdp2 \ + critic.strategy=fsdp2 \ + actor_rollout_ref.hybrid_engine=False \ + actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.rollout.name=${rollout_name} \ + actor_rollout_ref.rollout.mode=${rollout_mode} \ + actor_rollout_ref.rollout.calculate_log_probs=True \ + trainer.nnodes="${NNODES_TRAIN}" \ + trainer.n_gpus_per_node="${NGPUS_PER_NODE}" \ + rollout.nnodes="${NNODES_ROLLOUT}" \ + rollout.n_gpus_per_node="${NGPUS_PER_NODE}" \ + rollout.total_rollout_steps="${total_rollout_steps}" \ + rollout.test_freq="${test_freq}" \ + async_training.staleness_threshold="${staleness_threshold}" \ + async_training.trigger_parameter_sync_step="${trigger_parameter_sync_step}" \ + async_training.partial_rollout="${partial_rollout}" +``` + +## Experiments + +### Asynchronous Training on 7B Model + +We used Qwen2.5-Math-7B to verify the benefits of the fully async strategy under long candidates and multiple resources. +Using the `async stream pipeline with stale samples` strategy, we achieved about 2x performance improvement on 32 cards, +64 cards, and 128 cards without significantly affecting experimental results. + +* Machine: H20 +* Model: Qwen2.5-Math-7B +* Rollout length: max_response_length FSDP2: 28K tokens; +* Algorithm: DAPO +* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* Engine: vllm+FSDP2 +* rollout.n: 16 +* ppo_mini_batch_size: 32 +* test_freq: 20 + +* colocate sync: + * step: 400 + * train_batch_size: 512 + +* fully_async_policy + * total_rollout_steps: 512*400 + * require_batches: 4 + * trigger_parameter_sync_step: 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | total time
400 step | acc/mean@1 | +|:--------------------:|:---------------------:|:--------:|:--------:|:--------------:|:---------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-------------------------------:| +| colocate sync | 32 | 790.10 | 357.41 | 107.71 | 269.80 | 13h 44m | 1d 3h 43m | 2d 9h 22m | 3d 17h 5m | max: 0.3313
last: 0.2448 | +| fully_async_policy | 16:16 | 294.77 | 21.26 | \ | 313.81 | 7h 58m
(1.72x) | 16h 21m
(1.70x) | 1d 0h 53m
(2.31x) | 1d 9h 26m
(2.66x) | max: 0.3302
last: 0.2333 | +| colocate sync | 64 | 365.28 | 150.72 | 70.26 | 133.41 | 10h 22m | 20h 45m | 1d 7h 6m | 1d 17h 32m | max: 0.3365
last: 0.2333 | +| fully_async_policy | 32:32 | 189.26 | 28.46 | \ | 156.98 | 4h 57m
(2.09x) | 10h 14m
(2.03x) | 16h 58m
(1.83x) | 21h 40m
(1.92x) | max: 0.3677
last: 0.3406 | +| colocate sync | 128 | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573
last: 0.2958 | +| fully_async_policy | 64:64 | 150.63 | 33.14 | \ | 113.16 | 3h 13m
(2.67x) | 6h 46m
(2.65x) | 10h 53m
(2.67x) | 17h 22m
(2.35x) | max: 0.3521
last: 0.3094 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-colocate_async?nw=nwuserhouzg + +### 128-card 7B Asynchronous Mode Experiment + +We used Qwen2.5-Math-7B to verify the effects of various modes supported by fully async. +We can see that the benefit brought by streaming is approximately 1.6x, and after combining staleness and +partial_rollout, the benefit reaches 2.35x. + +| mode | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | total time
400 step | acc/mean@1 | +|:-------------------------------------------------------------------------------------------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------------:| +| colocate sync | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573
last: 0.2958 | +| `stream off policy pipeline`
(+fully async: trigger_parameter_sync_step= 4,
require_batches= 4) | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844
last: 0.2604 | +| `async stream pipeline with stale samples`
(+staleness_threshold=0.5) | | | | | | | | | | +| `async stream pipeline with partial rollout`
(+partial_rollout=True) | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521
last: 0.3094 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-stream_stale_partial?nw=nwuserhouzg + +### 128-card Stale Ablation Experiment + +Under the `async stream pipeline with partial rollout` mode, we verified the impact of staleness settings on training +efficiency. +We found that the larger the staleness, the more obvious the final gains. +We also noticed that the times for staleness values of 0.3 and 0.5 are quite close, because as the training steps +increase, the response length changes significantly, causing training instability. +Further analysis and optimization are needed for this issue. + +| staleness_threshold | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | total time
400 step | acc/mean@1 | +|:---------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:| +| 0 | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844
last: 0.2604 | +| 0.1 | 171.30 | 58.17 | \ | 109.12 | 3h 53m | 8h 37m | 14h 25m | 19h 59m | max: 0.3542
last: 0.2979 | +| 0.3 | 146.11 | 38.88 | \ | 103.22 | 3h 18m | 6h 49m | 11h 40m | 17h 20m | max: 0.3469
last: 0.2865 | +| 0.5 | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521
last: 0.3094 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-stream_stale_partial?nw=nwuserhouzg + +### 128-card 7B require_batches Ablation Experiment + +In multiple tests, we found that the number of samples issued each time in streaming affects the response length during +training, which in turn affects training time. We verified the impact on results by modifying +`async_training.require_batches`. + +| require_batches | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | acc/mean@1 | +|:-----------------:|:--------:|:-------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:| +| 1 | 203.47 | 30.88 | \ | 181.08 | 3h 31m | 8h 29m | 17h 36m | max: 0.349
last: 0.326 | +| 2 | 158.72 | 26.32 | \ | 128.08 | 3h 35m | 7h 38m | 13h 57m | max: 0.351
last: 0.3406 | +| 4 | 124.64 | 25.62 | \ | 95.06 | 3h 13m | 6h 46m | 10h 53m | max: 0.3521
last: 0.3521 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-ablation_require_batches?nw=nwuserhouzg + +### 30B Model Mode Experiment + +We achieved a 1.7x performance improvement with `async stream pipeline with staleness samples` strategy on the +Qwen3-30B-A3B-Base model compared to the colocate setup. It is worth noting that this is far from the upper limit of +performance gains achievable through asynchrony. Firstly, the comparative experiments used a maximum response length of +only 8k, which is much shorter than the 20k sequence length in previous experiments, resulting in a less pronounced +rollout tail effect. Secondly, we adopted a highly skewed resource allocation, with rollout using 96 GPUs and trainer +using 32 GPUs, which is not an optimal configuration. During the experiments, we observed that the current verl +implementation imposes certain constraints, such as requiring data to be evenly divisible by the number of GPUs, making +resource adjustment less flexible. Additionally, as asynchronous training and deployment accelerate, the performance gap +is gradually narrowing. Therefore, enabling more flexible resource allocation and dynamic resource adjustment in the +future will be our next focus. + +* Machine: H20 +* Model: Qwen3-30B-A3B-Base +* Rollout length: max_response_length : 8K tokens; +* Algorithm: GRPO +* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* Engine: vllm+Megatron +* rollout.n: 16 +* ppo_mini_batch_size: 128 +* test_freq: 20 + +* colocate sync: + * step:400 + * train_batch_size: 512 + +* fully_async_policy + * total_rollout_steps: 512*400 + * trigger_parameter_sync_step: 512/128 = 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| Training Mode | Resource Allocation | Step | Gen | Old Log Prob | Ref | Update Actor | Total Time 100 Step | Total Time 200 Step | Total Time 300 Step | Total Time 400 Step | Acc/Mean@1 | +|--------------------|---------------------|--------|--------|--------------|-------|--------------|---------------------|---------------------|---------------------|---------------------|-----------------------------| +| Colocate Sync | 128 | 497.89 | 348.05 | 28.73 | 20.86 | 86.27 | 13h 36m | 1d 3h 48m | 1d 19h 4m | 2d 11h 39m | max: 0.3500
last: 0.3208 | +| Fully Async Policy | 96:32 | 282.75 | 22.06 | \ | 50.05 | 206.63 | 6h 45m (2.01x) | 14h 48m (1.88x) | 1d 0h 9m (1.78x) | 1d 10h 41m (1.72x) | max: 0.3813
last: 0.3448 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-30B?nw=nwuserhouzg | | | + +### checkpoint-engine Ablation Experiment +We tested the single-step parameter synchronization time of the checkpoint-engine on three models: Qwen2.5-Math-7B, Qwen3-30B-A3B, and Qwen3-235B-A22B, using default checkpoint-engine configurations. All experiments were performed on H20 machines, and the Megatron engine was used for training. +| model | trainer rank | rollout rank | checkpoint-engine | total sync time | +|:-----------------:|:--------:|:-------:|:--------------:|:--------------:| +| Qwen2.5-Math-7B | 4 | 4 | False | 0.12s | +| Qwen2.5-Math-7B | 4 | 4 | True | 0.02s | +| Qwen3-30B-A3B | 16 | 16 | False | 15.76s | +| Qwen3-30B-A3B | 16 | 16 | True | 4.38s | +| Qwen3-235B-A22B | 64 | 64 | False | 58.57s | +| Qwen3-235B-A22B | 64 | 64 | True | 23.70s | + + +### use_trainer_do_validate Experiment +We tested the effect of setting `use_trainer_do_validate=True` on the training process. The results show that setting +this parameter to True can reduce the validation time overhead and trainer node idle time. +We used Qwen2.5-Math-7B to verify the benefits of `use_trainer_do_validate=True` on the training process, we achieved about 2x performance improvement on validation time, and the trainer node idle time is reduced by about 40%. + +* Machine: H20 +* Model: Qwen2.5-Math-7B +* Rollout length: max_response_length FSDP2: 10K tokens; +* Algorithm: DAPO +* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* Engine: vllm+FSDP2 +* rollout.n: 16 +* ppo_mini_batch_size: 32 +* test_freq: 10 + +* fully_async_policy + * total_rollout_steps: 512*400 + * require_batches: 4 + * trigger_parameter_sync_step: 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time
50 step | acc/mean@2 | +|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| +| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | +| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | +| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | + + +## Multi-Turn Tool Calling + +Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn +tool-calling loop that supports partial_rollout for **fully_async_policy**. + +### Core Design + +`AsyncPartialToolAgentLoop` inherits from `ToolAgentLoop` and is adapted for the asynchronous training mode of +`fully_async_policy`. When `partial_rollout=True`, the Rollouter interrupts ongoing generation tasks before +synchronizing parameters with the Trainer. `AsyncPartialToolAgentLoop` is capable of: + +1. **Interrupting Tasks**: Responding to an interrupt signal to save the current state. Currently, interruptions occur + during the `GENERATING` process or after other states have completed. +2. **Resuming Tasks**: Resuming execution from the saved state after parameter synchronization is complete, rather than + starting over. + +### How to Use + +RL training with multi-turn tool calling in `fully_async_policy` is similar to `recipe/retool`. It is enabled by +specifying `multi_turn` configurations in the config file. + +1. **SFT Stage**: First, the model should undergo SFT to learn how to follow tool-calling format instructions. +2. **Multi-turn Configuration**: In the `fully_async_policy` training configuration, set the following parameters: + ```yaml + actor_rollout_ref: + rollout: + multi_turn: + enable: True # AsyncPartialToolAgentLoop will be used by default in fully_async_policy mode + # Other multi_turn related configurations + ``` +3. **Async Parameters**: To improve efficiency, enable `partial_rollout` and `staleness_threshold` when using multi-turn + tool calling: + ```yaml + async_training: + partial_rollout: True + staleness_threshold: 0.5 + # Other async parameters + ``` +4. **Example**: See `recipe/fully_async_policy/shell/dapo_7b_async_retool.sh`. + +### Experimental Results + +To validate the performance of `fully_async_policy` on multi-turn tool-calling tasks, we compared it with the standard +`colocate` synchronous mode. Key parameter settings are as follows. + +* **SFT Model**: Based on `Qwen2.5-7B-Instruct`, trained for 6 epochs on the `ReTool-SFT` dataset +* **RL Algorithm**: DAPO +* **Dataset**: + * Train: `DAPO-Math-17k` + * Test: `aime_2025` +* **Resource and Mode Comparison**: + * `colocate sync`: 32 H20 gpus + * `fully_async_policy`: 16 gpus for Trainer + 16 gpus for Rollouter +* **Key Configurations**: + 1. **Tool Calling Configuration**: + * `multi_turn.enable: True` + * `multi_turn.max_user_turns: 16` + * `multi_turn.max_assistant_turns: 16` + * `multi_turn.tool_config_path: recipe/retool/sandbox_fusion_tool_config.yaml` + 2. **`colocate sync` Configuration**: + * `ppo_mini_batch_size: 16` + * `train_batch_size: 64` + 3. **`fully_async_policy` Configuration**: + * `ppo_mini_batch_size: 16` + * `trigger_parameter_sync_step: 4` + * `require_batches: 1` + * `staleness_threshold: 1` + * `partial_rollout: True` + +| training mode | Resource allocation | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | aime_2025
acc/mean@30 | +|:--------------------:|:---------------------:|:---------:|:---------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:-------------------------------:| +| colocate | 32 | 375.47 | 228.03 | 35.19 | 111.84 | 9h 46m | 22h 28m | start:0.1078
last:0.2056 | +| fully_async_policy | 16: 16 | 221.36 | 40.59 | \ | 179.58 | 6h 19m
(1.55x) | 14h 4m
(1.60x) | start:0.11
last:0.2044 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-multiturn-tool?nw=nwuserhouzg + +## Future Plans + +* GRPO experiments +* Megatron adaptation +* SGLang integration +* Transfer queue integration +* Asynchronous parameter synchronization +* AReaL asynchronous algorithm implementation +* TPPO algorithm implementation +* Multi-turn and Tool support \ No newline at end of file diff --git a/recipe/fully_async_policy/README_zh.md b/recipe/fully_async_policy/README_zh.md new file mode 100644 index 00000000000..fcd98b0f846 --- /dev/null +++ b/recipe/fully_async_policy/README_zh.md @@ -0,0 +1,518 @@ +# Recipe: Fully Async Policy Trainer + +**Author:** `https://github.com/meituan-search` + +Last updated: 12/15/2025. + +本文档介绍了完全异步PPO训练系统,该系统实现了 Trainer 和 Rollouter 的完全解耦,支持异步样本生成和训练。 +在该系统下,我们使用128卡训练qwen2.5-7B模型取得了2.35x-2.67x的性能提升,同时效果没有显著受到影响。 + +## Introduction + +### Background + +rollout和train分离架构相较于colocate的架构能够更加灵活地分配资源,设计更加灵活的训练逻辑,从而处理长尾等问题带来的GPU利用率低,训练效率低的问题。 +one_step_off_policy通过分离架构的设计并进行rollout和train一轮异步的训练方法,缓解了rollout时间过长的问题,并在训练效率上取得了一些收益, +但其强制使用一轮异步的数据,存在不够灵活等问题,而且并不能完全去除长尾对训练效率带来的的影响;在其他框架如areal、Magistral、streamrl、asyncflow上, +已经基于分离架构实现了异步训练、流式训练,并取得了收益;我们借鉴其方法,在verl上进行了实现。fully_async_policy支持异步、流式、partial +rollout的训练, 通过合理设置资源分配情况、参数同步频率等参数,fully_async_policy能够显著提高训练效率。 + +> Magistral https://arxiv.org/abs/2506.10910 +> +> AReaL: A Large-Scale Asynchronous Reinforcement Learning System for Language +> Reasoning https://arxiv.org/abs/2505.24298 +> +> StreamRL: Scalable, Heterogeneous, and Elastic RL for LLMs with Disaggregated Stream +> Generation https://arxiv.org/abs/2504.15930 +> +> AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training https://arxiv.org/abs/2507.01663 +> + +### 核心贡献 + +* **资源隔离**:与使用hybrid_engine不同,Rollouter和Trainer使用分离的计算资源,需要分别指定所占用的资源。 +* **生成与训练并行**:Trainer在训练的同时,Rollouter在生成新的样本。 +* **多步异步**: 相比 one step off policy 支持0.x步到多步的异步设定,异步方案更加灵活。 +* **nccl参数同步**:基于nccl通信原语,参考[checkpoint-engine](https://github.com/MoonshotAI/checkpoint-engine)实现Rollouter与Trainer间的高效参数同步。 +* **Stream推理与训练**:Rollouter逐样本生成数据,同时数据传输以单个sample为最小传输单位。 +* **异步训练与新鲜度控制**:通过设置参数async_training.staleness_threshold,支持使用旧参数生成的样本进行训练。 +* **PartialRollout**: Rollouter推理过程支持partial rollout逻辑,通过参数同步时,添加`sleep()`和`resume()` + 逻辑,保存进行中的rollout的样本,并在下一次rollout中继续使用,减少参数同步等待进行中的任务结束时间。 + +目前支持使用模式为 megatron/fsdp+vllm。vllm必须使用基于AgentLoop的server模式。 + +## 设计 + +fully_async_policy的整体架构如下图所示,fully_async_policy主要由Rollouter、MessageQueue、Trainer、ParameterSynchronizer四部分组成。 + +![fully_async_policy_structure]( +https://github.com/ArronHZG/verl-community/blob/recipe/async_policy/docs/fully_async_policy_structure.svg?raw=true) + +1. Rollouter逐样本生成序列,并将生成的sample放入MessageQueue中,生产的速度受新鲜度控制。 +2. MessageQueue用于暂存Rollouter生成的sample。 +3. Trainer逐样本从MessageQueue中获取,获取到`require_batches*ppo_mini_batch_size` + 数量的样本后,就会进行训练,训练async_training.trigger_parameter_sync_step轮后,触发与Rollouter的一次参数同步。 +4. 
ParameterSynchronizer 实现了Nccl的同步参数同步能力。 + +当前方案对比base的收益来源,在于colocate情况下,rollout使用更多的资源无法解决长尾样本带来的空闲, +当我们进行资源隔离后,rollout的时间和train的时间都可能相较于之前更长(因为使用的资源变少了), +但是相互之间的耗时overlap,端到端的耗时反而有所缩减。 + +![fully_async_policy_revenue]( +https://github.com/ArronHZG/verl-community/blob/recipe/async_policy/docs/fully_async_policy_revenue.svg?raw=true) + +## 使用方式 + +### 参数说明 + +| super params | implication | +|------------------------------------------------------|-----------------------------------------------------------------| +| `trainer.nnodes` | Trainer的node数量 | +| `trainer.n_gpus_per_node` | Trainer每个node上gpu的数量 | +| `rollout.nnodes` | Rollouter的node数量 | +| `rollout.n_gpus_per_node` | Rollouter每个node上gpu的数量 | +| `data.train_batch_size` | 在fully async策略中,该值不生效(默认设置为0) | +| `data.gen_batch_size` | 在fully async策略中,使用流式的样本生产逻辑(默认设置为1) | +| `rollout.total_rollout_steps` | 总的rollout的sample数量 | +| `rollout.test_freq` | Rollouter每更新多少次参数,进行一次validation | +| `actor_rollout_ref.actor.ppo_mini_batch_size` | The ppo_mini_batch_size is a global num across all workers/gpus | +| `async_training.require_batches` | FullyAsyncTrainer一次性获取的ppo_mini_batch_size的数量 | +| `async_training.trigger_parameter_sync_step` | 表示FullyAsyncTrainer进行多少次本地更新后,进行一次参数同步 | +| `async_training.staleness_threshold` | 新鲜度控制 | +| `async_training.partial_rollout` | 是否进行partial_rollout | +| `async_training.use_rollout_log_probs` | 使用rollout产生的log_probs | +| `async_training.compute_prox_log_prob`(experimental) | 是否在train阶段,使用train模型的参数计算token的 log_prob | +| `async_training.checkpoint_engine.enable`| 是否开启checkpoint_engine模式的加速,默认值True | +| `async_training.checkpoint_engine.overlap_broadcast_and_consume` | 启动checkpoint_engine时,是否在参数同步时在broadcast和加载之间使用流水,默认值False| +| `async_training.checkpoint_engine.device_buffer_size_M` | 启动checkpoint_engine时,组装的bucket的大小(MB),默认为4096 | +| `async_training.use_trainer_do_validate` | 是否使用Trainer的do_validate方法进行validation,默认值False | + +**进一步的解释:** + +* `rollout.total_rollout_steps` + + 与 colocate 相比,数量可以通过 train_batch_size 与 step 相乘对齐: + `rollout.total_rollout_steps = data.train_batch_size * step`。 + +* `async_training.trigger_parameter_sync_step` + + 在fully async策略中,表示Trainer进行多少次本地更新后(也就是获取多少次`require_batches * ppo_mini_batch_size`数量样本), + 与Rollouter之间进行一次参数同步。 + 每两次Rollouter和Trainer参数同步之间,Trainer将会处理`trigger_parameter_sync_step* require_batches\ + ppo_mini_batch_size`份sample。 + 如果为了与colocate在公平的情况下对比速度,trigger_parameter_sync_step应该设置为 `data.train_batch_size / ( + require_batches * ppo_mini_batch_size)`。 + +* `async_training.staleness_threshold` + + 在fully async策略中,表示最大允许使用的staleness样本的比例。 + + * staleness_threshold=0,表示同步训练。 + Rollouter两次参数更新之间将会生成固定数量的样本,样本数为: + $$rollout\_num = (trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size)$$ + * staleness_threshold>0,表示异步训练, 可以设置为小数,支持更灵活的异步调用。 + Rollouter两次参数更新之间将会最多生成的样本数为: + $$rollout\_num = (1+staleness\_threshold)*(trigger\_parameter\_sync\_step*require\_batches*ppo\_mini\_batch\_size) - num\_staleness\_sample $$ + + num_staleness_sample 表示上一次rollout多生成的陈旧样本数。 + + 由于是流式系统,rollout持续生成,trainer持续消费。如果rollouter较慢,trainer会更早触发参数同步,rollouter并不会实际生产rollout_num个样本。 + 当rollout 足够快时,staleness_threshold设置为1,基本上等价于one_step_off policy。 + 为了避免过期样本太多影响训练精度,建议该值设置小于1。 + +* `async_training.partial_rollout` + + partial_rollout只会在staleness_threshold>0时才实际上起作用。 + +* `async_training.use_rollout_log_probs` + + 在强化学习算法中,log_probs与参数版本,token都存在隐性的相关性。由于PPO/GRPO/DAPO等算法的设定,我们在计算重要性采样时, + 即 old_log_prob必须使用rollout参数及token所对应log_probs,才能保证算法的正确性。在fully + 
async策略中,我们默认old_log_prob是有rollout所计算的,而不是由trainer所计算。 + +* `async_training.require_batches` + + 在流式训练中,require_batches 应该设置为1,表示生产够ppo_mini_batch_size样本后,就进行训练。 + 在实际测试中,我们发现,如果单次下发的样本较少,由于数据分发的顺序,会导致训练不稳定,response 长度变长。 + 在这里,我们额外提供 require_batches 进行流式分发,单次参与训练的样本数量控制。 + +* `async_training.compute_prox_log_prob` (experimental) + + 我们在训练过程中,观测到随着训练的进行,训练后期指标和response长度可能会出现不稳定的情况, + 这里我们可以使用 [Rollout Importance Sampling](https://verl.readthedocs.io/en/latest/advance/rollout_is.html) 的技术进行 + 重要性采样,缓解这一问题。为了使用 `Rollout Importance Sampling` 我们需要使用训练引擎使用当前的参数版本计算old_log_prob,此开关需要打开。 + 此外,在 mode d (async stream pipeline with partial rollout) 的情况下开启 `compute_prox_log_prob` 以及 + `Rollout Importance Sampling` 后,我们的实现已近似Areal的 `Decoupled PPO`。 + +* `async_training.checkpoint_engine.enable` + + 开启checkpoint engine后,相较于原始的逐tensor的参数同步方式,同步时间开销普遍可以降低60%以上。但是组装bucket会带来额外的临时显存开销。 + +* `async_training.checkpoint_engine.overlap_broadcast_and_consume` + + 开启参数broadcast和load_weights之间的流水后,会进一步额外申请更多显存。由于目前分析参数同步的主要耗时并非来自broadcast和load_weights阶段,而是在参数生成阶段(由megatron或FSDP),因此该开关默认关闭。 + +* `async_training.checkpoint_engine.device_buffer_size_M` + + 控制开启checkpoint engine后,用于同步的显存buffer大小。实际的`bucket_size` = `max(device_buffer_size_M, 最大参数tensor size)` + * 在开启`overlap_broadcast_and_consume`时,trainer节点的临时额外显存开销为 `3 * bucket_size`, rollout节点的临时额外显存开销为`2 * bucket_size`。 + * 在关闭`overlap_broadcast_and_consume`时,trainer节点的临时额外显存开销为 `2 * bucket_size`, rollout节点的临时额外显存开销为`1 * bucket_size`。 + +* `async_training.use_trainer_do_validate` + + 控制是否使用trainer的`do_validate`方法进行validation。 + 如果设置为True,trainer会在每次参数更新后,调用`do_validate`方法进行validation。 + 如果设置为False,trainer不会调用`do_validate`方法。 + +### 模式支持 + +1. on policy pipeline: + 1. **trigger_parameter_sync_step=1,staleness_threshold=0** + 2. Rollouter一次生产`require_batches*ppo_mini_batch_size` + 的samples,Trainer获取这些samples后进行训练,训练完后Trainer和Rollouter之间进行一次参数同步; + 3. 在rollout阶段,如果存在长尾的样本,但是rollout样本数较少时,较短的样本无法填充到空闲的资源中,会造成一定的资源浪费。 + 4. 如图a所示; + +2. stream off policy pipeline: + 1. **trigger_parameter_sync_step>1,staleness_threshold=0** + 2. 将会进行同步的流式训练,Rollouter一次生产`require_batches*ppo_mini_batch_size*trigger_parameter_sync_step` + 的samples,Trainer每获取`require_batches*ppo_mini_batch_size` + 就进行一次本地训练,训练trigger_parameter_sync_step次后,Trainer和Rollouter之间进行一次参数同步; + 3. 相较于a,由于一次生成的样本更多,资源的空闲会更低。 + 4. 在一次step训练中,会存在两次资源闲置的时间,分别是在第一次获取样本时,train等待`require_batches*ppo_mini_batch_size` + 个样本生产,以及最后一次参数更新时,rollout等待训练完成。 + 5. 如图b所示; + +3. async stream pipeline with staleness samples: + 1. **trigger_parameter_sync_step>=1,staleness_threshold>0,partial_rollout=Flase** + 2. Rollouter在每次参数更新后将计划最多生产rollout_num个样本(实际根据rollout速度,生成的样本可能会少与这个值)。 + 3. 如果rollout过程比较快,Rollouter将会在参数同步前额外生成一部分样本num_stale_samples,用于参数同步后立即给Trainer使用。 + 触发参数同步时,如果Rollouter有正在生产的任务,将会等待任务完成,同时不会添加新的任务; + 4. 相较于b,除第一次step训练外,后续的训练都不会有wait first batch rollout finish的时间,但是会有wait active task + finish的时间。 + 5. 如图c所示; + +4. async stream pipeline with partial rollout: + 1. **trigger_parameter_sync_step>=1,staleness_threshold>0,partial_rollout=True** + 2. 相较于c,触发参数同步时,Rollouter如果有正在生产的sample,会打断rollout过程并进行参数同步,被中断的sample会在参数同步后继续生成。减少了wait + active task finish的时间。 + 3. 
如图d所示; + +![fully_async_policy_mode]( +https://github.com/ArronHZG/verl-community/blob/recipe/async_policy/docs/fully_async_policy_mode.svg?raw=true) + +### 关键指标 + +| metrics | implication | +|------------------------------------------------|-----------------------------------------------------------| +| `trainer/idle_ratio` | Trainer闲置率 | +| `rollouter/idle_ratio` | Rollouter闲置率 | +| `fully_async/count/stale_samples_processed` | 训练使用的旧sample总数 | +| `fully_async/count/stale_trajectory_processed` | 训练使用的旧trajectory总数(一个sample会生产rollout.n条trajectory) | +| `fully_async/partial/total_partial_num` | 两次trigger_parameter_sync_step之间Trainer处理的partial样本数 | +| `fully_async/partial/partial_ratio` | 两次trigger_parameter_sync_step之间Trainer处理的partial样本的比例 | +| `fully_async/partial/max_partial_span` | 两次trigger_parameter_sync_step之间Trainer处理的partial样本的最大参数跨度 | + +### 调参建议 + +* 资源分配与调整: + * 合理的资源分配是获得好的训练效率的前提。理想的资源分配情况应该是使得Rollout的时间和Train的时间接近,从而使得整个训练过程流水气泡最小, + 避免资源闲置,同时Trainer不会使用旧样本。在真实训练场景下,可以根据实际训练过程中rollout和train的空闲时间调整资源分配, + 可从rollouter/idle_ratio和trainer/idle_ratio获得,如果rollouter/idle_ratio较高trainer/idle_ratio较低, + 应该增多Trainer的资源减少Rollouter的资源,反之亦然。 + +* 关键参数: + * staleness_threshold: 设置太大会导致较多的旧样本使用,影响模型效果,建议设置小于1。 + * require_batches:越接近1,越接近纯流式过程,训练过程中bubble越小,能够在速度上获得更快的加速效果,但会对样本的处理顺序产生影响; + * trigger_parameter_sync_step: 设置的越小越接近on policy,但会导致频繁的参数同步,长尾样本浪费的资源无法被短样本填充,资源利用率低。 + 设置的越大有更高的计算效率,但是精度上会受到off policy的影响。 + * rollout.test_freq: 会占用Rollouter资源,不建议设置太小。 + +* 模式选择:通过调整不同的参数,Fully Async架构支持不同程度上的优化加速,适用于不同场景的任务。 + * 对于小规模任务,需要保证训练的稳定性和 on-policy 性,对速度要求不高的场景,可以尝试使用on policy pipeline的模式(模式1)。 + * 对于需要提高训练吞吐量,但对 staleness 敏感的场景,可以尝试使用 stream off policy pipeline 的模式。即通过 + 设置trigger_parameter_sync_step>1 ,提高 训练效率,但仍保持同步机制 (staleness_threshold=0 )(模式2)。 + * 对于大规模任务,对训练速度有较高要求,且可以容忍一定 off-policy 程度、staleness的场景,可以设置staleness_threshold> + 0、partial_rollout=True提高训练效率,使用 async stream pipeline 模式(模式 3 或 4)。 + +### 快速开始 + +```shell +rollout_mode="async" +rollout_name="vllm" # sglang or vllm +if [ "$rollout_mode" = "async" ]; then + export VLLM_USE_V1=1 + return_raw_chat="True" +fi + +train_prompt_bsz=0 +gen_prompt_bsz=1 +n_resp_per_prompt=16 +train_prompt_mini_bsz=32 +total_rollout_steps=$(((512*400))) +test_freq=10 +staleness_threshold=0 +trigger_parameter_sync_step=16 +partial_rollout=False + + +python -m recipe.fully_async_policy.fully_async_main \ + train_batch_size=${train_prompt_bsz} \ + data.gen_batch_size=${gen_prompt_bsz} \ + data.return_raw_chat=${return_raw_chat} \ + actor_rollout_ref.rollout.n=${n_resp_per_prompt} \ + actor_rollout_ref.actor.strategy=fsdp2 \ + critic.strategy=fsdp2 \ + actor_rollout_ref.hybrid_engine=False \ + actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \ + actor_rollout_ref.rollout.name=${rollout_name} \ + actor_rollout_ref.rollout.mode=${rollout_mode} \ + actor_rollout_ref.rollout.calculate_log_probs=True \ + trainer.nnodes="${NNODES_TRAIN}" \ + trainer.n_gpus_per_node="${NGPUS_PER_NODE}" \ + rollout.nnodes="${NNODES_ROLLOUT}" \ + rollout.n_gpus_per_node="${NGPUS_PER_NODE}" \ + rollout.total_rollout_steps="${total_rollout_steps}" \ + rollout.test_freq="${test_freq}" \ + async_training.staleness_threshold="${staleness_threshold}" \ + async_training.trigger_parameter_sync_step="${trigger_parameter_sync_step}" \ + async_training.partial_rollout="${partial_rollout}" +``` + +## 实验 + +### 在7B模型上进行异步训练 + +我们使用 
Qwen2.5-Math-7B 验证 fully async 策略在长候选下,多种资源下的收益情况。 +使用`async stream pipeline with staleness samples` 策略,我们在32卡,64卡,128卡都取得2x左右的性能提升,同时没有显著影响实验效果。 + +* 机器:H20 +* 模型:Qwen2.5-Math-7B +* rollout长度:max_response_length FSDP2: 28K tokens; +* 算法:DAPO +* 数据集: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* engine: vllm+FSDP2 +* rollout.n: 16 +* ppo_mini_batch_size: 32 +* test_freq: 20 + +* colocate sync: + * step: 400 + * train_batch_size: 512 + +* fully_async_policy + * total_rollout_steps: 512*400 + * require_batches: 4 + * trigger_parameter_sync_step: 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | total time
400 step | acc/mean@1 | +|:--------------------:|:---------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-------------------------------:| +| colocate sync | 32 | 790.10 | 357.41 | 107.71 | 269.80 | 13h 44m | 1d 3h 43m | 2d 9h 22m | 3d 17h 5m | max: 0.3313
last: 0.2448 | +| fully_async_policy | 16:16 | 294.77 | 21.26 | \ | 313.81 | 7h 58m
(1.72x) | 16h 21m
(1.70x) | 1d 0h 53m
(2.31x) | 1d 9h 26m
(2.66x) | max: 0.3302
last: 0.2333 | +| colocate sync | 64 | 365.28 | 150.72 | 70.26 | 133.41 | 10h 22m | 20h 45m | 1d 7h 6m | 1d 17h 32m | max: 0.3365
last: 0.2333 | +| fully_async_policy | 32:32 | 189.26 | 28.46 | \ | 156.98 | 4h 57m
(2.09x) | 10h 14m
(2.03x) | 16h 58m
(1.83x) | 21h 40m
(1.92x) | max: 0.3677
last: 0.3406 | +| colocate sync | 128 | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573
last: 0.2958 | +| fully_async_policy | 64:64 | 150.63 | 33.14 | \ | 113.16 | 3h 13m
(2.67x) | 6h 46m
(2.65x) | 10h 53m
(2.67x) | 17h 22m
(2.35x) | max: 0.3521
last: 0.3094 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-colocate_async?nw=nwuserhouzg + +### 128卡 7B 异步模式实验 + +我们使用 Qwen2.5-Math-7B 验证 fully async 所支持的各个模式的效果。 +我们可以看到 stream 带来的收益大约1.6x,叠加 staleness 和 partial_rollout 后,收益为2.35x。 + +| mode | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | total time
400 step | acc/mean@1 | +|:-------------------------------------------------------------------------------------------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------------:| +| colocate sync | 356.30 | 177.85 | 53.92 | 113.81 | 8h 36m | 17h 56m | 1d 5h 6m | 1d 16h 48m | max: 0.3573
last: 0.2958 | +| `stream off policy pipeline`
(+fully async: trigger_parameter_sync_step= 4,
require_batches= 4) | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844
last: 0.2604 | +| `async stream pipeline with staleness samples`
(+staleness_threshold=0.5) | | | | | | | | | | +| `async stream pipeline with partial rollout`
(+partial_rollout=True) | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521
last: 0.3094 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-stream_stale_partial?nw=nwuserhouzg + +### 128卡 stale 消融实验 + +在 `async stream pipeline with partial rollout` 模式下,我们验证 staleness 的设置对于训练效率的影响。 +我们可以发现,staleness 越大,最终取得的收益越明显。 +同时我们也注意到 staleness 取 0.3 和 0.5 的时间比较接近,原因是随着训练步数的增量,response 长度变化较大,训练出现了不稳定的问题。 +后续还需要针对该问题进行进一步的分析和优化。 + +| staleness_threshold | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | total time
400 step | acc/mean@1 | +|:---------------------:|:--------:|:--------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:| +| 0 | 231.34 | 128.47 | \ | 98.77 | 4h 25m | 9h 41m | 15h 2m | 1d 1h 53m | max: 0.2844
last: 0.2604 | +| 0.1 | 171.30 | 58.17 | \ | 109.12 | 3h 53m | 8h 37m | 14h 25m | 19h 59m | max: 0.3542
last: 0.2979 | +| 0.3 | 146.11 | 38.88 | \ | 103.22 | 3h 18m | 6h 49m | 11h 40m | 17h 20m | max: 0.3469
last: 0.2865 | +| 0.5 | 150.63 | 33.14 | \ | 113.16 | 3h 13m | 6h 46m | 10h 53m | 17h 22m | max: 0.3521
last: 0.3094 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-ablation_stale?nw=nwuserhouzg + +### 128卡 7B require_batches 消融实验 + +在多次测试下,我们发现流式每次下发样本的数量会影响训练的response长度,进而影响训练时长,我们通过修改 +`async_training.require_batches` 验证对与结果的影响。 + +| require_batches | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | total time
300 step | acc/mean@1 | +|:-----------------:|:--------:|:-------:|:--------------:|:--------------:|:------------------------:|:------------------------:|:------------------------:|:-----------------------------:| +| 1 | 203.47 | 30.88 | \ | 181.08 | 3h 31m | 8h 29m | 17h 36m | max: 0.349
last: 0.326 | +| 2 | 158.72 | 26.32 | \ | 128.08 | 3h 35m | 7h 38m | 13h 57m | max: 0.351
last: 0.3406 | +| 4 | 124.64 | 25.62 | \ | 95.06 | 3h 13m | 6h 46m | 10h 53m | max: 0.3521
last: 0.3521 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-ablation_require_batches?nw=nwuserhouzg + +### 30B模型模式实验 + +我们在 Qwen3-30B-A3B-Base 模型上通过`async stream pipeline with staleness samples` 策略,相比于 colocate 方案取得了 1.7 +倍的性能提升。值得说明的是,这距离异步方式所能带来的性能提升上限还有很大空间。首先,对比实验中使用的最大响应长度仅为 +8k,这远低于此前实验的 20k 序列长度,因此 rollout 的长尾效应并不明显。其次,我们采用了极为倾斜的资源分配方案,rollout 使用了 +96 张 GPU,而 trainer 仅使用了 32 张 GPU,这并不是最优的配置。在实验过程中,我们观察到当前的 verl 实现存在一些限制,比如要求数据必须能被 +GPU 数量整除,这使得资源调整的灵活性受到影响。此外,随着异步训练和部署的加速,性能差距也在逐渐缩小。因此,未来我们将重点关注如何实现更灵活的资源分配和动态调整资源。 + +* 机器:H20 +* 模型:Qwen3-30B-A3B-Base +* rollout长度:max_response_length : 8K tokens; +* 算法: GRPO +* 数据集: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* Engine: vllm+Megatron +* rollout.n: 16 +* ppo_mini_batch_size: 128 +* test_freq: 20 + +* colocate sync: + * step:400 + * train_batch_size: 512 + +* fully_async_policy + * total_rollout_steps: 512*400 + * trigger_parameter_sync_step: 512/128 = 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| Training Mode | Resource Allocation | Step | Gen | Old Log Prob | Ref | Update Actor | Total Time 100 Step | Total Time 200 Step | Total Time 300 Step | Total Time 400 Step | Acc/Mean@1 | +|----------------------|--------------------|---------|--------|--------------|--------|--------------|---------------------|---------------------|---------------------|---------------------|-----------------------------| +| Colocate Sync | 128 | 497.89 | 348.05 | 28.73 | 20.86 | 86.27 | 13h 36m | 1d 3h 48m | 1d 19h 4m | 2d 11h 39m | max: 0.3500
last: 0.3208 | +| Fully Async Policy | 96:32 | 282.75 | 22.06 | \ | 50.05 | 206.63 | 6h 45m (2.01x) | 14h 48m (1.88x) | 1d 0h 9m (1.78x) | 1d 10h 41m (1.72x) | max: 0.3813
last: 0.3448 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-30B?nw=nwuserhouzg + +### checkpoint-engine参数同步消融实验 +我们在Qwen2.5-Math-7B,Qwen3-30B-A3B和Qwen3-235B-A22B三个模型上测试了checkpoint-engine参数同步的单步参数同步耗时,使用的参数均为默认参数配置。实验均在H20机器上完成,并使用megatron训练引擎。 +| model | trainer rank | rollout rank | checkpoint-engine | total sync time | +|:-----------------:|:--------:|:-------:|:--------------:|:--------------:| +| Qwen2.5-Math-7B | 4 | 4 | False | 0.12s | +| Qwen2.5-Math-7B | 4 | 4 | True | 0.02s | +| Qwen3-30B-A3B | 16 | 16 | False | 15.76s | +| Qwen3-30B-A3B | 16 | 16 | True | 4.38s | +| Qwen3-235B-A22B | 64 | 64 | False | 58.57s | +| Qwen3-235B-A22B | 64 | 64 | True | 23.70s | + +### use_trainer_do_validate 实验测试 +我们在Qwen2.5-Math-7B模型上测试了`use_trainer_do_validate`参数的影响。这个结果展示使用`use_trainer_do_validate=True`可以减少验证时间开销,并且训练器节点的空闲时间也减少了。 + +* Machine: H20 +* Model: Qwen2.5-Math-7B +* Rollout length: max_response_length FSDP2: 10K tokens; +* Algorithm: DAPO +* Dataset: TRAIN_FILE: dapo-math-17k.parquet TEST_FILE: aime-2024.parquet +* Engine: vllm+FSDP2 +* rollout.n: 16 +* ppo_mini_batch_size: 32 +* test_freq: 10 + +* fully_async_policy + * total_rollout_steps: 512*400 + * require_batches: 4 + * trigger_parameter_sync_step: 4 + * staleness_threshold: 0.5 + * partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time
50 step | acc/mean@2 | +|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| +| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | +| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | +| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | + + +## 多轮工具调用 + +参考 **recipe/retool** 和 **ToolAgentLoop**,我们为 **fully_async_policy** 实现了支持partial rollout的多轮工具调用循环 * +*AsyncPartialToolAgentLoop**。 + +### 核心设计 + +`AsyncPartialToolAgentLoop` 继承自 `ToolAgentLoop`,其核心是适配了 `fully_async_policy` 的异步训练模式。当 +`partial_rollout=True` 时,Rollouter 在与 Trainer 同步参数前会中断正在进行的生成任务。`AsyncPartialToolAgentLoop` 能够: + +1. **中断任务**: 响应中断信号,保存当前的生成状态。目前,中断会发生在GENERATING过程中,或其他状态结束后; +2. **恢复任务**: 在参数同步完成后,从保存的状态恢复,继续执行,而不是从头开始。 + +### 使用方法 + +`fully_async_policy`多轮与工具调用的RL训练与 `recipe/retool` 类似,通过在配置文件中指定 `multi_turn` 相关配置来启用。 + +1. **SFT 阶段**: 首先,需要对模型进行 SFT训练,使其具备遵循工具调用格式指令的能力。 +2. **配置启用**: 在 `fully_async_policy` 的训练配置中,设置以下参数: + ```yaml + actor_rollout_ref: + rollout: + multi_turn: + enable: True # 在fully_async_policy模式下将默认使用AsyncPartialToolAgentLoop + # 其他 multi_turn 相关配置 + ``` +3. **配置async参数**: 为提高效率,在启用多轮工具调用时,同时开启 `partial_rollout`和`staleness_threshold`: + ```yaml + async_training: + partial_rollout: True + staleness_threshold: 0.5 + # 其他async参数 + ``` +4. **example**: 参考`recipe/fully_async_policy/shell/dapo_7b_async_retool.sh` + +### 实验结果 + +为验证 `fully_async_policy` 在多轮工具调用任务中的性能,我们将其与标准 `colocate` 同步模式进行了对比。实验具体设置如下。 + +* **SFT模型**: 实验基于 `Qwen2.5-7B-Instruct` 模型,使用`ReTool-SFT`数据集训练6个epoch; +* **RL算法**: DAPO +* **数据集**: + * 训练集: `DAPO-Math-17k` + * 测试集: `aime_2025` +* **资源与模式对比**: + * `colocate sync`: 32卡 H20 + * `fully_async_policy`: 16卡 Trainer + 16卡 Rollouter +* **关键配置**: + 1. **工具调用配置**: + * `multi_turn.enable: True` + * `multi_turn.max_user_turns: 16` + * `multi_turn.max_assistant_turns: 16` + * `multi_turn.tool_config_path: recipe/retool/sandbox_fusion_tool_config.yaml` + 2. **`colocate sync`配置**: + * `ppo_mini_batch_size: 16` + * `train_batch_size: 64` + 3. **`fully_async_policy`配置**: + * `ppo_mini_batch_size: 16` + * `trigger_parameter_sync_step: 4` + * `require_batches: 1` + * `staleness_threshold: 1` + * `partial_rollout: True` + +| training mode | Resource allocation | step | gen | old_log_prob | update_actor | total time
100 step | total time
200 step | aime_2025
acc/mean@30 | +|:------------------: |:-------------------: |:-------: |:-------: |:------------: |:------------: |:----------------------: |:----------------------: |:---------------------------: | +| colocate | 32 | 375.47 | 228.03 | 35.19 | 111.84 | 9h 46m | 22h 28m | start:0.1078
last:0.2056 | +| fully_async_policy | 16: 16 | 221.36 | 40.59 | \ | 179.58 | 6h 19m
(1.55x) | 14h 4m
(1.60x) | start:0.11
last:0.2044 | + +> source data: https://wandb.ai/hou-zg-meituan/fully-async-policy-multiturn-tool?nw=nwuserhouzg + +## 后续计划 + +* GRPO实验 +* megatron 适配 +* sglang 集成 +* transfer queue 集成 +* 异步参数同步 +* Areal异步算法实现 +* TPPO算法实现 +* 多轮及Tool的支持 \ No newline at end of file diff --git a/verl/experimental/fully_async_policy/README.md b/verl/experimental/fully_async_policy/README.md index 051e57586b8..54228f81933 100644 --- a/verl/experimental/fully_async_policy/README.md +++ b/verl/experimental/fully_async_policy/README.md @@ -105,6 +105,7 @@ but the overlap in their time consumption reduces the end-to-end time consumptio | `async_training.checkpoint_engine.enable` | Whether to use checkpoint_engine for accelerating, default `True` | | `async_training.checkpoint_engine.overlap_broadcast_and_consume` | When use checkpoint_engine, whether to overlap broadcast and load_weights, default `False` | | `async_training.checkpoint_engine.device_buffer_size_M` | When use checkpoint_engine, the user-specific bucket size (MB), default `4096` | +| `async_training.use_trainer_do_validate` | Whether use trainer node to do validate process, default `False` | **Further Explanation:** @@ -194,6 +195,13 @@ but the overlap in their time consumption reduces the end-to-end time consumptio - When disable `overlap_broadcast_and_consume`, the additional device memory overhead of trainer rank is `2 * bucket_size`and rollout rank is `1 * bucket_size`。 +- `async_training.use_trainer_do_validate` + + It controls whether to use the trainer's `do_validate` method for validation. + If set to True, the trainer will perform validation after each parameter update. It can reduce the validation time + overhead and trainer node idle time. + If set to False, the trainer will not perform validation. + ### Supported Modes 1. on policy pipeline: @@ -477,6 +485,38 @@ We tested the single-step parameter synchronization time of the checkpoint-engin | Qwen3-235B-A22B | 64 | 64 | False | 58.57s | | Qwen3-235B-A22B | 64 | 64 | True | 23.70s | + +### use_trainer_do_validate Experiment + +We tested the effect of setting `use_trainer_do_validate=True` on the training process. The results show that setting +this parameter to True can reduce the validation time overhead and trainer node idle time. +We used Qwen2.5-Math-7B to verify the benefits of `use_trainer_do_validate=True` on the training process, we achieved about 2x performance improvement on validation time, and the trainer node idle time is reduced by about 40%. + +- Machine: H20 +- Model: Qwen2.5-Math-7B +- Rollout length: max_response_length FSDP2: 10K tokens; +- Algorithm: DAPO +- Dataset: + - TRAIN_FILE: dapo-math-17k.parquet + - TEST_FILE: aime-2024.parquet +- Engine: vllm+FSDP2 +- rollout.n: 16 +- ppo_mini_batch_size: 32 +- test_freq: 10 + +- fully_async_policy + - total_rollout_steps: 512*400 + - require_batches: 4 + - trigger_parameter_sync_step: 4 + - staleness_threshold: 0.5 + - partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time
50 step | acc/mean@2 | +|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| +| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | +| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | +| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | + ## Multi-Turn Tool Calling Referencing **recipe/retool** and **ToolAgentLoop**, we implemented **AsyncPartialToolAgentLoop**, a multi-turn diff --git a/verl/experimental/fully_async_policy/README_zh.md b/verl/experimental/fully_async_policy/README_zh.md index a5d7f17bc1d..812e5eb8426 100644 --- a/verl/experimental/fully_async_policy/README_zh.md +++ b/verl/experimental/fully_async_policy/README_zh.md @@ -82,7 +82,7 @@ fully_async_policy 的整体架构如下图所示,fully_async_policy 主要由 | `async_training.checkpoint_engine.enable` | 是否开启 checkpoint_engine 模式的加速,默认值 True | | `async_training.checkpoint_engine.overlap_broadcast_and_consume` | 启动 checkpoint_engine 时,是否在参数同步时在 broadcast 和加载之间使用流水,默认值 False | | `async_training.checkpoint_engine.device_buffer_size_M` | 启动 checkpoint_engine 时,组装的 bucket 的大小(MB),默认为 4096 | - +| `async_training.use_trainer_do_validate` | 是否使用 Trainer 的 do_validate 方法进行 validation,默认值 False | **进一步的解释:** - `rollout.total_rollout_steps` @@ -155,6 +155,12 @@ require_batches * ppo_mini_batch_size)`。 - 在开启`overlap_broadcast_and_consume`时,trainer 节点的临时额外显存开销为 `3 * bucket_size`, rollout 节点的临时额外显存开销为`2 * bucket_size`。 - 在关闭`overlap_broadcast_and_consume`时,trainer 节点的临时额外显存开销为 `2 * bucket_size`, rollout 节点的临时额外显存开销为`1 * bucket_size`。 +- `async_training.use_trainer_do_validate` + + 控制是否使用trainer的 `do_validate` 方法进行 validation 。 + 如果设置为 True,trainer 会在每次参数更新后,调用 `do_validate` 方法进行 validation。 + 如果设置为 False,trainer 不会调用 `do_validate` 方法。 + ### 模式支持 1. on policy pipeline: @@ -407,6 +413,36 @@ GPU 数量整除,这使得资源调整的灵活性受到影响。此外,随 | Qwen3-235B-A22B | 64 | 64 | False | 58.57s | | Qwen3-235B-A22B | 64 | 64 | True | 23.70s | +### use_trainer_do_validate 实验测试 + +我们在Qwen2.5-Math-7B模型上测试了 `use_trainer_do_validate` 参数的影响。这个结果展示使用 `use_trainer_do_validate=True` 可以减少验证时间开销,并且训练器节点的空闲时间也减少了。 + +- Machine: H20 +- Model: Qwen2.5-Math-7B +- Rollout length: max_response_length FSDP2: 10K tokens; +- Algorithm: DAPO +- Dataset: + - TRAIN_FILE: dapo-math-17k.parquet + - TEST_FILE: aime-2024.parquet +- Engine: vllm+FSDP2 +- rollout.n: 16 +- ppo_mini_batch_size: 32 +- test_freq: 10 + +- fully_async_policy + - total_rollout_steps: 512*400 + - require_batches: 4 + - trigger_parameter_sync_step: 4 + - staleness_threshold: 0.5 + - partial_rollout: True + +| training mode | resource allocation | step | gen | old_log_prob | update_actor | validate time | total time
50 step | acc/mean@2 | +|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| +| colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | +| fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | +| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | + + ## 多轮工具调用 参考 **recipe/retool** 和 **ToolAgentLoop**,我们为 **fully_async_policy** 实现了支持 partial rollout 的多轮工具调用循环 \* From ca44f950630c2bc46303c88f546a69d322dbcfba Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Mon, 5 Jan 2026 10:47:22 +0800 Subject: [PATCH 09/11] fix: update fully_async_policy documents --- docs/advance/fully_async.md | 1 - recipe/fully_async_policy/README.md | 1 - recipe/fully_async_policy/README_zh.md | 1 - 3 files changed, 3 deletions(-) diff --git a/docs/advance/fully_async.md b/docs/advance/fully_async.md index eef92a0a89e..5cb9b6acc7b 100644 --- a/docs/advance/fully_async.md +++ b/docs/advance/fully_async.md @@ -512,7 +512,6 @@ We used Qwen2.5-Math-7B to verify the benefits of `use_trainer_do_validate=True` |:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| | colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | | fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | -| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | ## Multi-Turn Tool Calling diff --git a/recipe/fully_async_policy/README.md b/recipe/fully_async_policy/README.md index 389668c2242..70c8faa788b 100644 --- a/recipe/fully_async_policy/README.md +++ b/recipe/fully_async_policy/README.md @@ -507,7 +507,6 @@ We used Qwen2.5-Math-7B to verify the benefits of `use_trainer_do_validate=True` |:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| | colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | | fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | -| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | ## Multi-Turn Tool Calling diff --git a/recipe/fully_async_policy/README_zh.md b/recipe/fully_async_policy/README_zh.md index fcd98b0f846..92af46dce59 100644 --- a/recipe/fully_async_policy/README_zh.md +++ b/recipe/fully_async_policy/README_zh.md @@ -433,7 +433,6 @@ GPU 数量整除,这使得资源调整的灵活性受到影响。此外,随 |:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:|:---------------:| | colocate sync | 16 | 484.623 | 52.939 | 0 | 430.263 | 205.080 | 7h9m | 22.6 | | fully_async_policy | 8:8 | 489.953 | 52.622 | 0 | 435.874 | 95.699 | 7h2m | 21.0 | -| fully_async_policy_opt_validate | 8:8 | | | 0 | | | | | ## 多轮工具调用 From ea8d97d4109bb7e94c726a5bab2fdf7ea87a6e8e Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Thu, 8 Jan 2026 15:25:26 +0800 Subject: [PATCH 10/11] fix: fix format --- verl/utils/dataset/rl_dataset.py | 58 ++++++++++++++++---------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index b333fe4ad7f..cc53526081b 100644 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -391,35 +391,6 @@ async def process_vision_info( return images, videos -def 
get_dataset_class(data_config: DictConfig): - """Get RLHF dataset class. - - Args: - data_config: The data config. - - Returns: - dataset_cls: The dataset class. - """ - - # Check if a custom dataset class is specified in the data configuration - # and if the path to the custom class is provided - if "custom_cls" in data_config and data_config.custom_cls.get("path", None) is not None: - # Dynamically load the custom dataset class - dataset_cls = load_extern_object(data_config.custom_cls.path, data_config.custom_cls.name) - # Verify that the custom dataset class inherits from torch.utils.data.Dataset - if not issubclass(dataset_cls, Dataset): - raise TypeError( - f"The custom dataset class '{data_config.custom_cls.name}' from " - f"'{data_config.custom_cls.path}' must inherit from torch.utils.data.Dataset" - ) - else: - # Use the default RLHFDataset class if no custom class is specified - dataset_cls = RLHFDataset - print(f"Using dataset class: {dataset_cls.__name__}") - - return dataset_cls - return self.__dict__.copy() - def split(self, num_splits: int): """ split the dataset into num_splits sub-datasets @@ -470,3 +441,32 @@ def split(self, num_splits: int): splits.append(split_dataset) return splits + + +def get_dataset_class(data_config: DictConfig): + """Get RLHF dataset class. + + Args: + data_config: The data config. + + Returns: + dataset_cls: The dataset class. + """ + + # Check if a custom dataset class is specified in the data configuration + # and if the path to the custom class is provided + if "custom_cls" in data_config and data_config.custom_cls.get("path", None) is not None: + # Dynamically load the custom dataset class + dataset_cls = load_extern_object(data_config.custom_cls.path, data_config.custom_cls.name) + # Verify that the custom dataset class inherits from torch.utils.data.Dataset + if not issubclass(dataset_cls, Dataset): + raise TypeError( + f"The custom dataset class '{data_config.custom_cls.name}' from " + f"'{data_config.custom_cls.path}' must inherit from torch.utils.data.Dataset" + ) + else: + # Use the default RLHFDataset class if no custom class is specified + dataset_cls = RLHFDataset + print(f"Using dataset class: {dataset_cls.__name__}") + + return dataset_cls From 2e0990206351f99b3521b845f66fe45eaa66db26 Mon Sep 17 00:00:00 2001 From: HappyAngel Date: Thu, 8 Jan 2026 15:26:59 +0800 Subject: [PATCH 11/11] fix: fix format --- verl/utils/dataset/rl_dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index cc53526081b..f773061b193 100644 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -390,7 +390,6 @@ async def process_vision_info( images, videos = process_vision_info(messages, image_patch_size=image_patch_size, return_video_metadata=True) return images, videos - def split(self, num_splits: int): """ split the dataset into num_splits sub-datasets