feat: support GDPO #1986 (Closed)

Commits (10):
- f614e4a: squash (remove dependency update)
- 295e315 (yuki-97): update dataset to new structure
- 578698e (yuki-97): register env and remove run_gdpo_gsm8k.py
- 16b0d53 (yuki-97): align compute_advantage interface and move the initialization forward
- 6e1fd96: fix a small bug forgot to import re
- e6a28ad: revert dependency
- 0428ef1 (yuki-97): update math_gdpo_data_processor and system_prompt
- 665219d (yuki-97): add assert and revert some missing comments
- b7597c6 (yuki-97): revert async change and enable fast test
- e898d01 (yuki-97): addressed all comments and fixed bugs
New config file (51 lines added; `gdpo_math_1B.yaml`, per the entry script's default config path):

```yaml
# GDPO: inherits from grpo_math_1B.yaml and overrides only what differs.
defaults: grpo_math_1B.yaml

grpo:
  adv_estimator:
    name: "gdpo"
    normalize_rewards: true
    use_leave_one_out_baseline: false

checkpointing:
  checkpoint_dir: "results/gdpo"

policy:
  model_name: "Qwen/Qwen2.5-1.5B-Instruct"
  logprob_batch_size: 4
  max_total_sequence_length: 1024
  megatron_cfg:
    optimizer:
      weight_decay: 0.0
    scheduler:
      lr_decay_style: "cosine"
      lr_warmup_iters: 10

# GDPO uses a single flat data config (GSM8K + math_gdpo_data_processor); replace parent's train/validation/default.
data:
  _override_: true
  max_input_seq_length: ${policy.max_total_sequence_length}
  prompt_file: "examples/prompts/cot.txt"
  system_prompt_file: "examples/prompts/gsm8k.txt"
  shuffle: true
  num_workers: 1
  processor: "math_gdpo_data_processor"
  env_name: "math"
  dataset_name: "gsm8k"

env:
  math:
    num_workers: 8
    math_verify_impl: "hf_math_verify"

logger:
  wandb_enabled: true
  wandb:
    project: "gdpo-dev"
    name: "gdpo-dev-logger"
  swanlab:
    project: "gdpo-dev"
    name: "gdpo-dev-logger"
  mlflow:
    experiment_name: "gdpo-dev"
    run_name: "gdpo-dev-logger"
```
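The `defaults:` key plus the `_override_: true` flag under `data` suggest a child-over-parent merge in which most sections deep-merge while flagged sections replace the parent's wholesale. Below is a minimal sketch of that presumed semantics using OmegaConf; nemo_rl's actual `load_config` may behave differently, and the `_override_` handling here is an assumption, not the library's implementation:

```python
# Sketch only: deep-merge a child config over its parent, except sections
# marked `_override_: true`, which replace the parent section entirely.
# The `_override_` convention is inferred from the YAML above, not taken
# from nemo_rl's actual load_config.
from omegaconf import DictConfig, OmegaConf


def load_with_defaults(child_path: str) -> DictConfig:
    child = OmegaConf.load(child_path)
    parent_path = child.pop("defaults", None)
    if parent_path is None:
        return child
    parent = OmegaConf.load(parent_path)
    merged = OmegaConf.merge(parent, child)  # child values win on key conflicts
    for key, section in child.items():
        # Replace (do not merge) sections that opt out of inheritance.
        if OmegaConf.is_dict(section) and section.get("_override_", False):
            merged[key] = section
    return merged
```

Replacing rather than merging matters for `data` here: the parent's nested train/validation defaults would otherwise survive a deep merge, which is exactly what the comment in the YAML says the flat GDPO data config is meant to avoid.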
New system prompt file (17 lines added; presumably the `system_prompt_file` referenced in the config above):

```text
You are a helpful AI assistant.

For every request, you should carefully think through the math problem step by step, then provide the final answer in integer format.

Steps for Each Request:
1. Think: Provide detailed, step-by-step reasoning, calculations, or derivations.
2. Produce Final Answer: After step-by-step reasoning, output the final answer in integer format.

Output Format:
<think>Your thoughts and reasoning</think>
<answer>Final answer in integer format</answer>

Important Notes:
1. You must include your reasoning steps inside <think>.
2. You must always output the Final Answer within <answer> after the reasoning steps are done.
3. You should consistently work through the solution step by step before giving the final answer.
4. The final answer can only be an integer.
```
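The prompt pins a strict `<think>…</think><answer>…</answer>` output format with an integer-only answer, and the commit history mentions a forgotten `import re`, so the processor or reward path presumably pulls the answer out with a regex. A hypothetical, minimal extractor (the names below are illustrative; this is not the PR's actual `math_gdpo_data_processor` code):

```python
import re
from typing import Optional

# Capture an optionally signed integer inside an <answer> tag.
ANSWER_RE = re.compile(r"<answer>\s*(-?\d+)\s*</answer>", re.DOTALL)


def extract_integer_answer(completion: str) -> Optional[int]:
    """Return the integer in the last <answer> tag, or None if absent."""
    matches = ANSWER_RE.findall(completion)
    return int(matches[-1]) if matches else None


# Example: extract_integer_answer("<think>2 + 2 = 4</think><answer>4</answer>") -> 4
```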
New GDPO training entry script (258 lines added):

```python
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os
import pprint
from collections import defaultdict
from typing import Any, Optional

from omegaconf import OmegaConf
from transformers import PreTrainedTokenizerBase

from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data import DataConfig
from nemo_rl.data.datasets import AllTaskProcessedDataset, load_response_dataset
from nemo_rl.data.interfaces import (
    TaskDataProcessFnCallable,
    TaskDataSpec,
)
from nemo_rl.data.processors import math_gdpo_data_processor
from nemo_rl.distributed.ray_actor_environment_registry import (
    get_actor_python_env,
)
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.environments.interfaces import EnvironmentInterface
from nemo_rl.environments.math_environment import MathMultiRewardEnvironment
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
from nemo_rl.utils.logger import get_next_experiment_dir

OmegaConf.register_new_resolver("mul", lambda a, b: a * b)


def parse_args() -> tuple[argparse.Namespace, list[str]]:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Run GRPO training with configuration")
    parser.add_argument(
        "--config", type=str, default=None, help="Path to YAML config file"
    )

    # Parse known args for the script
    args, overrides = parser.parse_known_args()

    return args, overrides


# ===============================================================================
# Math Data Processor
# ===============================================================================
TokenizerType = PreTrainedTokenizerBase


def setup_data(
    tokenizer: TokenizerType,
    data_config: DataConfig,
    env_configs: dict[str, Any],
    seed: int,
) -> tuple[
    AllTaskProcessedDataset,
    Optional[AllTaskProcessedDataset],
    dict[str, EnvironmentInterface],
    dict[str, EnvironmentInterface],
]:
    print("\n▶ Setting up data...")
    math_task_spec = TaskDataSpec(
        task_name="math",
        prompt_file=data_config["prompt_file"],
        system_prompt_file=data_config["system_prompt_file"],
    )

    # load dataset
    data: Any = load_response_dataset(data_config)
    task_name = (
        data.task_name if hasattr(data, "task_name") else data.task_spec.task_name
    )

    # data processor
    task_data_processors: dict[str, tuple[TaskDataSpec, TaskDataProcessFnCallable]] = (
        defaultdict(lambda: (math_task_spec, math_gdpo_data_processor))
    )
    task_data_processors[task_name] = (math_task_spec, math_gdpo_data_processor)

    # setup math environment
    math_env = MathMultiRewardEnvironment.options(  # type: ignore # it's wrapped with ray.remote
        runtime_env={
            "py_executable": get_actor_python_env(
                "nemo_rl.environments.math_environment.MathMultiRewardEnvironment"
            ),
            "env_vars": dict(os.environ),  # Pass thru all user environment variables
        }
    ).remote(env_configs["math"])

    dataset = AllTaskProcessedDataset(
        data.dataset,
        tokenizer,
        math_task_spec,
        task_data_processors,
        max_seq_length=data_config["max_input_seq_length"],
    )

    val_dataset: Optional[AllTaskProcessedDataset] = None
    if data.val_dataset is not None:
        val_dataset = AllTaskProcessedDataset(
            data.val_dataset,
            tokenizer,
            math_task_spec,
            task_data_processors,
            max_seq_length=data_config["max_input_seq_length"],
        )

    task_to_env: dict[str, EnvironmentInterface] = defaultdict(lambda: math_env)
    task_to_env[task_name] = math_env
    return dataset, val_dataset, task_to_env, task_to_env


def main() -> None:
    """Main entry point."""
    # Parse arguments
    args, overrides = parse_args()

    if not args.config:
        args.config = os.path.join(
            os.path.dirname(__file__), "configs", "gdpo_math_1B.yaml"
        )

    config = load_config(args.config)
    print(f"Loaded configuration from: {args.config}")

    if overrides:
        print(f"Overrides: {overrides}")
        config = parse_hydra_overrides(config, overrides)

    config: MasterConfig = OmegaConf.to_container(config, resolve=True)
    print("Applied CLI overrides")

    # Print config
    print("Final config:")
    pprint.pprint(config)

    # Get the next experiment directory with incremented ID
    config["logger"]["log_dir"] = get_next_experiment_dir(config["logger"]["log_dir"])
    print(f"📊 Using log directory: {config['logger']['log_dir']}")
    if config["checkpointing"]["enabled"]:
        print(
            f"📊 Using checkpoint directory: {config['checkpointing']['checkpoint_dir']}"
        )

    init_ray()

    # setup tokenizer
    tokenizer = get_tokenizer(config["policy"]["tokenizer"])
    assert config["policy"]["generation"] is not None, (
        "A generation config is required for GRPO"
    )
    config["policy"]["generation"] = configure_generation_config(
        config["policy"]["generation"], tokenizer
    )

    # setup data
    (
        dataset,
        val_dataset,
        task_to_env,
        val_task_to_env,
    ) = setup_data(tokenizer, config["data"], config["env"], config["grpo"]["seed"])

    (
        policy,
        policy_generation,
        cluster,
        dataloader,
        val_dataloader,
        loss_fn,
        logger,
        checkpointer,
        grpo_state,
        master_config,
    ) = setup(config, tokenizer, dataset, val_dataset)

    # Check if async mode is enabled
    if "async_grpo" in config["grpo"] and config["grpo"]["async_grpo"]["enabled"]:
        # Async GRPO does not support dynamic sampling, reward scaling, or reward shaping (DAPO features)
        unsupported_features = [
            "use_dynamic_sampling",
            "reward_scaling",
            "reward_shaping",
        ]

        for feature in unsupported_features:
            if feature not in config["grpo"]:
                continue

            if feature == "use_dynamic_sampling":
                if config["grpo"][feature]:
                    raise NotImplementedError(
                        f"{feature} is not supported with async GRPO"
                    )
            else:
                if config["grpo"][feature]["enabled"]:
                    raise NotImplementedError(
                        f"{feature} is not supported with async GRPO"
                    )

        from nemo_rl.algorithms.grpo import async_grpo_train

        print("🚀 Running async GRPO training")

        async_config = config["grpo"]["async_grpo"]
        # Run async GRPO training
        async_grpo_train(
            policy=policy,
            policy_generation=policy_generation,
            dataloader=dataloader,
            val_dataloader=val_dataloader,
            tokenizer=tokenizer,
            loss_fn=loss_fn,
            task_to_env=task_to_env,
            val_task_to_env=val_task_to_env,
            logger=logger,
            checkpointer=checkpointer,
            grpo_save_state=grpo_state,
            master_config=master_config,
            max_trajectory_age_steps=async_config["max_trajectory_age_steps"],
        )
    else:
        print("🚀 Running synchronous GRPO training")

        # Run standard GRPO training
        grpo_train(
            policy,
            policy_generation,
            dataloader,
            val_dataloader,
            tokenizer,
            loss_fn,
            task_to_env,
            val_task_to_env,
            logger,
            checkpointer,
            grpo_state,
            master_config,
        )


if __name__ == "__main__":
    main()
```
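The config selects `adv_estimator.name: "gdpo"` alongside `normalize_rewards` and `use_leave_one_out_baseline` toggles, and one commit aligns the `compute_advantage` interface, so the estimator plugs into the same group-relative advantage machinery GRPO uses. GDPO's actual estimator is not shown in this diff; the sketch below is a generic group-normalized baseline from that family, included only to illustrate what the two toggles typically control:

```python
import torch


def group_normalized_advantages(
    rewards: torch.Tensor,  # (num_prompts, group_size) scalar rewards per rollout
    normalize_rewards: bool = True,
    use_leave_one_out_baseline: bool = False,
    eps: float = 1e-6,
) -> torch.Tensor:
    """GRPO-family advantages; a stand-in, not GDPO's actual math.

    Assumes group_size > 1 when use_leave_one_out_baseline is set.
    """
    if use_leave_one_out_baseline:
        # Each sample's baseline is the mean of the *other* rollouts in its group.
        n = rewards.size(1)
        baseline = (rewards.sum(dim=1, keepdim=True) - rewards) / (n - 1)
    else:
        # Plain group mean (the setting in gdpo_math_1B.yaml).
        baseline = rewards.mean(dim=1, keepdim=True)
    advantages = rewards - baseline
    if normalize_rewards:
        advantages = advantages / (rewards.std(dim=1, keepdim=True) + eps)
    return advantages
```

Here `rewards[i]` holds the rewards of prompt `i`'s rollout group, and each advantage is that rollout's reward relative to its group baseline, optionally whitened by the group's standard deviation.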