Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/forge/controller/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .interface import ServiceInterface, Session, SessionContext
from .metrics import ServiceMetrics
from .replica import Replica, ReplicaMetrics
from .service import Service, ServiceConfig
from .service import Service, ServiceActor, ServiceConfig
from .spawn import shutdown_service, spawn_service

__all__ = [
Expand All @@ -19,6 +19,7 @@
"ServiceMetrics",
"Session",
"SessionContext",
"ServiceActor",
"spawn_service",
"shutdown_service",
]
106 changes: 97 additions & 9 deletions src/forge/controller/service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,35 @@ class ServiceEndpoint(Generic[P, R]):

"""

def __init__(self, service, endpoint_name: str):
self.service = service
self.endpoint_name = endpoint_name

async def choose(self, *args: P.args, **kwargs: P.kwargs) -> R:
"""Chooses a replica to call based on context and load balancing strategy."""
# Extract sess_id from kwargs if present
sess_id = kwargs.pop("sess_id", None)
return await self.service._call(sess_id, self.endpoint_name, *args, **kwargs)

async def call(self, *args: P.args, **kwargs: P.kwargs) -> List[R]:
"""Broadcasts a request to all healthy replicas and returns the results as a list."""
result = await self.service.call_all(self.endpoint_name, *args, **kwargs)
return result


class ServiceEndpointV2(Generic[P, R]):
"""An endpoint object specific to services.

This loosely mimics the Endpoint APIs exposed in Monarch, with
a few key differences:
- Only choose and call are retained (dropping stream and call_one)
- Call returns a list directly rather than a ValueMesh.

These changes are made with Forge use cases in mind, but can
certainly be expanded/adapted in the future.

"""

def __init__(self, actor_mesh, endpoint_name: str):
self.actor_mesh = actor_mesh
self.endpoint_name = endpoint_name
Expand All @@ -108,6 +137,70 @@ async def call(self, *args: P.args, **kwargs: P.kwargs) -> List[R]:


class ServiceInterface:
"""
A lightweight interface to the base Service class.

This is a temporary workaround until Monarch supports nested
actors.

"""

def __init__(self, _service, actor_def):
self._service = _service
self.actor_def = actor_def

# Dynamically create ServiceEndpoint objects for user's actor endpoints
# Inspect the actor_def directly to find endpoints
for attr_name in dir(actor_def):
attr_value = getattr(actor_def, attr_name)
if isinstance(attr_value, EndpointProperty):
# Create a ServiceEndpoint that will route through the Service Actor
endpoint = ServiceEndpoint(self._service, attr_name)
setattr(self, attr_name, endpoint)

# Session management methods - handled by ServiceInterface
async def start_session(self) -> str:
"""Starts a new session for stateful request handling."""
return await self._service.start_session()

async def terminate_session(self, sess_id: str):
"""Terminates an active session and cleans up associated resources."""
return await self._service.terminate_session(sess_id)

def session(self) -> "SessionContext":
"""Returns a context manager for session-based calls."""
return SessionContext(self)

async def get_metrics(self):
"""Get comprehensive service metrics for monitoring and analysis."""
return self._service.get_metrics()

async def get_metrics_summary(self):
"""Get a summary of key metrics for monitoring and debugging."""
return self._service.get_metrics_summary()

# Testing method - forwarded to Service Actor
async def _get_internal_state(self):
"""
Get comprehensive internal state for testing purposes.

Returns:
dict: Complete internal state including sessions, replicas, and metrics
"""
return await self._service._get_internal_state()

def __getattr__(self, name: str):
"""Forward all other attribute access to the underlying Service Actor."""
_service = object.__getattribute__(self, "_service")
# Forward everything else to the _service
if hasattr(_service, name):
return getattr(_service, name)
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'"
)


class ServiceInterfaceV2:
"""
A lightweight interface to a Service Actor running on a single-node mesh.

Expand All @@ -134,7 +227,7 @@ def __init__(self, _proc_mesh, _service, actor_def):
attr_value = getattr(actor_def, attr_name)
if isinstance(attr_value, EndpointProperty):
# Create a ServiceEndpoint that will route through the Service Actor
endpoint = ServiceEndpoint(self._service, attr_name)
endpoint = ServiceEndpointV2(self._service, attr_name)
setattr(self, attr_name, endpoint)

# Session management methods - handled by ServiceInterface
Expand All @@ -160,23 +253,18 @@ async def get_metrics_summary(self):
return await self._service.get_metrics_summary.call_one()

# Testing method - forwarded to Service Actor
def _get_internal_state(self):
async def _get_internal_state(self):
"""
Get comprehensive internal state for testing purposes.

Returns:
dict: Complete internal state including sessions, replicas, and metrics
"""
return self._service._get_internal_state.call_one()
return await self._service._get_internal_state.call_one()

def __getattr__(self, name: str):
"""Forward all other attribute access to the underlying Service Actor."""
try:
_service = object.__getattribute__(self, "_service")
except AttributeError:
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'"
)
_service = object.__getattribute__(self, "_service")
# Forward everything else to the _service
if hasattr(_service, name):
return getattr(_service, name)
Expand Down
Loading
Loading