diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index 4948681c3..b17850b6d 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -11,6 +11,7 @@
 from dataclasses import dataclass
 from typing import Any, Callable

+from forge.controller import provisioner
 import torch
 import torch.nn.functional as F
 import torchstore as ts
@@ -237,15 +238,17 @@ async def pad_token(self):
         return self._tokenizer.pad_token_id


+from forge.controller.provisioner import _get_provisioner
+
 async def main(cfg: DictConfig):
     """Main GRPO training loop with rollout and training processes."""
     group_size = cfg.group_size
     max_req_tokens = cfg.max_req_tokens
     max_res_tokens = cfg.max_res_tokens
     mlogger = get_metric_logger(
-        "wandb",
+        # "wandb",
         freq=1,
-        project="grpo-training",
+        # project="grpo-training",
     )

     # ---- Setup services ---- #
@@ -387,6 +390,11 @@ async def continuous_training():

 @parse
 def _main(cfg):
+    # import pickle
+    # with open("../qwen3_multinode.pkl", "wb") as fp:
+    #     pickle.dump(cfg, fp)
+
+    # return
     asyncio.run(main(cfg))


 _main()  # @parse grabs the cfg from CLI
diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml
index 0f54a9bc1..62a641961 100644
--- a/apps/grpo/qwen3_1_7b.yaml
+++ b/apps/grpo/qwen3_1_7b.yaml
@@ -19,8 +19,10 @@ dataset:

 # Policy configuration
 policy:
+  checkpoint_path: checkpoints
   engine_config:
-    model: ${model}
+    tokenizer: /home/lpasqualin/titan_manifold/tree/forge/qwen3-1-7b-tokenizer
+    model: /home/lpasqualin/titan_manifold/tree/qwen3/Qwen3-1-7B
     tensor_parallel_size: 1
     pipeline_parallel_size: 1
     enforce_eager: false
@@ -35,7 +37,7 @@ trainer:
   model:
     name: qwen3
     flavor: 1.7B
-    hf_assets_path: hf://${model}
+    hf_assets_path: /home/lpasqualin/titan_manifold/tree/qwen3/Qwen3-1-7B/
   optimizer:
     name: AdamW
     lr: 1e-5
@@ -47,7 +49,7 @@
     seq_len: 2048
     max_norm: 1.0
     steps: 1000000
-    dtype: bfloat16
+    # dtype: bfloat16
   compile:
     enable: false
   parallelism:
@@ -60,11 +62,12 @@
     disable_loss_parallel: true
   checkpoint:
     enable: true
-    initial_load_path: hf://${model}
+    initial_load_path: /home/lpasqualin/titan_manifold/tree/qwen3/Qwen3-1-7B/
     initial_load_in_hf: true
     last_save_in_hf: true
     interval: 500
     async_mode: "disabled"
+    folder: checkpoints
   activation_checkpoint:
     mode: selective
     selective_ac_option: op
@@ -80,9 +83,9 @@ ref_model:
   model:
     name: qwen3
     flavor: 1.7B
-    hf_assets_path: hf://${model}
-  training:
-    dtype: bfloat16
+    hf_assets_path: /home/lpasqualin/titan_manifold/tree/qwen3/Qwen3-1-7B/
+  # training:
+  #   dtype: bfloat16
   compile:
     enable: false
   parallelism:
@@ -93,7 +96,7 @@
     context_parallel_degree: 1
     expert_parallel_degree: 1
   checkpoint:
-    initial_load_path: hf://${model}
+    initial_load_path: /home/lpasqualin/titan_manifold/tree/qwen3/Qwen3-1-7B/
     initial_load_in_hf: true

 # All resource allocations
