Add as_actor single-actor mode
#195
Changes from 13 commits
@@ -8,6 +8,7 @@

import math
import sys
import types
from typing import Type, TypeVar

from monarch.actor import Actor, current_rank, current_size, endpoint

@@ -21,6 +22,16 @@
T = TypeVar("T", bound="ForgeActor")


def filter_config_params(cls, kwargs: dict) -> dict:
    from inspect import signature

    """
    Filters kwargs to only include parameters that are valid for the given config class.
    """
    sig = signature(cls)
    return {k: v for k, v in kwargs.items() if k in sig.parameters}


class ForgeActor(Actor):
    def __init__(self, *args, **kwargs):
        if not hasattr(self, "_rank"):

@@ -48,73 +59,64 @@ def __init__(self, *args, **kwargs):
    def options(
        cls: Type[T],
        *,
        service_config: ServiceConfig | None = None,
        num_replicas: int | None = None,
        procs: int | None = None,
        **service_kwargs,
        procs: int,
        **kwargs,
    ) -> Type[T]:
        """
        Returns a subclass of this ForgeActor with a bound ServiceConfig.
        The returned subclass can later be launched via `.as_service()`.

        Usage (choose ONE of the following forms):
            # Option A: construct ServiceConfig implicitly
            service = await MyForgeActor.options(
                num_replicas=1,
                procs=2,
            ).as_service(...)
            await service.shutdown()

            # Option B: provide an explicit ServiceConfig
            cfg = ServiceConfig(num_replicas=1, procs=2, ..)
            service = await MyForgeActor.options(service_config=cfg).as_service(...)
            await service.shutdown()

            # Option C: skip options, use the default service config with num_replicas=1, procs=1
            service = await MyForgeActor.as_service(...)
            await service.shutdown()
        Returns a dynamically created subclass of this ForgeActor with bound configuration.

        Returns a dynamically created subclass of this ForgeActor with bound configuration.
        Returns a version of ForgeActor with configured resource attributes.
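To illustrate what the `filter_config_params` helper in the diff does, here is a minimal standalone sketch. `DemoConfig` is a hypothetical config class invented for this example; it is not part of the PR. The docstring is placed first here so that Python registers it as the function's docstring.

```python
from dataclasses import dataclass
from inspect import signature


def filter_config_params(cls, kwargs: dict) -> dict:
    """Keep only the kwargs that match a parameter of cls's constructor."""
    sig = signature(cls)
    return {k: v for k, v in kwargs.items() if k in sig.parameters}


# Hypothetical config class, used only for this illustration.
@dataclass
class DemoConfig:
    procs: int
    hosts: int = 1
    with_gpu: bool = False


# "num_replicas" is not a DemoConfig field, so it is dropped.
raw = {"procs": 2, "with_gpu": True, "num_replicas": 4}
filtered = filter_config_params(DemoConfig, raw)
config = DemoConfig(**filtered)
```

One consequence of this approach is that a misspelled keyword is silently discarded rather than reported, which is part of why a catch-all `**kwargs` can be brittle.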
Outdated
follow up comment on explicit attributes, this for e.g. is unclear and can be pretty brittle
Removed in the latest version
Outdated
Why do we still need service_cls here? can the logic of as_service() be:
@classmethod
async def as_service(cls, **actor_kwargs) -> "ServiceInterface":
    service = Service(cfg, cls, actor_kwargs)
    await service.__initialize__()
    return ServiceInterface(service, cls)
You are right! Removed.
Outdated
hmm maybe we can modify the def launch() above to simplify things?
Like this:
@classmethod
async def launch(cls, *args, **kwargs) -> "ForgeActor":
    proc_mesh = await get_proc_mesh(
        process_config=ProcessConfig(
            procs=cls._procs, hosts=cls._hosts, with_gpu=cls._with_gpu
        )
    )
    actor_name = kwargs.pop("name", cls.__name__)
    actor = await proc_mesh.spawn(actor_name, cls, *args, **kwargs)
    actor._proc_mesh = proc_mesh
    if hasattr(proc_mesh, "_hostname") and hasattr(proc_mesh, "_port"):
        host, port = proc_mesh._hostname, proc_mesh._port
        await actor.set_env.call(addr=host, port=port)
    await actor.setup.call()
    return actor
Done!
Outdated
hmm this is a hack, we shouldn't be doing this. I'm guessing it's because we want to preserve the ability to
svc = MyActor.as_service()
await svc.shutdown()
?
No, as_service returns a ServiceInterface. So when we call service.shutdown(), we are actually calling ServiceInterface.shutdown.
The reason I have to do this hacky thing is:
Without it, actor.shutdown() gives me this error:
RuntimeError: Actor <class 'tests.unit_tests.test_service.Counter'>.shutdown is not annotated as an endpoint. To call it as one, add a @endpoint decorator to it, or directly wrap it in one as_endpoint(obj.method).call(...)
If I simply decorate shutdown with @endpoint, we'd have to call it like
await actor.shutdown.call()
But it would still give error:
AssertionError("Called shutdown on a replica with no proc_mesh.")
Any suggestions?
ah I see. Ok in that case, I think what we should do is not do actor.shutdown() for now, and just rely on eg
await RLTrainer.stop(trainer)
for now. Maybe what we can do next is have the provisioner keep track of all of the proc meshes, and do a global shutdown()? Including all the services etc. we can discuss more, just want to unblock this PR
Sounds good. Done!
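The idea floated above, having the provisioner keep track of every proc mesh so that a single global shutdown can stop them all, could look roughly like the sketch below. Everything here is a hypothetical stand-in: `DemoProcMesh` and `DemoProvisioner` are invented names and do not reflect the real provisioner or monarch APIs.

```python
import asyncio


class DemoProcMesh:
    """Stand-in for a proc mesh; only records whether it was stopped."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


class DemoProvisioner:
    """Hypothetical provisioner that remembers every mesh it hands out."""

    def __init__(self) -> None:
        self._meshes = []

    async def get_proc_mesh(self, name: str) -> DemoProcMesh:
        mesh = DemoProcMesh(name)
        self._meshes.append(mesh)
        return mesh

    async def shutdown(self) -> None:
        # Stop all tracked meshes concurrently, then forget them.
        await asyncio.gather(*(mesh.stop() for mesh in self._meshes))
        self._meshes.clear()


async def main() -> list:
    provisioner = DemoProvisioner()
    trainer_mesh = await provisioner.get_proc_mesh("trainer")
    policy_mesh = await provisioner.get_proc_mesh("policy")
    await provisioner.shutdown()
    return [trainer_mesh.stopped, policy_mesh.stopped]


results = asyncio.run(main())
```

With a registry like this, individual actors would not need their own `shutdown()` method; callers would stop everything through the provisioner.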
also `hosts: int`, `with_gpu: bool` and `num_replicas: int | None`?
I kind of put them all in `**kwargs` since only `procs` is required for both service and actor. Do you think it is better to explicitly list them?
yes, please explicitly list them
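As a rough illustration of what explicitly listed parameters might look like, the sketch below binds them onto a dynamically created subclass. `DemoActor` is a stand-in that does not depend on monarch, and the default values and the `_num_replicas` attribute name are assumptions made for this example, not the PR's final code.

```python
from typing import Optional, Type, TypeVar

T = TypeVar("T", bound="DemoActor")


class DemoActor:
    """Stand-in for ForgeActor; does not depend on monarch."""

    # Class-level defaults, overridden on the subclass returned by options().
    _procs: int = 1
    _hosts: Optional[int] = None
    _with_gpu: bool = False
    _num_replicas: Optional[int] = None

    @classmethod
    def options(
        cls: Type[T],
        *,
        procs: int,
        hosts: Optional[int] = None,
        with_gpu: bool = False,
        num_replicas: Optional[int] = None,
    ) -> Type[T]:
        """Return a dynamically created subclass with the resources bound."""
        attrs = {
            "_procs": procs,
            "_hosts": hosts,
            "_with_gpu": with_gpu,
            "_num_replicas": num_replicas,
        }
        return type(cls.__name__, (cls,), attrs)


class Counter(DemoActor):
    pass


Configured = Counter.options(procs=2, with_gpu=True)
```

Because every parameter is named in the signature, a misspelled keyword such as `with_gpus=True` raises a `TypeError` at the call site instead of being absorbed by `**kwargs`.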