Skip to content

[RFC] Service API updates - adverbs, topology awareness, custom routersΒ #160

@allenwang28

Description

@allenwang28

Continuing some thoughts from #133:

The experience of starting a service will look something as follows:

class MyActor(Actor):
    @endpoint
    def foo(self) -> str:
        return "bar"

i.e., you still define Monarch actors and you can spawn Actors as services as follows (after #153):

service = await MyActor.options(replicas=1, procs_per_replica=1, hosts_per_replica=1).as_service()

(if you don't provide options, it defaults to 1/1/1 as above)

You stop a service with:

await service.shutdown()

Proposal 1: Service Adverbs updates

Specifically, change call() and choose() Service adverbs to route() and fanout(). This avoids the naming collision from Monarch (which has call() and choose() already) while still being specific to what is happening:

with policy.session():
     output = policy.generate.route()

Proposal 2: Topology Awareness

In order to support Policy service, we do something as follows:

class Policy(Actor):
    @classmethod
    def launch(cls, ...):
        policy_proc = get_proc_mesh(...)
        worker_proc = get_proc_mesh(...)
        workers = worker_proc.spawn(...)
        policy = policy_proc.spawn(..., workers=workers, ...)

This works, although is a bit magical. A proposal could be to introduce topology awareness into options:

policy = Policy.options(
    topology={
        "worker": PolicyWorker.options(hosts_per_replica=1, procs_per_replica=8, replicas=4)
    }).as_service(**policy_kwargs)

When a Service creates new replicas, it creates resources based on the actor definition's class attributes. This will only work
if the top level actor implements set_dependencies:

class Policy:
    @endpoint
    def set_dependencies(self, dependencies: dict[str, Actor]):
        policy_worker: PolicyWorker = dependencies["worker"]
        self.worker = policy_worker

and otherwise will raise an error.

Proposal 3: Custom Routers

Service implements basic round_robin, but can be extended to other types of routers. E.g., one could be batching (see #120):

class ReferenceActor(Actor):
    @service_endpoint(router=BatchedRouter(timeout_in_s=5, batch_size=4)) # batch either when you hit 5s, or batch size is 4
    def forward(self, token_batch: list[int]) -> list[torch.Tensor]:
         ...


reference = ReferenceActor.as_service(...)

await reference.forward.route() # batches behind the scene, but the correct response is routed

Implementing a custom router looks something like this:

class CustomRouter:
    def __call__(self, request: ServiceRequest, replicas: List[Replica]) -> Any:
        """Route request to appropriate replica(s) and return result."""
        pass

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions