diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index c5692bfdf..8a6503734 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -40,7 +40,7 @@ jobs:
         run: python -m pip install torch==2.9.0 --index-url https://download.pytorch.org/whl/test/cu130
       - name: Install monarch
         shell: bash -l {0}
-        run: python -m pip install monarch-no-torch==0.1.0.dev20250826 --find-links assets/ci
+        run: pip install assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl
       - name: Install torchforge
         shell: bash -l {0}
         env:
diff --git a/.github/workflows/unit_test.yaml b/.github/workflows/unit_test.yaml
index 9a839f32d..28a30fdc9 100644
--- a/.github/workflows/unit_test.yaml
+++ b/.github/workflows/unit_test.yaml
@@ -26,7 +26,7 @@ jobs:
       - name: Install pytorch
         run: python -m pip install torch==2.9.0.dev20250826 --extra-index-url https://download.pytorch.org/whl/nightly/cpu
       - name: Install monarch
-        run: python -m pip install monarch-no-torch==0.1.0.dev20250826 --find-links assets/ci
+        run: pip install assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl
      - name: Install torchstore
        run: pip install assets/wheels/torchstore-0.1.0-py3-none-any.whl
      - name: Install torchtitan
diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index c64f00bc2..18b82c3e9 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -28,6 +28,7 @@
 from forge.controller.actor import ForgeActor
 from forge.controller.provisioner import init_provisioner, shutdown
 from forge.data.rewards import MathReward, ThinkingReward
+from forge.env import MONARCH_HOSTMESH_V1
 from forge.observability.metric_actors import get_or_create_metric_logger
 from forge.observability.metrics import record_metric, Reduce
 from forge.observability.perf_tracker import Tracer
@@ -314,14 +315,23 @@ async def main(cfg: DictConfig):
     max_res_tokens = cfg.max_res_tokens

     # ---- Global setups ---- #
+    provisioner = None
     if cfg.get("provisioner", None) is not None:
-        await init_provisioner(
+        provisioner = await init_provisioner(
             ProvisionerConfig(launcher_config=LauncherConfig(**cfg.provisioner))
         )
+    else:
+        provisioner = await init_provisioner()
+
     metric_logging_cfg = cfg.get("metric_logging", {"console": {"log_per_rank": False}})
     mlogger = await get_or_create_metric_logger()
     await mlogger.init_backends.call_one(metric_logging_cfg)
-    await ts.initialize(strategy=ts.ControllerStorageVolumes())
+
+    # In the host mesh v0 case, actors on remote hosts are not able to communicate
+    # with one another. Therefore we use the controller as our storage volume.
+    if not MONARCH_HOSTMESH_V1.get_value():
+        await ts.initialize(strategy=ts.ControllerStorageVolumes())
+        print("Torchstore successfully initialized with controller storage strategy")

     # ---- Setup services ---- #
@@ -351,6 +361,22 @@ async def main(cfg: DictConfig):

     print("All services initialized successfully!")

+    # In the HostMesh v1 case, we spawn a torchstore storage volume
+    # per trainer process.
+    # We initialize after service initialization because torchstore currently
+    # requires access to the underlying proc meshes in the local rank strategy.
+    # We should be able to hide this in the future.
+    if MONARCH_HOSTMESH_V1.get_value():
+        # TODO: support multiple host meshes
+        trainer_num_procs = cfg.actors.trainer["procs"]
+        trainer_host_mesh_name = cfg.actors.trainer["mesh_name"]
+        trainer_hosts = provisioner.get_host_mesh(trainer_host_mesh_name)
+        await ts.initialize(
+            mesh=trainer_hosts.spawn_procs(per_host={"procs": trainer_num_procs}),
+            strategy=ts.LocalRankStrategy(),
+        )
+        print("Torchstore successfully initialized with local rank strategy")
+
     # ---- Core RL loops ---- #
     async def continuous_rollouts():
         rollout_count = 0
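Note: both torchstore initialization paths in apps/grpo/main.py above are selected by the MONARCH_HOSTMESH_V1 flag, which is registered in src/forge/env.py later in this diff. A minimal usage sketch (not part of the change), assuming EnvVar treats "1" as truthy and that the variable is set before any forge module is imported:

```python
# Hypothetical sketch: opting the GRPO app into the HostMesh v1 path.
# Assumption: EnvVar parses "1" as truthy for a boolean default.
import os

os.environ["MONARCH_HOSTMESH_V1"] = "1"  # must be set before importing forge

from forge.env import MONARCH_HOSTMESH_V1

if MONARCH_HOSTMESH_V1.get_value():
    # main() will skip ts.ControllerStorageVolumes() and instead initialize
    # torchstore with one storage volume per trainer process (LocalRankStrategy).
    print("HostMesh v1 enabled")
```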
diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml
index 800d2e973..d39b254fe 100644
--- a/apps/grpo/qwen3_1_7b.yaml
+++ b/apps/grpo/qwen3_1_7b.yaml
@@ -117,26 +117,33 @@ services:
   policy:
     procs: ${policy.engine_config.tensor_parallel_size}
     num_replicas: 1
+    mesh_name: policy
     with_gpus: true
   ref_model:
     procs: 1
     num_replicas: 1
+    mesh_name: ref_model
     with_gpus: true
   reward_actor:
     procs: 1
     num_replicas: 1
+    mesh_name: reward_actor
     with_gpus: false

 actors:
   dataset:
     procs: 1
     with_gpus: false
+    mesh_name: dataset
   trainer:
     procs: 1
     with_gpus: true
+    mesh_name: trainer
   replay_buffer:
     procs: 1
     with_gpus: false
+    mesh_name: replay_buffer
   compute_advantages:
     procs: 1
     with_gpus: false
+    mesh_name: compute_advantages
diff --git a/apps/grpo/qwen3_32b.yaml b/apps/grpo/qwen3_32b.yaml
index 8100a988b..3be8dbdac 100644
--- a/apps/grpo/qwen3_32b.yaml
+++ b/apps/grpo/qwen3_32b.yaml
@@ -122,26 +122,33 @@ services:
     num_replicas: 1
     hosts: 1
     with_gpus: true
+    mesh_name: policy
   ref_model:
     procs: ${ref_model.parallelism.tensor_parallel_degree}
     num_replicas: 1
     with_gpus: true
+    mesh_name: ref_model
   reward_actor:
     procs: 1
     num_replicas: 1
     with_gpus: false
+    mesh_name: reward_actor

 actors:
   dataset:
     procs: 1
     with_gpus: false
+    mesh_name: dataset
   trainer:
     procs: 8
     hosts: 1
     with_gpus: true
+    mesh_name: trainer
   replay_buffer:
     procs: 1
     with_gpus: false
+    mesh_name: replay_buffer
   compute_advantages:
     procs: 1
     with_gpus: false
+    mesh_name: compute_advantages
diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml
index de855d1cb..ad15fcb61 100644
--- a/apps/grpo/qwen3_8b.yaml
+++ b/apps/grpo/qwen3_8b.yaml
@@ -117,25 +117,32 @@ services:
     procs: ${policy.engine_config.tensor_parallel_size}
     num_replicas: 1
     with_gpus: true
+    mesh_name: policy
   ref_model:
     procs: 1
     num_replicas: 1
     with_gpus: true
+    mesh_name: ref_model
   reward_actor:
     procs: 1
     num_replicas: 1
     with_gpus: false
+    mesh_name: reward_actor

 actors:
   dataset:
     procs: 1
     with_gpus: false
+    mesh_name: dataset
   trainer:
     procs: 2
     with_gpus: true
+    mesh_name: trainer
   replay_buffer:
     procs: 1
     with_gpus: false
+    mesh_name: replay_buffer
   compute_advantages:
     procs: 1
     with_gpus: false
+    mesh_name: compute_advantages
diff --git a/assets/ci/monarch_no_torch-0.1.0.dev20250826-py3-none-any.whl b/assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl
similarity index 59%
rename from assets/ci/monarch_no_torch-0.1.0.dev20250826-py3-none-any.whl
rename to assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl
index 4d3eaeb36..34af61940 100644
Binary files a/assets/ci/monarch_no_torch-0.1.0.dev20250826-py3-none-any.whl and b/assets/ci/monarch_no_torch-0.1.0.dev20251010-py3-none-any.whl differ
diff --git a/assets/wheels/monarch-0.0.1-cp310-cp310-linux_x86_64.whl b/assets/wheels/monarch-0.0.1-cp310-cp310-linux_x86_64.whl
index b5a86f5f6..7182c00e1 100644
Binary files a/assets/wheels/monarch-0.0.1-cp310-cp310-linux_x86_64.whl and b/assets/wheels/monarch-0.0.1-cp310-cp310-linux_x86_64.whl differ
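Note: the mesh_name keys added to every service and actor entry above give each host mesh a stable name that the provisioner records (see provisioner.py further down in this diff). A rough sketch of how those names are consumed, assuming the YAML layout shown above; the provisioner handle comes from init_provisioner() in apps/grpo/main.py:

```python
# Hypothetical sketch: resolving the trainer's host mesh by its configured name.
from omegaconf import OmegaConf

cfg = OmegaConf.load("apps/grpo/qwen3_8b.yaml")
trainer_mesh_name = cfg.actors.trainer.mesh_name  # -> "trainer"

# With a live provisioner (returned by init_provisioner()), the mesh can then
# be fetched via the get_host_mesh() API added in this diff:
# trainer_hosts = provisioner.get_host_mesh(trainer_mesh_name)
```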
diff --git a/scripts/build_wheels.sh b/scripts/build_wheels.sh
index fd206d895..cd0fad2c1 100755
--- a/scripts/build_wheels.sh
+++ b/scripts/build_wheels.sh
@@ -17,7 +17,7 @@ NC='\033[0m'
 # Configuration
 PYTORCH_VERSION="2.9.0.dev20250905"
 VLLM_BRANCH="v0.10.0"
-MONARCH_COMMIT="6ca383aca99480aa1bf5853478d4d09fcb224035"
+MONARCH_COMMIT="d1c5ea4732704454efad82db678d4e66a4131bb2"
 TORCHTITAN_COMMIT="0cfbd0b3c2d827af629a107a77a9e47229c31663"
 TORCHSTORE_COMMIT="eed96eb55ce87d4a9880597dd7dfd0d291e9ac81"
 BUILD_DIR="$HOME/forge-build"
diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py
index 9793021d7..dbe0f02a2 100644
--- a/src/forge/actors/policy.py
+++ b/src/forge/actors/policy.py
@@ -53,6 +53,7 @@
 from forge.data.sharding import VLLMSharding
 from forge.data_models.completion import Completion
 from forge.data_models.prompt import to_prompt
+from forge.env import TORCHSTORE_USE_RDMA
 from forge.interfaces import Policy as PolicyInterface
 from forge.observability.metrics import record_metric, Reduce
 from forge.observability.perf_tracker import Tracer
@@ -140,7 +141,9 @@ class Policy(PolicyInterface):
     engine_config: EngineConfig | Mapping = field(default_factory=EngineConfig)
     sampling_config: SamplingConfig | Mapping = field(default_factory=SamplingConfig)
     available_devices: str | None = None
-    use_dcp: bool = True
+    use_dcp: bool = (
+        TORCHSTORE_USE_RDMA.get_value() == 0
+    )  # torchstore currently only accepts 0 or 1
     # Gets set up by setup
     sampling_params: SamplingParams | None = None
     lora_request: LoRARequest | None = None
diff --git a/src/forge/actors/trainer.py b/src/forge/actors/trainer.py
index f4199db71..09e0baa72 100644
--- a/src/forge/actors/trainer.py
+++ b/src/forge/actors/trainer.py
@@ -46,6 +46,7 @@

 from forge.controller import ForgeActor
 from forge.data.utils import batch_to_device
+from forge.env import TORCHSTORE_USE_RDMA
 from forge.observability.metrics import record_metric, Reduce
 from forge.observability.perf_tracker import Tracer

@@ -111,7 +112,9 @@ class RLTrainer(ForgeActor):
     # Non JobConfig-related fields
     loss: Callable = lambda logits, **targets: logits
     state_dict_key: str = "model_state_dict"
-    use_dcp: bool = True
+    use_dcp: bool = (
+        TORCHSTORE_USE_RDMA.get_value() == 0
+    )  # torchstore currently only accepts 0 or 1
     dcp_path: str = "forge_dcp_tmp"

     def __post_init__(self):
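Note: with the two changes above, use_dcp no longer defaults to True for Policy and RLTrainer; the default becomes "use DCP only when RDMA is off". A sketch of the intended wiring (not part of the diff), assuming TORCHSTORE_RDMA_ENABLED is exported before forge is imported and that get_value() returns the raw 0/1 value:

```python
# Hypothetical sketch: enabling RDMA, which disables the DCP default.
# TORCHSTORE_USE_RDMA maps to the TORCHSTORE_RDMA_ENABLED environment variable
# and, per this diff, is expected to hold 0 or 1.
import os

os.environ["TORCHSTORE_RDMA_ENABLED"] = "1"

from forge.env import TORCHSTORE_USE_RDMA

use_dcp = TORCHSTORE_USE_RDMA.get_value() == 0  # same expression as the new default
print(f"use_dcp={use_dcp}")  # expected False when RDMA is enabled
```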
diff --git a/src/forge/controller/launcher.py b/src/forge/controller/launcher.py
index f2fe5f0f2..96f9b42af 100644
--- a/src/forge/controller/launcher.py
+++ b/src/forge/controller/launcher.py
@@ -15,8 +15,10 @@
 import monarch

 import torchx.specs as specs
-
 from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints
+from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport
+
+from monarch._rust_bindings.monarch_hyperactor.config import configure
 from monarch._src.actor.allocator import RemoteAllocator, TorchXRemoteAllocInitializer
 from monarch.actor import Actor, endpoint, ProcMesh
 from monarch.tools import commands
@@ -24,6 +26,8 @@
 from monarch.tools.components import hyperactor
 from monarch.tools.config import Config, Workspace

+from forge.env import MONARCH_HOSTMESH_V1
+
 from forge.types import Launcher, LauncherConfig

 _MAST_AVAILABLE = False
@@ -110,13 +114,17 @@ async def initialize(self) -> None:
     async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
         pass

-    async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
+    async def remote_setup(self, procs: ProcMesh) -> None:
         pass


 class Slurmlauncher(BaseLauncher):
     async def initialize(self) -> None:
-        pass
+        if MONARCH_HOSTMESH_V1.get_value():
+            # HostMeshV1 currently requires explicit configuration
+            # of the underlying transport from client to mesh.
+            # This can be removed once Monarch no longer requires it.
+            configure(default_transport=ChannelTransport.Tcp)

     async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
         appdef = hyperactor.host_mesh(
@@ -148,7 +156,7 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]
         server_name = f"slurm:///{server_info.name}"
         return alloc, None, server_name  # (Allocator, AllocConstraints, SeverName)

-    async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
+    async def remote_setup(self, procs: ProcMesh) -> None:
         return


@@ -172,6 +180,12 @@ def __init__(self, cfg: LauncherConfig | None = None):
         self.job_name = self.cfg.job_name or self.create_job_name()

     async def initialize(self) -> None:
+        if MONARCH_HOSTMESH_V1.get_value():
+            # HostMeshV1 currently requires explicit configuration
+            # of the underlying transport from client to mesh.
+            # This can be removed once Monarch no longer requires it.
+            configure(default_transport=ChannelTransport.MetaTlsWithHostname)
+
         await self.launch_mast_job()

     async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
@@ -187,10 +201,9 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]

         return allocator, alloc_constraints, self.create_server_handle()

-    async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
+    async def remote_setup(self, procs: ProcMesh) -> None:
         setup = procs.spawn(f"setup-{uuid.uuid1()}", MastSetupActor)
         await setup.mount.call(mount_dst="/mnt/wsfuse")
-        return await setup.get_info.choose()

     async def launch_mast_job(self):
         handle = self.create_server_handle()
diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py
index bad8ce4fe..dfecdb9bd 100644
--- a/src/forge/controller/provisioner.py
+++ b/src/forge/controller/provisioner.py
@@ -13,13 +13,14 @@
 import socket
 import uuid

-from monarch._src.actor.shape import NDSlice, Shape
-from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host
+from monarch._src.actor.shape import Extent, NDSlice, Shape
+from monarch.actor import Actor, endpoint, ProcMesh
+
 from monarch.tools import commands

 from forge.controller.launcher import BaseLauncher, get_launcher

-from forge.env import all_env_vars, FORGE_DISABLE_METRICS
+from forge.env import all_env_vars, FORGE_DISABLE_METRICS, MONARCH_HOSTMESH_V1
 from forge.types import ProcessConfig, ProvisionerConfig

@@ -27,6 +28,14 @@
 logger.setLevel(logging.DEBUG)


+if MONARCH_HOSTMESH_V1.get_value():
+    from monarch._src.actor.v1.host_mesh import HostMesh, this_host
+
+    logger.info("Using Monarch HostMesh v1...")
+else:
+    from monarch.actor import HostMesh, this_host
+
+
 def _get_port() -> str:
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
         s.bind(("localhost", 0))
@@ -125,6 +134,7 @@ def __init__(self, cfg: ProvisionerConfig | None = None):
             self._this_host_id: GpuManager(available_local_devices),
         }
         self._proc_host_map = {}
+        self._host_mesh_map = {}
         self.launcher: BaseLauncher | None = get_launcher(
             cfg.launcher_config if cfg is not None else None
         )
@@ -148,14 +158,38 @@ async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh:
         alloc, alloc_constraints, server_name = await self.launcher.get_allocator(
             name, num_hosts
         )
-        return (
-            HostMesh(
+
+        if MONARCH_HOSTMESH_V1.get_value():
+            # We are asking Monarch to allocate a single process on
+            # every host, reflected in the Extent we provide below.
+
+            # Technically, this is ["hosts", "procs"] but to reduce
+            # confusion on its relationship with procs elsewhere,
+            # we call it "no_dim".
+
+            # TODO - remove this once Monarch supports HostMesh without it.
+            host_mesh = HostMesh.allocate_nonblocking(
+                name=name,
+                extent=Extent(["hosts", "no_dim"], [num_hosts, 1]),
+                allocator=alloc,
+                alloc_constraints=alloc_constraints,
+            )
+        else:
+            host_mesh = HostMesh(
                 Shape(["hosts"], NDSlice.new_row_major([num_hosts])),
                 allocator=alloc,
                 alloc_constraints=alloc_constraints,
-            ),
-            server_name,
-        )
+            )
+        return host_mesh, server_name
+
+    def get_host_mesh(self, name: str) -> HostMesh:
+        """Returns the host mesh given its associated name.
+
+        This is currently an experimental API for HostMesh v1 and
+        should not be relied on longer term.
+
+        """
+        return self._host_mesh_map[name]

     async def get_proc_mesh(
         self,
@@ -255,10 +289,16 @@ def bootstrap(env: dict[str, str]):
         for env_var in all_env_vars():
             env_vars[env_var.name] = str(env_var.get_value())

-        procs = host_mesh.spawn_procs(
-            per_host={"gpus": num_procs},
-            bootstrap=functools.partial(bootstrap, env=env_vars),
-        )
+        if MONARCH_HOSTMESH_V1.get_value():
+            procs = host_mesh.spawn_procs(
+                per_host={"procs": num_procs},
+                setup=functools.partial(bootstrap, env=env_vars),
+            )
+        else:
+            procs = host_mesh.spawn_procs(
+                per_host={"procs": num_procs},
+                bootstrap=functools.partial(bootstrap, env=env_vars),
+            )

         if is_remote:
             await self.launcher.remote_setup(procs)
@@ -267,6 +307,8 @@ def bootstrap(env: dict[str, str]):
         if with_gpus:
             # Applies any launcher specific remote setup.
             procs._gpu_ids = gpu_ids
+
+        self._host_mesh_map[mesh_name] = host_mesh
         procs._host = host_mesh

         # If we created a server, track so we can tear it down later.
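Note: get_proc_mesh() forwards every registered EnvVar into the bootstrap environment of the processes it spawns (the all_env_vars() loop in the hunk above), so the new MONARCH_HOSTMESH_V1 and TORCHSTORE_RDMA_ENABLED flags only need to be set on the controller; remote procs inherit them. A condensed illustration of that mechanism, reusing the helper names from the diff:

```python
# Condensed sketch of the env-forwarding loop in Provisioner.get_proc_mesh():
# every EnvVar registered in forge.env is serialized into the environment that
# bootstrap()/setup() applies inside each spawned process.
from forge.env import all_env_vars

env_vars: dict[str, str] = {}
for env_var in all_env_vars():
    env_vars[env_var.name] = str(env_var.get_value())

# env_vars is then passed via functools.partial(bootstrap, env=env_vars) to
# host_mesh.spawn_procs(), using setup= on the v1 path and bootstrap= on v0.
print(sorted(env_vars))
```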
diff --git a/src/forge/env.py b/src/forge/env.py
index 1699ecc90..f5f326a93 100644
--- a/src/forge/env.py
+++ b/src/forge/env.py
@@ -81,12 +81,6 @@ def get_value(self) -> Any:
     description="Sets Monarch's stderr log level, i.e. set to 'info' or 'debug'",
 )

-TORCHSTORE_USE_RDMA = EnvVar(
-    name="TORCHSTORE_RDMA_ENABLED",
-    default=False,
-    description="Whether or not to use RDMA in TorchStore.",
-)
-
 RUST_BACKTRACE = EnvVar(
     name="RUST_BACKTRACE",
     default="full",
@@ -105,6 +99,18 @@ def get_value(self) -> Any:
     description="Sets the maximum frame length for Monarch's actor message delivery in bytes.",
 )

+MONARCH_HOSTMESH_V1 = EnvVar(
+    name="MONARCH_HOSTMESH_V1",
+    default=False,
+    description="Whether or not to use Monarch's experimental HostMesh v1 APIs.",
+)
+
+TORCHSTORE_USE_RDMA = EnvVar(
+    name="TORCHSTORE_RDMA_ENABLED",
+    default=0,
+    description="Whether or not to use RDMA in TorchStore (0 or 1).",
+)
+

 def all_env_vars() -> list[EnvVar]:
     """Retrieves all registered environment variable names."""
diff --git a/src/forge/observability/metric_actors.py b/src/forge/observability/metric_actors.py
index 0815e671a..fae11556f 100644
--- a/src/forge/observability/metric_actors.py
+++ b/src/forge/observability/metric_actors.py
@@ -8,9 +8,9 @@
 import logging
 from typing import Any, Union

-from monarch.actor import Actor, endpoint, get_or_spawn_controller, ProcMesh, this_proc
+from monarch.actor import Actor, endpoint, ProcMesh

-from forge.env import FORGE_DISABLE_METRICS
+from forge.env import FORGE_DISABLE_METRICS, MONARCH_HOSTMESH_V1
 from forge.observability.metrics import (
     BackendRole,
     get_logger_backend_class,
@@ -19,6 +19,13 @@
     reduce_metrics_states,
 )

+if MONARCH_HOSTMESH_V1.get_value():
+    from monarch._src.actor.v1.host_mesh import this_proc
+    from monarch._src.actor.v1.proc_mesh import get_or_spawn_controller
+else:
+    from monarch.actor import get_or_spawn_controller, this_proc
+
+
 logger = logging.getLogger(__name__)

 _global_logger = None
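Note: provisioner.py and metric_actors.py both choose between the v0 and v1 Monarch APIs at import time, so MONARCH_HOSTMESH_V1 must be set before any forge module is imported; flipping it afterwards has no effect on modules that are already loaded. A minimal sketch of the pattern, using the same imports as the diff:

```python
# Hypothetical sketch of the import-time switch used above. The branch is
# evaluated once, when the module is first imported.
from forge.env import MONARCH_HOSTMESH_V1

if MONARCH_HOSTMESH_V1.get_value():
    from monarch._src.actor.v1.host_mesh import this_proc  # experimental v1 API
else:
    from monarch.actor import this_proc  # existing v0 API

print(this_proc)
```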