Member

@DNXie DNXie commented Sep 19, 2025

Summary:
This PR adds ForgeActor.as_actor() and refactors ForgeActor.options() and .as_service() to improve configuration handling and dynamic subclassing for actors and services. Context: #173

Key changes include:

  • Added as_actor() to support launching a single actor directly.
  • Added support for positional arguments in Service and Actor.
  • Renamed num_hosts to hosts and num_procs to procs in ProcessConfig.
  • .options() now stores all configuration parameters as class attributes rather than building a full config object immediately.
  • Dynamic subclasses are created only during .as_actor() or .as_service() calls, ensuring each configuration remains isolated.
  • Default configuration is applied automatically if .options() is not called.
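
The configuration flow listed above can be illustrated with a minimal, self-contained sketch. It does not use the real ForgeActor: `SketchActor`, `_OPTION_NAMES`, the default values, and the body of `options()` are all assumptions made for illustration. The sketch only shows the general idea of keeping options as class attributes on a dynamically created subclass, so that each configuration stays isolated and the base class keeps its defaults.

```python
class SketchActor:
    """Hypothetical stand-in for an actor base class (not the real ForgeActor)."""

    # Assumed defaults, used when options() is never called.
    procs = 1
    hosts = None
    num_replicas = 1

    # Assumed list of names that options() accepts.
    _OPTION_NAMES = ("procs", "hosts", "num_replicas")

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    @classmethod
    def options(cls, **overrides):
        # Reject unknown option names instead of silently storing them.
        unknown = sorted(set(overrides) - set(cls._OPTION_NAMES))
        if unknown:
            raise TypeError(f"unknown options: {unknown}")
        # Each call builds a fresh subclass, so configurations never leak
        # into the base class or into each other.
        return type(cls.__name__, (cls,), dict(overrides))


class Counter(SketchActor):
    def __init__(self, v=0):
        super().__init__(v)
        self.v = v


small = Counter.options(procs=1)
large = Counter.options(procs=4, num_replicas=3)
```

In this sketch `Counter.procs` is still 1 after both calls, and `large.__name__` is still "Counter", which mirrors the log behavior described later in this PR.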

Changes in behavior:

Single-actor initialization changes from

cfg = ProcessConfig(...)
actor = await MyForgeActor.launch(process_config=cfg, **actor_kwargs)

to

actor = await MyForgeActor.options(procs=1, ...).as_actor(**actor_kwargs)

Usage Examples 1 (Actor):

# Pre-configure a single actor
actor = await MyForgeActor.options(procs=1, hosts=1).as_actor(...)
await actor.shutdown()

# Default usage without calling options
actor = await MyForgeActor.as_actor(...)
await actor.shutdown()

Log:

Spawning single actor Counter

Usage Examples 2 (Service):

# Pre-configure a service with multiple replicas
service = await MyForgeActor.options(num_replicas=3, procs=2).as_service(...)
await service.shutdown()

# Default usage without calling options
service = await MyForgeActor.as_service(...)
await service.shutdown()

Log when num_replicas=3:

The log prints the actor's original class name rather than xxService (see #193):

INFO     forge.controller.actor:actor.py:123 Spawning Service Actor for Counter
INFO     forge.controller.actor:actor.py:207 Spawning single actor Counter
INFO     forge.controller.actor:actor.py:207 Spawning single actor Counter
INFO     forge.controller.actor:actor.py:207 Spawning single actor Counter

Usage Examples 3 (Positional argument):
This means you can now do:

await Counter.as_service(10)

instead of having to pass keyword arguments:

await Counter.as_service(v=10)
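
A minimal sketch of why both call styles can work is given below. It assumes the classmethod simply forwards `*args` and `**kwargs` to the constructor. The `Counter` class here is a hypothetical stand-in, and its `as_service` returns the instance directly for brevity, whereas the real method returns a ServiceInterface.

```python
import asyncio


class Counter:
    """Hypothetical actor with one constructor argument."""

    def __init__(self, v: int):
        self.v = v

    @classmethod
    async def as_service(cls, *args, **kwargs):
        # Forward positional and keyword arguments unchanged to __init__.
        # Simplification: return the instance instead of a service handle.
        return cls(*args, **kwargs)


async def main():
    by_position = await Counter.as_service(10)
    by_keyword = await Counter.as_service(v=10)
    return by_position.v, by_keyword.v


result = asyncio.run(main())
```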

Test

pytest tests/unit_tests/test_service.py

@DNXie DNXie requested a review from allenwang28 September 19, 2025 18:59
@meta-cla meta-cla bot added the CLA Signed label Sep 19, 2025
@Ritesh1905
Contributor

Wondering what is the motivation behind this? Why would one choose an actor creation directly over service? Single actor seems to be a special case of service?

@allenwang28
Contributor

> Wondering what is the motivation behind this? Why would one choose an actor creation directly over service? Single actor seems to be a special case of service?

For context: #173

You're right, though. The rationale is that only vLLM should be a service currently. The trainer, for example, will not really take advantage of fault tolerance or routing, so we should always expect it to be a singleton.

@DNXie DNXie requested a review from allenwang28 September 22, 2025 21:12
num_replicas: int | None = None,
procs: int | None = None,
**service_kwargs,
procs: int,
Contributor

Should we also list hosts: int, with_gpu: bool, and num_replicas: int | None?

Member Author

I kind of put them all in **kwargs since only procs is required for both service and actor. Do you think it is better to explicitly list them?

Contributor

yes, please explicitly list them

class_attrs["num_replicas"] = 1
cfg = ServiceConfig(**filter_config_params(ServiceConfig, class_attrs))

service_cls = type(f"{cls.__name__}Service", (cls,), {"_service_config": cfg})
Contributor

Why do we still need service_cls here? Can the logic of as_service() be:

@classmethod
async def as_service(cls, **actor_kwargs) -> "ServiceInterface":
    service = Service(cfg, cls, actor_kwargs)
    await service.__initialize__()
    return ServiceInterface(service, cls)

Member Author

You are right! Removed.
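
The snippets under review call filter_config_params, whose implementation is not shown in this conversation. As a rough sketch of what a helper of that kind might do, the hypothetical `filter_to_fields` below keeps only the keys that are declared as dataclass fields. `SketchServiceConfig` and its defaults are also assumptions, not the real ServiceConfig.

```python
from dataclasses import dataclass, fields


@dataclass
class SketchServiceConfig:
    """Hypothetical config; field names follow the options used in this PR."""

    procs: int = 1
    num_replicas: int = 1


def filter_to_fields(config_cls, params: dict) -> dict:
    # Keep only keys that are declared as dataclass fields of config_cls.
    allowed = {f.name for f in fields(config_cls)}
    return {k: v for k, v in params.items() if k in allowed}


class_attrs = {"procs": 2, "num_replicas": 3, "unrelated_attr": object()}
cfg = SketchServiceConfig(**filter_to_fields(SketchServiceConfig, class_attrs))
```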

cfg = ProcessConfig(**filter_config_params(ProcessConfig, class_attrs))

logger.info("Spawning single actor %s", cls.__name__)
actor = await cls.launch(process_config=cfg, **actor_kwargs)
Contributor

Hmm, maybe we can modify launch() above to simplify things?

Like this:

@classmethod
async def launch(cls, *args, **kwargs) -> "ForgeActor":
    process_config = ProcessConfig(
        procs=cls._procs, hosts=cls._hosts, with_gpu=cls._with_gpu
    )
    proc_mesh = await get_proc_mesh(process_config=process_config)
    actor_name = kwargs.pop("name", cls.__name__)
    actor = await proc_mesh.spawn(actor_name, cls, *args, **kwargs)
    actor._proc_mesh = proc_mesh
    if hasattr(proc_mesh, "_hostname") and hasattr(proc_mesh, "_port"):
        host, port = proc_mesh._hostname, proc_mesh._port
        await actor.set_env.call(addr=host, port=port)
    await actor.setup.call()
    return actor

Member Author

Done!

@DNXie DNXie requested a review from allenwang28 September 22, 2025 23:53
# dynamically create a configured subclass for consistency
cls = type(f"{cls.__name__}Service", (cls,), {"_service_config": cfg})
class_attrs = {k: v for k, v in cls.__dict__.items() if not k.startswith("__")}
if "procs" not in class_attrs:
Contributor

Follow-up to the comment on explicit attributes: this, for example, is unclear and can be pretty brittle.

Member Author

Removed in the latest version

proc_mesh = await get_proc_mesh(process_config=process_config)
# Build process config from class attributes with defaults
cfg = ProcessConfig(
procs=getattr(cls, "procs", 1),
Contributor

I generally try to use getattr as little as possible. If it's used too much, it can mask real errors that are really hard to debug later.
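
To make the concern concrete, here is a small sketch of how a getattr default can hide a misspelled attribute name. The `Configured` class is hypothetical and exists only for this illustration.

```python
class Configured:
    """Hypothetical class whose options were set explicitly."""

    procs = 4


# A misspelled attribute name silently falls back to the default ...
masked = getattr(Configured, "proc", 1)

# ... whereas direct attribute access surfaces the mistake immediately.
try:
    Configured.proc
    raised = False
except AttributeError:
    raised = True
```

Here `masked` ends up as the fallback value rather than the configured 4, and nothing signals that the wrong name was used.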

Member Author

This is a fallback for when the user doesn’t specify configs via .options(). In that case, the original ForgeActor class doesn’t have attributes like procs. If we get rid of getattr, one way I can think of is to add these attributes to the ForgeActor class, like:

class ForgeActor(Actor):
    procs: int = 1
    hosts: int | None = None
    with_gpus: bool = False
    num_replicas: int = 1

    def __init__(self, *args, **kwargs):

But either way, it means the default values are specified in three places:

  1. In types.py
  2. As default values in .options()
  3. As attributes on the ForgeActor class OR here in launch.

I’m not sure if there’s a cleaner way to handle this. I’ve updated the code accordingly (getting rid of getattr). Please take a look and let me know if you have any suggestions or improvements.
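
One possible way to keep the defaults in a single place is sketched below. This is not what the PR does. It is only an illustration, assuming the defaults live on a dataclass and are copied onto the base class once. `SketchProcessConfig`, `SketchActor`, `Trainer`, and the `process_config()` helper are hypothetical names.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class SketchProcessConfig:
    """Hypothetical config holding the only copy of the defaults."""

    procs: int = 1
    hosts: Optional[int] = None
    with_gpus: bool = False


class SketchActor:
    """Hypothetical base class (not the real ForgeActor)."""

    @classmethod
    def process_config(cls) -> SketchProcessConfig:
        # getattr is called without a default here, so a missing attribute
        # raises AttributeError instead of being papered over.
        return SketchProcessConfig(
            **{f.name: getattr(cls, f.name) for f in fields(SketchProcessConfig)}
        )


# Copy the dataclass defaults onto the base class once.
for f in fields(SketchProcessConfig):
    setattr(SketchActor, f.name, f.default)


class Trainer(SketchActor):
    # Overrides one option; the others come from the dataclass defaults.
    procs = 8
```

The trade-off in this sketch is that the class attributes are set dynamically, so they are less visible to readers and static type checkers than explicitly declared attributes.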

actor = await cls.launch(**actor_kwargs)

# Patch shutdown to bypass endpoint system
actor.shutdown = types.MethodType(
Contributor

hmm this is a hack, we shouldn't be doing this. I'm guessing it's because we want to preserve the ability to

svc = MyActor.as_service()

await svc.shutdown()

?

Member Author

@DNXie DNXie Sep 23, 2025

No, as_service returns a ServiceInterface, so when we call service.shutdown(), we are actually calling ServiceInterface.shutdown.

I need this hack because, without it, actor.shutdown() raises this error:

RuntimeError: Actor <class 'tests.unit_tests.test_service.Counter'>.shutdown is not annotated as an endpoint. To call it as one, add a @endpoint decorator to it, or directly wrap it in one as_endpoint(obj.method).call(...)

If I simply decorate shutdown with @endpoint, we'd have to call it like this:

await actor.shutdown.call()

But that would still raise an error:

AssertionError("Called shutdown on a replica with no proc_mesh.")

Any suggestions?
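
For readers unfamiliar with the patching technique discussed in this thread, here is a generic sketch of binding a function to a single instance with types.MethodType. The `Handle` class is a purely illustrative model of an object that intercepts attribute lookup. It is an assumption for this example and does not reflect how the actor or endpoint machinery is actually implemented.

```python
import types


class Handle:
    """Hypothetical object that rejects unknown attribute lookups."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        raise RuntimeError(f"{name} is not annotated as an endpoint")


def shutdown(self):
    return "shut down"


handle = Handle()
# Binding the function to this one instance stores it in handle.__dict__,
# so normal lookup succeeds and __getattr__ is never reached.
handle.shutdown = types.MethodType(shutdown, handle)
patched_result = handle.shutdown()
```

The patch affects only that one instance. Other instances of the class still go through the interception path, which is part of why this kind of fix is fragile.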

Contributor

Ah, I see. In that case, I think we should not do actor.shutdown() for now and just rely on, e.g.,

await RLTrainer.stop(trainer)

Maybe what we can do next is have the provisioner keep track of all of the proc meshes and do a global shutdown(), including all the services? We can discuss more; I just want to unblock this PR.
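
The follow-up idea of a provisioner that tracks every proc mesh and shuts them all down could look roughly like the sketch below. Everything here is hypothetical: `SketchProcMesh`, `SketchProvisioner`, and their methods are stand-ins invented for this example, not existing APIs.

```python
import asyncio


class SketchProcMesh:
    """Hypothetical stand-in for a process mesh with an async stop()."""

    def __init__(self, name: str):
        self.name = name
        self.stopped = False

    async def stop(self):
        self.stopped = True


class SketchProvisioner:
    """Hypothetical registry that remembers every mesh it hands out."""

    def __init__(self):
        self._meshes = []

    def get_proc_mesh(self, name: str) -> SketchProcMesh:
        mesh = SketchProcMesh(name)
        self._meshes.append(mesh)
        return mesh

    async def shutdown(self):
        # Stop all tracked meshes concurrently, then forget them.
        await asyncio.gather(*(mesh.stop() for mesh in self._meshes))
        self._meshes.clear()


async def main():
    provisioner = SketchProvisioner()
    meshes = [provisioner.get_proc_mesh(n) for n in ("trainer", "policy")]
    await provisioner.shutdown()
    return [m.stopped for m in meshes]


stopped_flags = asyncio.run(main())
```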

Member Author

Sounds good. Done!

@DNXie DNXie requested a review from allenwang28 September 23, 2025 19:04

@classmethod
- async def launch(cls, *, process_config: ProcessConfig, **kwargs) -> "ForgeActor":
+ async def launch(cls, **kwargs) -> "ForgeActor":
Contributor

Can you add *args here? That would resolve the *args-related TODO that's listed here!

Member Author

Added in launch and as_actor. Also tested in test_as_actor_with_kwargs_config

Contributor

@allenwang28 allenwang28 left a comment

Ty @DNXie!

# Option C: skip options, use the default service config with num_replicas=1, procs=1
service = await MyForgeActor.as_service(...)
await service.shutdown()
Returns a dynamically created subclass of this ForgeActor with bound configuration.
Contributor

Suggested change
- Returns a dynamically created subclass of this ForgeActor with bound configuration.
+ Returns a version of ForgeActor with configured resource attributes.

self.kwargs = kwargs

@endpoint
async def get_args(self):
Contributor

Let's remove these tests; I think it's fine without them.

@DNXie DNXie merged commit 88fcd6b into meta-pytorch:main Sep 23, 2025
5 checks passed