Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions src/forge/controller/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def options(
cls: Type[T],
*,
service_config: ServiceConfig | None = None,
process_config: ProcessConfig | None = None,
num_replicas: int | None = None,
procs_per_replica: int | None = None,
**service_kwargs,
Expand All @@ -57,7 +58,11 @@ def options(
Returns a subclass of this ForgeActor with a bound ServiceConfig or ProcessConfig.
The returned subclass can later be launched via `.as_service()` or `.as_actor()`, respectively.

Usage (choose ONE of the following forms):
Usage modes:

---- Service Mode (default) ----
Use when deploying a replicated service (multiple replicas, each with N procs).

# Option A: construct ServiceConfig implicitly
service = await MyForgeActor.options(
num_replicas=1,
Expand All @@ -73,10 +78,42 @@ def options(
# Option C: skip options, use the default service config with num_replicas=1, procs_per_replica=1
service = await MyForgeActor.as_service(...)
await service.shutdown()

---- Single Actor Mode ----
Use when launching just one actor directly (without Service abstraction).
Must provide a ProcessConfig.

cfg = ProcessConfig(...)
actor = await MyForgeActor.options(process_config=cfg).as_actor(...)
await actor.shutdown()

---- Notes ----
- If `process_config` is passed, we bind to an actor configuration
and expect `.as_actor(...)` to be called later.
- Otherwise (default), we bind to a service configuration and expect
`.as_service(...)` to be called later.
- Passing both `service_config` and `process_config` is invalid.
"""

if service_config is not None:
if service_config is not None and process_config is not None:
raise ValueError(
"Cannot pass both `service_config` and `process_config`. "
"Use either `service_config` for service mode or `process_config` for single actor mode."
)

if process_config is not None:
return type(
f"{cls.__name__}Actor",
(cls,),
{"_process_config": process_config},
)
elif service_config is not None:
cfg = service_config
return type(
f"{cls.__name__}Service",
(cls,),
{"_service_config": cfg},
)
else:
if num_replicas is None or procs_per_replica is None:
raise ValueError(
Expand All @@ -88,11 +125,11 @@ def options(
**service_kwargs,
)

return type(
f"{cls.__name__}Service",
(cls,),
{"_service_config": cfg},
)
return type(
f"{cls.__name__}Service",
(cls,),
{"_service_config": cfg},
)

@classmethod
async def as_service(cls: Type[T], **actor_kwargs) -> "ServiceInterface":
Expand Down Expand Up @@ -180,6 +217,23 @@ async def launch(cls, *, process_config: ProcessConfig, **kwargs) -> "ForgeActor
await actor.setup.call()
return actor

@classmethod
async def as_actor(cls: Type[T], **actor_kwargs) -> T:
"""
Spawns a single actor using the ProcessConfig bound in `.options()`.
Example:
cfg = ProcessConfig(...)
actor = await MyForgeActor.options(process_config=cfg).as_actor(...)
"""
cfg = getattr(cls, "_process_config", None)
if cfg is None:
raise ValueError(
"No process_config found. Use `.options(process_config=...)` before calling `.as_actor()`."
)
logger.info("Spawning single actor %s", cls.__name__)
actor = await cls.launch(process_config=cfg, **actor_kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, maybe we could simplify things by modifying the `launch()` classmethod above?

Like this:

@classmethod
async def launch(cls, *args, **kwargs) -> "ForgeActor":
    """Spawn this actor on a new proc mesh and run its setup.

    The proc mesh is requested with a ProcessConfig built from the class
    attributes ``_procs``, ``_hosts`` and ``_with_gpu``. An optional ``name``
    keyword is consumed as the spawn name (defaulting to the class name); all
    remaining positional and keyword arguments are forwarded to
    ``proc_mesh.spawn``.

    Returns:
        The spawned actor, with ``_proc_mesh`` attached and ``setup`` called.
    """
    mesh_config = ProcessConfig(
        procs=cls._procs,
        hosts=cls._hosts,
        with_gpu=cls._with_gpu,
    )
    mesh = await get_proc_mesh(process_config=mesh_config)

    spawn_name = kwargs.pop("name", cls.__name__)
    spawned = await mesh.spawn(spawn_name, cls, *args, **kwargs)
    # Keep a handle to the mesh on the actor itself.
    spawned._proc_mesh = mesh

    # Only meshes exposing both a hostname and a port get their address forwarded.
    if all(hasattr(mesh, attr) for attr in ("_hostname", "_port")):
        addr = mesh._hostname
        port = mesh._port
        await spawned.set_env.call(addr=addr, port=port)

    await spawned.setup.call()
    return spawned
    

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

return actor

@classmethod
async def shutdown(cls, actor: "ForgeActor"):
"""Shuts down an actor.
Expand Down
7 changes: 3 additions & 4 deletions src/forge/controller/service/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,10 @@ async def initialize(self):
try:
# Deploy the actor and its underlying resources
logger.debug(f"Launching actor for replica {self.idx}")
self.actor = await self.actor_def.launch(
process_config=self.proc_config,
**self.actor_kwargs,
)

self.actor = await self.actor_def.options(
process_config=self.proc_config
).as_actor(**self.actor_kwargs)
# Transition to healthy state and start processing
self.state = ReplicaState.HEALTHY
self.start_processing()
Expand Down
Loading