diff --git a/apps/grpo/qwen3_multinode.yaml b/apps/grpo/qwen3_multinode.yaml
index ade01855f..1eec56eb0 100644
--- a/apps/grpo/qwen3_multinode.yaml
+++ b/apps/grpo/qwen3_multinode.yaml
@@ -1,6 +1,5 @@
-# GRPO Training Configuration
-# Currently a fork of the main yaml, this just shows
-# placement of trainer and inference servers on separate hosts.
+# Grouped Relative Policy Optimization (GRPO)
+# >>> python -m apps.grpo.main --config apps/grpo/qwen3_multinode.yaml

 # Global configuration
 group_size: 8
@@ -8,10 +7,11 @@
 batch_size: 16
 max_req_tokens: 512
 max_res_tokens: 512
 model: "Qwen/Qwen3-1.7B"
+off_by_n: 1  # tolerate samples up to 1 policy version stale (async by 1)

 # Dataset configuration
 dataset:
-  path: "openai/gsm8k"
+  path: "openai/gsm8k"  # add manifold path here
   revision: "main"
   data_split: "train"
   streaming: true
@@ -20,7 +20,8 @@
 # Policy configuration
 policy:
   engine_config:
-    model: ${model}
+    tokenizer: /mnt/mffuse/forge/qwen3-1-7b-tokenizer
+    model: /mnt/mffuse/qwen3/Qwen3-1-7B/
     tensor_parallel_size: 1
     pipeline_parallel_size: 1
     enforce_eager: false
@@ -32,46 +33,99 @@

 # Trainer configuration
 trainer:
-  model_name: ${model}
-  learning_rate: 1e-5
+  model:
+    name: qwen3
+    flavor: 1.7B
+    hf_assets_path: /mnt/mffuse/qwen3/Qwen3-1-7B/
+  optimizer:
+    name: AdamW
+    lr: 1e-5
+    eps: 1e-8
+  lr_scheduler:
+    warmup_steps: 1
+  training:
+    local_batch_size: ${batch_size}
+    seq_len: 2048
+    max_norm: 1.0
+    steps: 1000000
+    # dtype: bfloat16
+  compile:
+    enable: false
+  parallelism:
+    data_parallel_replicate_degree: 1
+    data_parallel_shard_degree: 1
+    tensor_parallel_degree: 1
+    pipeline_parallel_degree: 1
+    context_parallel_degree: 1
+    expert_parallel_degree: 1
+    disable_loss_parallel: true
+  checkpoint:
+    enable: true
+    initial_load_path: /mnt/mffuse/qwen3/Qwen3-1-7B/
+    initial_load_in_hf: true
+    last_save_in_hf: false
+    interval: 500
+    async_mode: "disabled"
+  activation_checkpoint:
+    mode: selective
+    selective_ac_option: op

 # Replay buffer configuration
 replay_buffer:
   batch_size: ${batch_size}
-  max_policy_age: 1 # Async by 1
-  dp_size: 1
+  max_policy_age: ${off_by_n}
+  dp_size: ${trainer.parallelism.data_parallel_shard_degree}  # Must equal trainer DP degree

 # Reference model configuration
 ref_model:
-  model_name: ${model}
+  model:
+    name: qwen3
+    flavor: 1.7B
+    hf_assets_path: /mnt/mffuse/qwen3/Qwen3-1-7B
+  # training:
+  #   dtype: bfloat16
+  compile:
+    enable: false
+  parallelism:
+    data_parallel_replicate_degree: 1
+    data_parallel_shard_degree: 1
+    tensor_parallel_degree: 1
+    pipeline_parallel_degree: 1
+    context_parallel_degree: 1
+    expert_parallel_degree: 1
+  checkpoint:
+    initial_load_path: /mnt/mffuse/qwen3/Qwen3-1-7B/
+    initial_load_in_hf: true

+# All resource allocations
 services:
   dataset:
     procs: 1
     num_replicas: 1
     with_gpus: false
   policy:
-    procs: 1
-    hosts: 1
+    procs: ${policy.engine_config.tensor_parallel_size}
     num_replicas: 1
     with_gpus: true
