From 64b13fa795e0aeb8c5f5d9cb33636cf69a1c08b8 Mon Sep 17 00:00:00 2001 From: Lucas Pasqualin Date: Thu, 9 Oct 2025 14:44:56 -0700 Subject: [PATCH 1/6] initial provisioner plumbing for v1 --- apps/grpo/main.py | 22 ++++++++++++++++++---- src/forge/controller/provisioner.py | 20 +++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index c64f00bc..c3efa144 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -31,6 +31,7 @@ from forge.observability.metric_actors import get_or_create_metric_logger from forge.observability.metrics import record_metric, Reduce from forge.observability.perf_tracker import Tracer +from forge.controller.provisioner import init_provisioner, IS_MONARCH_HOSTMESH_V1 from forge.types import LauncherConfig, ProvisionerConfig from forge.util.ops import compute_logprobs @@ -314,14 +315,17 @@ async def main(cfg: DictConfig): max_res_tokens = cfg.max_res_tokens # ---- Global setups ---- # + provisioner = None if cfg.get("provisioner", None) is not None: - await init_provisioner( + provisioner = await init_provisioner( ProvisionerConfig(launcher_config=LauncherConfig(**cfg.provisioner)) ) metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}}) mlogger = await get_or_create_metric_logger() await mlogger.init_backends.call_one(metric_logging_cfg) - await ts.initialize(strategy=ts.ControllerStorageVolumes()) + + if provisioner is None or not IS_MONARCH_HOSTMESH_V1: + await ts.initialize(strategy=ts.ControllerStorageVolumes()) # ---- Setup services ---- # @@ -348,8 +352,18 @@ async def main(cfg: DictConfig): reward_functions=[MathReward(), ThinkingReward()] ), ) - - print("All services initialized successfully!") + print("Services initialized successfully!") + + if provisioner is not None and IS_MONARCH_HOSTMESH_V1: + #TODO: support multiple host meshses + trainer_host_mesh_name = cfg.actors.trainer["mesh_name"] + #TODO: support multiple host meshses + trainer_hosts = provisioner.get_host_mesh(trainer_host_mesh_name) + await ts.initialize( + mesh=trainer_hosts.spawn_procs(per_host={"gpus": 8}), + strategy=ts.LocalRankStrategy() + ) + print("Torchstore initialized successfully") # ---- Core RL loops ---- # async def continuous_rollouts(): diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index 25884942..c4cb23fd 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -26,6 +26,7 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) +IS_MONARCH_HOSTMESH_V1 = os.environ.get("MONARCH_HOSTMESH_V1", "0") == "1" def _get_port() -> str: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -125,6 +126,7 @@ def __init__(self, cfg: ProvisionerConfig | None = None): self._this_host_id: GpuManager(available_local_devices), } self._proc_host_map = {} + self._host_mesh_map = {} self.launcher: BaseLauncher | None = get_launcher( cfg.launcher_config if cfg is not None else None ) @@ -148,15 +150,23 @@ async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh: alloc, alloc_constraints, server_name = await self.launcher.get_allocator( name, num_hosts ) + + host_mesh = HostMesh( + Shape(["hosts"], NDSlice.new_row_major([num_hosts])), + allocator=alloc, + alloc_constraints=alloc_constraints, + ) + self._host_mesh_map[name] = host_mesh return ( - HostMesh( - Shape(["hosts"], NDSlice.new_row_major([num_hosts])), - allocator=alloc, - alloc_constraints=alloc_constraints, - ), + host_mesh + , server_name, ) + def get_host_mesh(self, name: str) -> HostMesh: + """Returns a HostMesh by name. Assumes the requested hostmesh already exists.""" + return self._host_mesh_map[name] + async def get_proc_mesh( self, num_procs: int, From 96542210ec9e8c043ad4b2146a15c82baa570bcf Mon Sep 17 00:00:00 2001 From: Lucas Pasqualin Date: Thu, 9 Oct 2025 15:10:02 -0700 Subject: [PATCH 2/6] finish patching --- src/forge/controller/provisioner.py | 35 ++++++++++++++++-------- src/forge/env_constants.py | 5 +++- src/forge/observability/metric_actors.py | 11 ++++++-- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index c4cb23fd..3b8817e7 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -13,21 +13,23 @@ import socket import uuid +from forge.env_constants import FORGE_DISABLE_METRICS, IS_MONARCH_HOSTMESH_V1 + from monarch._src.actor.shape import NDSlice, Shape -from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host from monarch.tools import commands +from monarch.actor import Actor, endpoint, ProcMesh -from forge.controller.launcher import BaseLauncher, get_launcher - -from forge.env_constants import FORGE_DISABLE_METRICS +if IS_MONARCH_HOSTMESH_V1: + from monarch._src.actor.v1.host_mesh import HostMesh, this_host +else: + from monarch.actor import HostMesh, this_host +from forge.controller.launcher import BaseLauncher, get_launcher from forge.types import ProcessConfig, ProvisionerConfig logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -IS_MONARCH_HOSTMESH_V1 = os.environ.get("MONARCH_HOSTMESH_V1", "0") == "1" - def _get_port() -> str: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("localhost", 0)) @@ -150,12 +152,21 @@ async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh: alloc, alloc_constraints, server_name = await self.launcher.get_allocator( name, num_hosts ) - - host_mesh = HostMesh( - Shape(["hosts"], NDSlice.new_row_major([num_hosts])), - allocator=alloc, - alloc_constraints=alloc_constraints, - ) + if IS_MONARCH_HOSTMESH_V1: + # "procs" here is actually a dumby value, which Monarch requires but will ignore + # TODO: remove dummy dimension once Monarch supports HostMesh without it + host_mesh = HostMesh.allocate_nonblocking( + name=name, + extent=Extent(["hosts", "procs"], [num_hosts, 1]), + allocator=alloc, + alloc_constraints=alloc_constraints, + ) + else: + host_mesh = HostMesh( + Shape(["hosts"], NDSlice.new_row_major([num_hosts])), + allocator=alloc, + alloc_constraints=alloc_constraints, + ) self._host_mesh_map[name] = host_mesh return ( host_mesh diff --git a/src/forge/env_constants.py b/src/forge/env_constants.py index 6e0fc30e..891cf621 100644 --- a/src/forge/env_constants.py +++ b/src/forge/env_constants.py @@ -3,7 +3,7 @@ # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. - +import os """Centralized constants for environment variable names used in the project.""" # Performance metrics in forge.observability.perf_tracker.py becomes no-op @@ -16,3 +16,6 @@ # Makes forge.observability.metrics.record_metric a no-op # and disables spawning LocalFetcherActor in get_or_create_metric_logger FORGE_DISABLE_METRICS = "FORGE_DISABLE_METRICS" + +# Experimental monarch features +IS_MONARCH_HOSTMESH_V1 = os.environ.get("MONARCH_HOSTMESH_V1", "0") == "1" diff --git a/src/forge/observability/metric_actors.py b/src/forge/observability/metric_actors.py index e50cc3fd..209e528a 100644 --- a/src/forge/observability/metric_actors.py +++ b/src/forge/observability/metric_actors.py @@ -9,9 +9,16 @@ import os from typing import Any, Union -from monarch.actor import Actor, endpoint, get_or_spawn_controller, ProcMesh, this_proc +from forge.env_constants import IS_MONARCH_HOSTMESH_V1 + +from monarch.actor import Actor, endpoint, ProcMesh +if IS_MONARCH_HOSTMESH_V1: + from monarch._src.actor.v1.proc_mesh import get_or_spawn_controller + from monarch._src.actor.v1.host_mesh import this_proc +else: + from monarch.actor import get_or_spawn_controller, this_proc + -from forge.env_constants import FORGE_DISABLE_METRICS from forge.observability.metrics import ( BackendRole, get_logger_backend_class, From d2cff2008e78df43748bd499359aa7626c63ced7 Mon Sep 17 00:00:00 2001 From: Lucas Pasqualin Date: Thu, 9 Oct 2025 15:10:37 -0700 Subject: [PATCH 3/6] default hostmeshv1=1 --- src/forge/env_constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/forge/env_constants.py b/src/forge/env_constants.py index 891cf621..a9814b09 100644 --- a/src/forge/env_constants.py +++ b/src/forge/env_constants.py @@ -18,4 +18,4 @@ FORGE_DISABLE_METRICS = "FORGE_DISABLE_METRICS" # Experimental monarch features -IS_MONARCH_HOSTMESH_V1 = os.environ.get("MONARCH_HOSTMESH_V1", "0") == "1" +IS_MONARCH_HOSTMESH_V1 = os.environ.get("MONARCH_HOSTMESH_V1", "1") == "1" From 8ff6a12346ec775bb52d654c71fdd227686f7a99 Mon Sep 17 00:00:00 2001 From: Lucas Pasqualin Date: Thu, 9 Oct 2025 16:37:01 -0700 Subject: [PATCH 4/6] updating --- apps/grpo/main.py | 14 ++++++++++++-- apps/mast/main.py | 8 ++++++++ apps/mast/qwen3_1_7b_mast.yaml | 2 +- src/forge/actors/trainer.py | 1 + src/forge/controller/launcher.py | 1 - src/forge/controller/provisioner.py | 22 ++++++++++++++-------- src/forge/observability/metric_actors.py | 2 +- 7 files changed, 37 insertions(+), 13 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index c3efa144..82b4f875 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -6,6 +6,16 @@ # Usage: python -m apps.grpo.main --config apps/grpo/qwen3_1_7b.yaml +from forge.env_constants import IS_MONARCH_HOSTMESH_V1 +if IS_MONARCH_HOSTMESH_V1: + from monarch._rust_bindings.monarch_hyperactor.config import configure + from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport + configure( + default_transport=ChannelTransport.MetaTlsWithHostname, + ) + +from forge.env_constants import IS_MONARCH_HOSTMESH_V1 + import asyncio import time import uuid @@ -31,8 +41,8 @@ from forge.observability.metric_actors import get_or_create_metric_logger from forge.observability.metrics import record_metric, Reduce from forge.observability.perf_tracker import Tracer -from forge.controller.provisioner import init_provisioner, IS_MONARCH_HOSTMESH_V1 - +from forge.controller.provisioner import init_provisioner +from forge.env_constants import IS_MONARCH_HOSTMESH_V1 from forge.types import LauncherConfig, ProvisionerConfig from forge.util.ops import compute_logprobs from monarch.actor import endpoint diff --git a/apps/mast/main.py b/apps/mast/main.py index cd5de0be..8564381c 100644 --- a/apps/mast/main.py +++ b/apps/mast/main.py @@ -4,6 +4,14 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. +from forge.env_constants import IS_MONARCH_HOSTMESH_V1 +if IS_MONARCH_HOSTMESH_V1: + from monarch._rust_bindings.monarch_hyperactor.config import configure + from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport + configure( + default_transport=ChannelTransport.MetaTlsWithHostname, + ) + import asyncio import getpass import uuid diff --git a/apps/mast/qwen3_1_7b_mast.yaml b/apps/mast/qwen3_1_7b_mast.yaml index 58d87957..ee7d0541 100644 --- a/apps/mast/qwen3_1_7b_mast.yaml +++ b/apps/mast/qwen3_1_7b_mast.yaml @@ -9,7 +9,7 @@ max_res_tokens: 512 model: "Qwen/Qwen3-1.7B" off_by_n: 1 # Off by one by default launcher: mast -job_name: forge-qwen3-1_7b +job_name: forge-qwen3-1_7b-0 checkpoint_folder: /mnt/wsfuse/teamforge/forge_runs/ # Main loop configuration diff --git a/src/forge/actors/trainer.py b/src/forge/actors/trainer.py index 7a399e4f..76b119fe 100644 --- a/src/forge/actors/trainer.py +++ b/src/forge/actors/trainer.py @@ -174,6 +174,7 @@ async def setup(self): "use_dcp", "use_vllm_builtin_load", "dcp_path", + "job", "vllm_tp_DEPRECATED", }: engine_config.pop(key) # Not part of job config diff --git a/src/forge/controller/launcher.py b/src/forge/controller/launcher.py index f2fe5f0f..fb84403b 100644 --- a/src/forge/controller/launcher.py +++ b/src/forge/controller/launcher.py @@ -190,7 +190,6 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str] async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]: setup = procs.spawn(f"setup-{uuid.uuid1()}", MastSetupActor) await setup.mount.call(mount_dst="/mnt/wsfuse") - return await setup.get_info.choose() async def launch_mast_job(self): handle = self.create_server_handle() diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index 3b8817e7..bb7f5e1a 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -5,6 +5,7 @@ # LICENSE file in the root directory of this source tree. """Resource allocation and provisioning for both local and remote.""" + import asyncio import functools import logging @@ -13,12 +14,11 @@ import socket import uuid -from forge.env_constants import FORGE_DISABLE_METRICS, IS_MONARCH_HOSTMESH_V1 - -from monarch._src.actor.shape import NDSlice, Shape +from monarch._src.actor.shape import NDSlice, Shape, Extent from monarch.tools import commands from monarch.actor import Actor, endpoint, ProcMesh +from forge.env_constants import IS_MONARCH_HOSTMESH_V1, FORGE_DISABLE_METRICS if IS_MONARCH_HOSTMESH_V1: from monarch._src.actor.v1.host_mesh import HostMesh, this_host else: @@ -157,7 +157,7 @@ async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh: # TODO: remove dummy dimension once Monarch supports HostMesh without it host_mesh = HostMesh.allocate_nonblocking( name=name, - extent=Extent(["hosts", "procs"], [num_hosts, 1]), + extent=Extent(["hosts", "no_dim"], [num_hosts, 1]), allocator=alloc, alloc_constraints=alloc_constraints, ) @@ -261,10 +261,16 @@ def bootstrap(env: dict[str, str]): # Shows detailed logs for Monarch rust failures env_vars["RUST_BACKTRACE"] = "1" - procs = host_mesh.spawn_procs( - per_host={"gpus": num_procs}, - bootstrap=functools.partial(bootstrap, env=env_vars), - ) + if IS_MONARCH_HOSTMESH_V1: + procs = host_mesh.spawn_procs( + per_host={"gpus": num_procs}, + setup=functools.partial(bootstrap, env=env_vars), + ) + else: + procs = host_mesh.spawn_procs( + per_host={"gpus": num_procs}, + bootstrap=functools.partial(bootstrap, env=env_vars), + ) if is_remote: await self.launcher.remote_setup(procs) diff --git a/src/forge/observability/metric_actors.py b/src/forge/observability/metric_actors.py index 209e528a..a66cff42 100644 --- a/src/forge/observability/metric_actors.py +++ b/src/forge/observability/metric_actors.py @@ -9,7 +9,7 @@ import os from typing import Any, Union -from forge.env_constants import IS_MONARCH_HOSTMESH_V1 +from forge.env_constants import IS_MONARCH_HOSTMESH_V1, FORGE_DISABLE_METRICS from monarch.actor import Actor, endpoint, ProcMesh if IS_MONARCH_HOSTMESH_V1: From 98a3fe828ee580faced37c0774fa858d5b7559cb Mon Sep 17 00:00:00 2001 From: Yuxuan Hu Date: Thu, 9 Oct 2025 17:15:51 -0700 Subject: [PATCH 5/6] turn off dcp by default --- src/forge/actors/trainer.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/forge/actors/trainer.py b/src/forge/actors/trainer.py index 76b119fe..426b81f7 100644 --- a/src/forge/actors/trainer.py +++ b/src/forge/actors/trainer.py @@ -18,6 +18,17 @@ import torch.distributed.checkpoint as dcp import torchstore as ts +from forge.actors._torchstore_utils import ( + DcpHandle, + get_dcp_whole_state_dict_key, + get_param_key, +) + +from forge.controller import ForgeActor +from forge.data.utils import batch_to_device +from forge.observability.metrics import record_metric, Reduce +from forge.observability.perf_tracker import Tracer + from monarch.actor import current_rank, current_size, endpoint from torch import Tensor from torch.distributed.checkpoint._nested_dict import flatten_state_dict @@ -39,17 +50,6 @@ from torchtitan.experiments.forge.engine import ForgeEngine from torchtitan.experiments.forge.job_config import ForgeJobConfig -from forge.actors._torchstore_utils import ( - DcpHandle, - get_dcp_whole_state_dict_key, - get_param_key, -) - -from forge.controller import ForgeActor -from forge.data.utils import batch_to_device -from forge.observability.metrics import record_metric, Reduce -from forge.observability.perf_tracker import Tracer - logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -112,7 +112,7 @@ class RLTrainer(ForgeActor): # Non JobConfig-related fields loss: Callable = lambda logits, **targets: logits state_dict_key: str = "model_state_dict" - use_dcp: bool = True + use_dcp: bool = False dcp_path: str = "forge_dcp_tmp" vllm_tp_DEPRECATED: int = 1 # noqa: N815 use_vllm_builtin_load: bool = True From 9b5e8b73aad8354cccf243d48a561481d7970ce1 Mon Sep 17 00:00:00 2001 From: Lucas Pasqualin Date: Fri, 10 Oct 2025 13:03:19 -0700 Subject: [PATCH 6/6] fix provisioner logic --- apps/grpo/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 82b4f875..4bb7d17d 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -325,15 +325,19 @@ async def main(cfg: DictConfig): max_res_tokens = cfg.max_res_tokens # ---- Global setups ---- # - provisioner = None + provisioner_config = None if cfg.get("provisioner", None) is not None: - provisioner = await init_provisioner( - ProvisionerConfig(launcher_config=LauncherConfig(**cfg.provisioner)) + provisioner_config = ProvisionerConfig( + launcher_config=LauncherConfig(**cfg.provisioner) ) + + provisioner = provisioner = await init_provisioner(provisioner_config) + metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}}) mlogger = await get_or_create_metric_logger() await mlogger.init_backends.call_one(metric_logging_cfg) + assert provisioner is not None and IS_MONARCH_HOSTMESH_V1 if provisioner is None or not IS_MONARCH_HOSTMESH_V1: await ts.initialize(strategy=ts.ControllerStorageVolumes())