-
Notifications
You must be signed in to change notification settings - Fork 21
Refactor service spawning: add ForgeActor.options().as_service() API #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
da21e1d
5c72908
b4d7a61
02d77c6
fd1d38b
f79beee
d8d775a
e423c44
4815c05
77d41e4
a3feb1e
23d7e02
2ca881d
4df5d3a
0b5c0db
1cc5cf2
a92952a
2ce61d1
f32fef7
f28824d
549f43a
26a4207
a311cbd
1261568
c1854ec
e595fbd
52cb676
fd38100
9a80b16
09e7237
0165027
721e32a
915baf1
4171675
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,11 +8,13 @@ | |
|
||
import math | ||
import sys | ||
from typing import Type | ||
|
||
from monarch.actor import Actor, current_rank, current_size, endpoint | ||
|
||
from forge.controller.proc_mesh import get_proc_mesh, stop_proc_mesh | ||
from forge.types import ProcessConfig | ||
|
||
from forge.types import ProcessConfig, ServiceConfig | ||
|
||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.DEBUG) | ||
|
@@ -41,6 +43,116 @@ def __init__(self, *args, **kwargs): | |
self.logger.root.addHandler(stdout_handler) | ||
super().__init__(*args, **kwargs) | ||
|
||
@classmethod | ||
def options( | ||
cls, | ||
*, | ||
service_config: ServiceConfig | None = None, | ||
num_replicas: int | None = None, | ||
procs_per_replica: int | None = None, | ||
**service_kwargs, | ||
) -> Type["ConfiguredService"]: | ||
""" | ||
Returns a ConfiguredService class that wraps this ForgeActor in a Service. | ||
|
||
Usage (choose ONE of the following forms): | ||
# Option A: construct ServiceConfig implicitly | ||
service = await MyForgeActor.options( | ||
num_replicas=1, | ||
procs_per_replica=2, | ||
).as_service(...) | ||
await service.shutdown() | ||
|
||
# Option B: provide an explicit ServiceConfig | ||
cfg = ServiceConfig(num_replicas=1, procs_per_replica=2, scheduling="round_robin") | ||
service = await MyForgeActor.options(service_config=cfg).as_service(...) | ||
await service.shutdown() | ||
|
||
""" | ||
from forge.controller.service import Service, ServiceInterface | ||
|
||
if service_config is not None: | ||
# Use the provided config directly | ||
cfg = service_config | ||
else: | ||
if num_replicas is None or procs_per_replica is None: | ||
raise ValueError( | ||
"Must provide either `service_config` or (num_replicas + procs_per_replica)." | ||
) | ||
cfg = ServiceConfig( | ||
num_replicas=num_replicas, | ||
procs_per_replica=procs_per_replica, | ||
**service_kwargs, | ||
) | ||
|
||
class ConfiguredService: | ||
|
||
""" | ||
A wrapper around Service that binds a ForgeActor class. | ||
Provides: | ||
- as_service(): spawns the actor inside the service | ||
- shutdown(): stops the service | ||
""" | ||
|
||
_actor_def = cls | ||
_service_interface: ServiceInterface | None | ||
|
||
def __init__(self) -> None: | ||
self._service_interface = None | ||
|
||
@classmethod | ||
async def as_service(cls, **actor_kwargs) -> "ConfiguredService": | ||
DNXie marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
""" | ||
Spawn the actor inside a Service with the given configuration. | ||
|
||
Args: | ||
**actor_kwargs: arguments to pass to the ForgeActor constructor | ||
|
||
Returns: | ||
self: so that methods like .shutdown() can be called | ||
""" | ||
self = cls() | ||
logger.info("Spawning Service Actor for %s", self._actor_def.__name__) | ||
service = Service(cfg, self._actor_def, actor_kwargs) | ||
await service.__initialize__() | ||
self._service_interface = ServiceInterface(service, self._actor_def) | ||
return self | ||
|
||
async def shutdown(self): | ||
""" | ||
Gracefully stops the service if it has been started. | ||
""" | ||
if self._service_interface is None: | ||
raise RuntimeError("Service not started yet") | ||
await self._service_interface._service.stop() | ||
self._service_interface = None | ||
|
||
def __getattr__(self, item): | ||
""" | ||
Delegate attribute access to the ServiceInterface instance. | ||
This makes ConfiguredService behave like a ServiceInterface. | ||
""" | ||
if self._service_interface is None: | ||
raise AttributeError( | ||
f"Service not started yet; cannot access '{item}'" | ||
) | ||
return getattr(self._service_interface, item) | ||
|
||
return ConfiguredService | ||
|
||
@classmethod | ||
async def as_service(cls, **actor_kwargs) -> "ConfiguredService": | ||
""" | ||
Spawn this ForgeActor inside a Service with default configuration. | ||
Defaults: num_replicas=1, procs_per_replica=1 | ||
|
||
Usage: | ||
service = await MyForgeActor.as_service(...) | ||
await service.shutdown() | ||
""" | ||
return await cls.options(num_replicas=1, procs_per_replica=1).as_service( | ||
**actor_kwargs | ||
) | ||
|
||
@endpoint | ||
async def setup(self): | ||
"""Sets up the actor. | ||
|
Uh oh!
There was an error while loading. Please reload this page.