+    hosts: 1
   trainer:
     procs: 1
-    hosts: 1
     num_replicas: 1
     with_gpus: true
+    hosts: 1
   replay_buffer:
     procs: 1
     num_replicas: 1
     with_gpus: false
-  compute_advantages:
-    procs: 1
-    num_replicas: 1
-    with_gpus: false
   ref_model:
     procs: 1
     num_replicas: 1
     with_gpus: true
+    hosts: 1
+  compute_advantages:
+    procs: 1
+    num_replicas: 1
+    with_gpus: false
   reward_actor:
     procs: 1
     num_replicas: 1
diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py
index cbfff78bf..3b585ff33 100644
--- a/src/forge/actors/policy.py
+++ b/src/forge/actors/policy.py
@@ -12,6 +12,7 @@
 import time
 from collections.abc import Mapping
 from copy import copy
+from typing import Optional
 from dataclasses import asdict, dataclass, field, fields

 import torch
@@ -166,10 +167,10 @@
         if isinstance(engine_config, Mapping):
             engine_config = EngineConfig.from_dict(engine_config)

-        vllm_config = engine_config.create_vllm_config()
         workers = await worker_procs.spawn(
"vllm_worker", PolicyWorker, vllm_config=vllm_config + "vllm_worker", PolicyWorker, ) + await workers.create_config.call(engine_config=engine_config) if isinstance(sampling_config, Mapping): sampling_config = SamplingConfig(**sampling_config) @@ -399,10 +400,15 @@ async def stop(self): @dataclass class PolicyWorker(ForgeActor): - vllm_config: VllmConfig + vllm_config: VllmConfig = None state_dict_key: str = "model_state_dict" + checkpoint_path: str = "" use_dcp: bool = True + @endpoint + def create_config(self, engine_config) -> None: + self.vllm_config = engine_config.create_vllm_config() + @endpoint async def setup(self): # TODO: remove ["gpus"] when monarch implements a flat rank @@ -423,7 +429,7 @@ async def _load_tensor_parallel_state_dict( self.vllm_config.parallel_config.tensor_parallel_size, self.rank ) - checkpoint_id = f"{self.state_dict_key}{DELIM}{version}" + checkpoint_id = f"{self.checkpoint_path}/{self.state_dict_key}{DELIM}{version}" dcp_metadata = None if self.use_dcp: dcp_metadata = await ts.get(checkpoint_id) diff --git a/src/forge/actors/trainer.py b/src/forge/actors/trainer.py index 5dffbac1e..1e220e555 100644 --- a/src/forge/actors/trainer.py +++ b/src/forge/actors/trainer.py @@ -95,6 +95,22 @@ def __post_init__(self): @endpoint async def setup(self): + + import socket + print(f"HOST= {socket.gethostname()=}") + + # Print all files in the specified directory + import glob + snapshot_dir = "/mnt/mffuse/qwen3/Qwen3-1-7B/snapshots/70d244cc86ccca08cf5af4e1e306ecf908b1ad5e" + files = glob.glob(f"{snapshot_dir}/**", recursive=True) + print("Files in snapshot directory:") + for f in files: + print(f"FILE= {f=}") + + if not files: + raise RuntimeError("No files found in snapshot directory") + + # TODO: update ForgeEngine to not use ForgeJobConfig engine_config = {f.name: getattr(self, f.name) for f in fields(self)} for key in {"loss", "state_dict_key", "use_dcp"}: @@ -220,7 +236,7 @@ async def push_weights(self, policy_version: int) -> None: # TODO: Figure out how to gracefully handle which model to-vLLM conversion is needed vllm_ready_hf_sd = _qwen3_hf_to_vllm(sd=hf_state_dict, num_layers=28) - key = f"{self.state_dict_key}{DELIM}{policy_version}" + key = f"{self.checkpoint.folder}/{self.state_dict_key}{DELIM}{policy_version}" start_time = time.time() if self.use_dcp: metadata = dcp.save(checkpoint_id=key, state_dict=vllm_ready_hf_sd) diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index 27aa1293e..1ac662cf8 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -19,6 +19,9 @@ from monarch.tools.components import hyperactor from monarch.tools.config import Config +from monarch.tools.components.meta import hyperactor +from torchx.specs.fb.component_helpers import Packages + from forge.types import ProcessConfig logger = logging.getLogger(__name__) @@ -83,44 +86,69 @@ def __init__(self): self._host_gpu_map = { self._this_host_id: GpuManager(), } + self.server_info = None + + async def connect_job(self, job_name, config, alloc_factory): + self.server_info = await commands.get_or_create(job_name, config) + self.alloc_factory = alloc_factory + + return self.server_info + async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh: """Creates a remote server and a HostMesh on it.""" # no need to lock here because this is already locked behind `get_proc_mesh` logger.debug(f"Creating remote server for alloc {name}") - appdef = hyperactor.host_mesh( - image="test", 
meshes=[f"{name}:{num_hosts}:gpu.small"] - ) - for role in appdef.roles: - # Note - this is hardcoded to SLURM - # We got this with sinfo - role.resource.memMB = 2062607 - role.resource.cpu = 128 - role.resource.gpu = 8 - - # TODO - multi scheduler support - server_config = Config( - scheduler="slurm", - appdef=appdef, - workspace=monarch.tools.config.workspace.Workspace(dirs=[""]), - ) - server_info = await commands.get_or_create( - "forge_job", - server_config, - force_restart=False, + # appdef = hyperactor.host_mesh( + # image="test", meshes=[f"{name}:{num_hosts}:gpu.small"] + # ) + # for role in appdef.roles: + # # Note - this is hardcoded to SLURM + # # We got this with sinfo + # role.resource.memMB = 2062607 + # role.resource.cpu = 128 + # role.resource.gpu = 8 + + # # TODO - multi scheduler support + # server_config = Config( + # scheduler="slurm", + # appdef=appdef, + # workspace=None, + # ) + assert self.server_info is not None + alloc, alloc_constraints = self.alloc_factory( + server_handle=self.server_info, + task_group=name ) - alloc = RemoteAllocator( - world_id=name, - initializer=TorchXRemoteAllocInitializer(server_info.server_handle), - ) - server_name = f"slurm:///{server_info.name}" + # alloc = RemoteAllocator( + # world_id=name, + # initializer=TorchXRemoteAllocInitializer(self.server_info.server_handle), + # ) + + server_name = f"{self.server_info.scheduler}:///{self.server_info.name}" + + # shape = Shape(["hosts"], NDSlice.new_row_major([num_hosts])) + # host_mesh = HostMesh( + # shape=shape, + # allocator=allocator, + # alloc_constraints=AllocConstraints({MastAllocator.ALLOC_LABEL_TASK_GROUP: task_group}) + # ) + + return ( - HostMesh(Shape(["hosts"], NDSlice.new_row_major([num_hosts])), alloc), + HostMesh( + shape=Shape(["hosts"], NDSlice.new_row_major([num_hosts])), + allocator=alloc, + alloc_constraints=alloc_constraints + ), server_name, ) async def get_proc_mesh( - self, num_procs: int, with_gpus: bool = False, num_hosts: int | None = None + self, + num_procs: int, + with_gpus: bool = False, + num_hosts: int | None = None, ): """Gets a proc mesh. @@ -130,10 +158,11 @@ async def get_proc_mesh( async with self._lock: server_name = None if num_hosts is not None and num_hosts > 0: + # if True: #testing created_hosts = len(self._server_names) host_mesh, server_name = await self.create_host_mesh( - name=f"alloc-{created_hosts}", - num_hosts=num_hosts, + name=f"alloc{created_hosts}", + num_hosts=num_hosts ) host_id = uuid.uuid1() gpu_manager = GpuManager()