-
Notifications
You must be signed in to change notification settings - Fork 16
Auto-track and globally shut down all Forge actors and services #357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
cc5fb5a
9cc5531
84dcd17
8ba3e5f
5b8d81c
5fe99d5
36225af
d705df9
52fa867
f0ba99a
4e477b5
e7ae73d
eb9f667
77c2344
b39fc4b
d4f3d57
0db9ed6
f31aa3a
ffbf0ca
7c5417d
5dc138c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ | |
|
||
from monarch.actor import Actor, current_rank, current_size, endpoint | ||
|
||
from forge.controller.provisioner import get_proc_mesh, stop_proc_mesh | ||
from forge.controller.provisioner import _get_provisioner, get_proc_mesh, stop_proc_mesh | ||
|
||
from forge.types import ProcessConfig, ServiceConfig | ||
|
||
|
@@ -127,7 +127,9 @@ async def as_service( | |
logger.info("Spawning Service for %s", cls.__name__) | ||
service = Service(cfg, cls, actor_args, actor_kwargs) | ||
await service.__initialize__() | ||
return ServiceInterface(service, cls) | ||
service_interface = ServiceInterface(service, cls) | ||
await cls.register_allocation(service_interface) | ||
return service_interface | ||
|
||
@endpoint | ||
async def setup(self): | ||
|
@@ -144,6 +146,15 @@ async def setup(self): | |
""" | ||
pass | ||
|
||
@classmethod
async def register_allocation(cls, alloc: "ForgeActor | ServiceInterface") -> None:
    """Register an allocation (service or actor) with the provisioner.

    Registration is best-effort: any failure while obtaining the provisioner
    or while tracking the allocation is logged as a warning and is not
    re-raised, so the caller still receives the allocation it spawned.

    Args:
        alloc: The actor or service interface passed to the provisioner's
            ``track_allocation``.
    """
    try:
        provisioner = await _get_provisioner()
        await provisioner.track_allocation(alloc)
    except Exception as e:
        # Lazy %-style arguments: the rendered message is unchanged, but
        # formatting is deferred to the logging framework, so a failing
        # ``__str__`` on ``alloc`` cannot escape this best-effort handler.
        logger.warning("Failed to register allocation %s: %s", alloc, e)
|
||
@classmethod | ||
async def launch(cls, *args, **kwargs) -> "ForgeActor": | ||
"""Provisions and deploys a new actor. | ||
|
@@ -185,13 +196,16 @@ async def as_actor(cls: Type[T], *args, **actor_kwargs) -> T: | |
""" | ||
logger.info("Spawning single actor %s", cls.__name__) | ||
actor = await cls.launch(*args, **actor_kwargs) | ||
await cls.register_allocation(actor) | ||
return actor | ||
|
||
@classmethod | ||
async def shutdown(cls, actor: "ForgeActor"): | ||
async def shutdown(cls, actor: "ForgeActor", quiet: bool = False): | ||
"""Shuts down an actor. | ||
This method is used by `Service` to teardown a replica. | ||
""" | ||
if not quiet: | ||
logger.info(f"Shutting down actor {getattr(actor, 'name', cls.__name__)}") | ||
|
||
if actor._proc_mesh is None: | ||
raise AssertionError("Called shutdown on a replica with no proc_mesh.") | ||
await stop_proc_mesh(actor._proc_mesh) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,8 @@ | |
import socket | ||
import uuid | ||
|
||
from typing import Any, Optional | ||
|
||
from monarch._src.actor.shape import NDSlice, Shape | ||
from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host | ||
from monarch.tools import commands | ||
|
@@ -131,6 +133,8 @@ def __init__(self, cfg: ProvisionerConfig | None = None): | |
if not self.launcher: | ||
logger.warning("Launcher not provided, remote allocations will not work.") | ||
|
||
self._allocations: list[Any] = [] # all live actor/service instances | ||
|
||
async def initialize(self): | ||
"""Call this after creating the instance""" | ||
if self.launcher is not None: | ||
|
@@ -302,8 +306,40 @@ async def stop_proc_mesh(self, proc_mesh: ProcMesh): | |
commands.kill(server_name) | ||
del self._proc_host_map[proc_mesh] | ||
|
||
async def track_allocation(self, alloc: Any) -> None:
    """Track an allocation so it can be shut down during cleanup.

    Args:
        alloc: The actor or service handle to remember. An object that is
            already tracked is not added again, so a single cleanup pass
            cannot attempt to shut the same allocation down twice.
    """
    # Compare by identity rather than equality: tracked handles may define
    # their own ``__eq__`` semantics.
    if any(tracked is alloc for tracked in self._allocations):
        return
    self._allocations.append(alloc)
|
||
|
||
async def shutdown_all_allocations(self) -> None:
    """Gracefully shut down all tracked actors and services.

    Allocations are shut down in reverse registration order. Each shutdown
    is best-effort: a failure is logged as a warning and the remaining
    allocations are still processed.
    """
    # Imported inside the function rather than at module level, presumably
    # to avoid a circular import with the actor/service modules (TODO confirm).
    from monarch._src.actor.actor_mesh import ActorMesh

    from forge.controller.actor import ForgeActor
    from forge.controller.service import ServiceInterface

    # Detach the tracked list before the first await. Anything registered
    # while shutdown is in progress stays in the fresh list for a later pass
    # instead of being discarded, and an overlapping call sees an empty list
    # rather than shutting the same allocations down twice.
    pending, self._allocations = self._allocations, []

    for alloc in reversed(pending):
        try:
            # --- ServiceInterface ---
            if isinstance(alloc, ServiceInterface):
                await alloc.shutdown()

            # --- Actor instance (ForgeActor or underlying ActorMesh) ---
            elif isinstance(alloc, (ForgeActor, ActorMesh)):
                # Get the class to call shutdown on (ForgeActor or its bound class)
                actor_cls = getattr(alloc, "_class", None) or alloc.__class__
                await actor_cls.shutdown(alloc)

            else:
                logger.warning("Unknown allocation type: %s", type(alloc))

        except Exception as e:
            # Keep going: one failed teardown must not block the others.
            logger.warning("Failed to shut down %s: %s", alloc, e)
|
||
async def shutdown(self): | ||
"""Tears down all remaining remote allocations.""" | ||
await self.shutdown_all_allocations() | ||
async with self._lock: | ||
for server_name in self._server_names: | ||
commands.kill(server_name) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -200,6 +200,7 @@ async def shutdown(self) -> None: | |
""" | ||
Shut down the underlying Service. | ||
""" | ||
logger.info(f"Shutting down service {self.actor_def.__name__}") | ||
|
||
await self._service.stop() | ||
|
||
def session(self) -> "SessionContext": | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DNXie @felipemello1 maybe we can just move the mlogger shutdown into the global shutdown as well?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved it into `shutdown()`.