Skip to content

Commit b559831

Browse files
committed
initial commit for hostmesh v1
1 parent 493ff0a commit b559831

File tree

3 files changed

+55
-15
lines changed

3 files changed

+55
-15
lines changed

src/forge/controller/launcher.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@
1515
import monarch
1616

1717
import torchx.specs as specs
18-
1918
from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints
19+
from monarch._rust_bindings.monarch_hyperactor.channel import ChannelTransport
20+
21+
from monarch._rust_bindings.monarch_hyperactor.config import configure
2022
from monarch._src.actor.allocator import RemoteAllocator, TorchXRemoteAllocInitializer
2123
from monarch.actor import Actor, endpoint, ProcMesh
2224
from monarch.tools import commands
2325
from monarch.tools.commands import info
2426
from monarch.tools.components import hyperactor
2527
from monarch.tools.config import Config, Workspace
2628

29+
from forge.env import MONARCH_HOSTMESH_V1
30+
2731
from forge.types import Launcher, LauncherConfig
2832

2933
_MAST_AVAILABLE = False
@@ -116,7 +120,8 @@ async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
116120

117121
class Slurmlauncher(BaseLauncher):
118122
async def initialize(self) -> None:
119-
pass
123+
if MONARCH_HOSTMESH_V1.get_value():
124+
configure(default_transport=ChannelTransport.Tcp)
120125

121126
async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:
122127
appdef = hyperactor.host_mesh(
@@ -172,6 +177,9 @@ def __init__(self, cfg: LauncherConfig | None = None):
172177
self.job_name = self.cfg.job_name or self.create_job_name()
173178

174179
async def initialize(self) -> None:
180+
if MONARCH_HOSTMESH_V1.get_value():
181+
configure(default_transport=ChannelTransport.MetaTlsWithHostname)
182+
175183
await self.launch_mast_job()
176184

177185
async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]:

src/forge/controller/provisioner.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
# This source code is licensed under the BSD-style license found in the
55
# LICENSE file in the root directory of this source tree.
66

7+
from forge.env import MONARCH_HOSTMESH_V1
8+
79
"""Remote and local resource manager for allocation and provisioning."""
810
import asyncio
911
import functools
@@ -14,7 +16,8 @@
1416
import uuid
1517

1618
from monarch._src.actor.shape import NDSlice, Shape
17-
from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host
19+
from monarch.actor import Actor, endpoint, ProcMesh
20+
1821
from monarch.tools import commands
1922

2023
from forge.controller.launcher import BaseLauncher, get_launcher
@@ -27,6 +30,14 @@
2730
logger.setLevel(logging.DEBUG)
2831

2932

33+
if MONARCH_HOSTMESH_V1.get_value():
34+
from monarch._src.actor.v1.host_mesh import HostMesh, this_host
35+
36+
logger.info("Using Monarch HostMesh v1...")
37+
else:
38+
from monarch.actor import HostMesh, this_host
39+
40+
3041
def _get_port() -> str:
3142
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
3243
s.bind(("localhost", 0))
@@ -148,14 +159,29 @@ async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh:
148159
alloc, alloc_constraints, server_name = await self.launcher.get_allocator(
149160
name, num_hosts
150161
)
151-
return (
152-
HostMesh(
162+
163+
if MONARCH_HOSTMESH_V1.get_value():
164+
# We are asking Monarch to allocate a single process on
165+
# every host, reflected in the Extent we provide below.
166+
167+
# Technically, this is ["hosts", "procs"] but to reduce
168+
# confusion on its relationship with procs elsewhere,
169+
# we call it "no_dim".
170+
171+
# TODO: remove the extra "no_dim" dimension once Monarch supports HostMesh without it.
172+
host_mesh = HostMesh.allocate_nonblocking(
173+
name=name,
174+
extent=Extent(["hosts", "no_dim"], [num_hosts, 1]),
175+
allocator=alloc,
176+
alloc_constraints=alloc_constraints,
177+
)
178+
else:
179+
host_mesh = HostMesh(
153180
Shape(["hosts"], NDSlice.new_row_major([num_hosts])),
154181
allocator=alloc,
155182
alloc_constraints=alloc_constraints,
156-
),
157-
server_name,
158-
)
183+
)
184+
return host_mesh, server_name
159185

160186
async def get_proc_mesh(
161187
self,
@@ -256,7 +282,7 @@ def bootstrap(env: dict[str, str]):
256282
env_vars[env_var.name] = str(env_var.get_value())
257283

258284
procs = host_mesh.spawn_procs(
259-
per_host={"gpus": num_procs},
285+
per_host={"procs": num_procs},
260286
bootstrap=functools.partial(bootstrap, env=env_vars),
261287
)
262288

src/forge/env.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,6 @@ def get_value(self) -> Any:
8282
description="Sets Monarch's stderr log level, i.e. set to 'info' or 'debug'",
8383
)
8484

85-
TORCHSTORE_USE_RDMA = EnvVar(
86-
name="TORCHSTORE_RDMA_ENABLED",
87-
default=False,
88-
description="Whether or not to use RDMA in TorchStore.",
89-
)
90-
9185
RUST_BACKTRACE = EnvVar(
9286
name="RUST_BACKTRACE",
9387
default="full",
@@ -106,6 +100,18 @@ def get_value(self) -> Any:
106100
description="Sets the maximum frame length for Monarch's actor message delivery in bytes.",
107101
)
108102

103+
MONARCH_HOSTMESH_V1 = EnvVar(
104+
name="MONARCH_HOSTMESH_V1",
105+
default=True,
106+
description="Whether or not to use Monarch's experimental hostmesh v1 APIs",
107+
)
108+
109+
TORCHSTORE_USE_RDMA = EnvVar(
110+
name="TORCHSTORE_RDMA_ENABLED",
111+
default=True,
112+
description="Whether or not to use RDMA in TorchStore.",
113+
)
114+
109115

110116
@functools.cache
111117
def all_env_vars() -> list[EnvVar]:

0 commit comments

Comments
 (0)