Skip to content

Commit aa4ca31

Browse files
authored
[fsdp, megatron] refactor: Refactor Fully Async Implementation via Engine Workers (verl-project#5269)
### What does this PR do?

Refactor the fully async worker into an engine worker based on the separate ray trainer.

> Add **concise** overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review.

### Checklist Before Starting

- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI)
  - `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`
    - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
  - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title.
    - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`

### Test

> For changes that cannot be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc.

### API and Usage Example

> Demonstrate how the API changes if any, and provide usage example(s) if possible.

```python
# Add code snippet or script demonstrating how to use this
```

### Design & Code Changes

> Demonstrate the high-level design if this PR is complex, and list the specific changes.

### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

- [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). (If not accessible, please try [the Feishu group (飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
- [ ] If your PR is related to the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`.
1 parent c9dabc7 commit aa4ca31

File tree

4 files changed: +482 additions, -31 deletions

verl/experimental/fully_async_policy/fully_async_main.py

Lines changed: 28 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -80,29 +80,42 @@ def create_role_worker_mapping(config):
8080
dict: Mapping from roles to worker classes
8181
"""
8282
# Select worker class based on strategy
83-
if config.actor_rollout_ref.actor.strategy in ["fsdp", "fsdp2"]:
84-
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
85-
from verl.experimental.fully_async_policy.fsdp_workers import (
86-
CriticWorker,
83+
use_legacy_worker_impl = config.trainer.get("use_legacy_worker_impl", "auto")
84+
if use_legacy_worker_impl == "disable":
85+
from verl.experimental.separation.engine_workers import (
8786
DetachActorWorker,
8887
DetachAsyncRolloutWorker,
88+
TrainingWorker,
8989
)
9090
from verl.single_controller.ray import RayWorkerGroup
9191

9292
ray_worker_group_cls = RayWorkerGroup
9393

94-
elif config.actor_rollout_ref.actor.strategy == "megatron":
95-
assert config.critic.strategy == "megatron"
96-
from verl.experimental.fully_async_policy.megatron_worker import (
97-
CriticWorker,
98-
DetachActorWorker,
99-
DetachAsyncRolloutWorker,
100-
)
101-
from verl.single_controller.ray import RayWorkerGroup
102-
103-
ray_worker_group_cls = RayWorkerGroup
94+
CriticWorker = TrainingWorker
10495
else:
105-
raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}")
96+
if config.actor_rollout_ref.actor.strategy in ["fsdp", "fsdp2"]:
97+
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
98+
from verl.experimental.fully_async_policy.fsdp_workers import (
99+
CriticWorker,
100+
DetachActorWorker,
101+
DetachAsyncRolloutWorker,
102+
)
103+
from verl.single_controller.ray import RayWorkerGroup
104+
105+
ray_worker_group_cls = RayWorkerGroup
106+
107+
elif config.actor_rollout_ref.actor.strategy == "megatron":
108+
assert config.critic.strategy == "megatron"
109+
from verl.experimental.fully_async_policy.megatron_worker import (
110+
CriticWorker,
111+
DetachActorWorker,
112+
DetachAsyncRolloutWorker,
113+
)
114+
from verl.single_controller.ray import RayWorkerGroup
115+
116+
ray_worker_group_cls = RayWorkerGroup
117+
else:
118+
raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}")
106119

107120
train_role = Role.ActorRollout if config.async_training.use_trainer_do_validate else Role.Actor
108121
role_worker_mapping = {

verl/experimental/one_step_off_policy/main_ppo.py

Lines changed: 28 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -82,29 +82,42 @@ def create_role_worker_mapping(config):
8282
dict: Mapping from roles to worker classes
8383
"""
8484
# Select worker class based on strategy
85-
if config.actor_rollout_ref.actor.strategy in ["fsdp", "fsdp2"]:
86-
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
87-
from verl.experimental.one_step_off_policy.fsdp_workers import (
88-
CriticWorker,
85+
use_legacy_worker_impl = config.trainer.get("use_legacy_worker_impl", "auto")
86+
if use_legacy_worker_impl == "disable":
87+
from verl.experimental.separation.engine_workers import (
8988
DetachActorWorker,
9089
DetachAsyncRolloutWorker,
90+
TrainingWorker,
9191
)
9292
from verl.single_controller.ray import RayWorkerGroup
9393

9494
ray_worker_group_cls = RayWorkerGroup
9595

96-
elif config.actor_rollout_ref.actor.strategy == "megatron":
97-
assert config.critic.strategy == "megatron"
98-
from verl.experimental.one_step_off_policy.megatron_workers import (
99-
CriticWorker,
100-
DetachActorWorker,
101-
DetachAsyncRolloutWorker,
102-
)
103-
from verl.single_controller.ray import RayWorkerGroup
104-
105-
ray_worker_group_cls = RayWorkerGroup
96+
CriticWorker = TrainingWorker
10697
else:
107-
raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}")
98+
if config.actor_rollout_ref.actor.strategy in ["fsdp", "fsdp2"]:
99+
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
100+
from verl.experimental.one_step_off_policy.fsdp_workers import (
101+
CriticWorker,
102+
DetachActorWorker,
103+
DetachAsyncRolloutWorker,
104+
)
105+
from verl.single_controller.ray import RayWorkerGroup
106+
107+
ray_worker_group_cls = RayWorkerGroup
108+
109+
elif config.actor_rollout_ref.actor.strategy == "megatron":
110+
assert config.critic.strategy == "megatron"
111+
from verl.experimental.one_step_off_policy.megatron_workers import (
112+
CriticWorker,
113+
DetachActorWorker,
114+
DetachAsyncRolloutWorker,
115+
)
116+
from verl.single_controller.ray import RayWorkerGroup
117+
118+
ray_worker_group_cls = RayWorkerGroup
119+
else:
120+
raise NotImplementedError(f"Unsupported strategy: {config.actor_rollout_ref.actor.strategy}")
108121

109122
role_worker_mapping = {
110123
Role.Actor: ray.remote(DetachActorWorker),

0 commit comments

Comments
 (0)