Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/forge/actors/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,12 @@ async def launch( # pyright: ignore[reportIncompatibleMethodOverride]
policy_proc_config.procs = 1
policy_proc_config.hosts = None
policy_proc_config.with_gpus = False

policy_proc = await get_proc_mesh(process_config=policy_proc_config)

if isinstance(engine_config, Mapping):
engine_config = EngineConfig.from_dict(engine_config)

vllm_config = engine_config.create_vllm_config()
# TODO (felipemello): LocalFetcherActor doesn't spawn with this, so cannot
# do logging within PolicyWorker
workers = worker_procs.spawn(
"vllm_worker", PolicyWorker, vllm_config=vllm_config, use_dcp=use_dcp
)
Expand Down
17 changes: 15 additions & 2 deletions src/forge/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
from .actor import ForgeActor
from .proc_mesh import get_proc_mesh, stop_proc_mesh
from .provisioner import (
get_proc_mesh,
host_mesh_from_proc,
init_provisioner,
shutdown,
stop_proc_mesh,
)

__all__ = ["stop_proc_mesh", "get_proc_mesh", "ForgeActor"]
__all__ = [
"ForgeActor",
"get_proc_mesh",
"stop_proc_mesh",
"init_provisioner",
"shutdown",
"host_mesh_from_proc",
]
28 changes: 1 addition & 27 deletions src/forge/controller/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from monarch.actor import Actor, current_rank, current_size, endpoint

from forge.controller.proc_mesh import get_proc_mesh, stop_proc_mesh
from forge.controller.provisioner import get_proc_mesh, stop_proc_mesh

from forge.types import ProcessConfig, ServiceConfig

Expand Down Expand Up @@ -144,28 +144,6 @@ async def setup(self):
"""
pass

@endpoint
async def set_env(self, addr: str, port: str):
"""A temporary workaround to set master addr/port.

TODO - issues/144. This should be done in proc_mesh creation.
The ideal path:
- Create a host mesh
- Grab a host from host mesh, from proc 0 spawn an actor that
gets addr/port
- Spawn procs on the HostMesh with addr/port, setting the
addr/port in bootstrap.

We can't currently do this because HostMesh only supports single
proc_mesh creation at the moment. This will be possible once
we have "proper HostMesh support".

"""
import os

os.environ["MASTER_ADDR"] = addr
os.environ["MASTER_PORT"] = port

@classmethod
async def launch(cls, *args, **kwargs) -> "ForgeActor":
"""Provisions and deploys a new actor.
Expand Down Expand Up @@ -193,10 +171,6 @@ async def launch(cls, *args, **kwargs) -> "ForgeActor":
actor_name = kwargs.pop("name", cls.__name__)
actor = proc_mesh.spawn(actor_name, cls, *args, **kwargs)
actor._proc_mesh = proc_mesh

if hasattr(proc_mesh, "_hostname") and hasattr(proc_mesh, "_port"):
host, port = proc_mesh._hostname, proc_mesh._port
await actor.set_env.call(addr=host, port=port)
await actor.setup.call()
return actor

Expand Down
31 changes: 9 additions & 22 deletions src/forge/controller/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import getpass
import os
import socket
import subprocess

import tempfile
import uuid
from typing import Any

Expand All @@ -32,28 +33,14 @@
from torchx.specs import AppState
from torchx.specs.fb.component_helpers import Packages
except ImportError as e:
print(f"Warning: Monarch meta/fb inetrnal imports failed: {e}")
print("Monarch functionality will be limited")
# This means there is an error with MAST
pass

JOB_NAME_KEY = "job_name"
LAUNCHER_KEY = "launcher"


def _get_port() -> str:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("localhost", 0))
addr = s.getsockname()
port = addr[1]
return str(port)


class SetupActor(Actor):
@endpoint
def get_info(self) -> [str, str]:
return socket.gethostname(), _get_port()


class MastSetupActor(SetupActor):
class MastSetupActor(Actor):
@endpoint
def mount(self, mount_dst: str):
point = current_rank()
Expand Down Expand Up @@ -138,11 +125,12 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]
role.resource.cpu = 128
role.resource.gpu = 8

# TODO - multi scheduler support
# Note - we cannot add in an empty workspace, so we create a fake temporary one
temp_workspace = tempfile.mkdtemp(prefix="forge_workspace_")
server_config = Config(
scheduler="slurm",
appdef=appdef,
workspace=monarch.tools.config.workspace.Workspace(dirs=[""]),
workspace=monarch.tools.config.workspace.Workspace(dirs=[temp_workspace]),
)
server_info = await commands.get_or_create(
"forge_job",
Expand All @@ -157,8 +145,7 @@ async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]
return alloc, None, server_name # (Allocator, AllocConstraints, ServerName)

async def remote_setup(self, procs: ProcMesh) -> tuple[str, int]:
setup = procs.spawn(f"setup-{uuid.uuid1()}", SetupActor)
return await setup.get_info.choose()
return


class Mastlauncher(BaseLauncher):
Expand Down
30 changes: 0 additions & 30 deletions src/forge/controller/proc_mesh.py

This file was deleted.

Loading
Loading