62 changes: 57 additions & 5 deletions python/ray/serve/_private/request_router/request_router.py
@@ -613,6 +613,31 @@ def __init__(
}
)

def _compute_backoff_s(self, attempt: int) -> float:
"""Compute the backoff time in seconds for a given retry attempt.

Uses exponential backoff with the class-level backoff parameters.

Args:
attempt: The retry attempt number (0-indexed).

Returns:
The number of seconds to sleep before the next retry.
"""
return min(
self.initial_backoff_s * (self.backoff_multiplier**attempt),
self.max_backoff_s,
)

async def _backoff(self, attempt: int) -> None:
"""Sleep for the appropriate backoff time for a given retry attempt.

Args:
attempt: The retry attempt number (0-indexed).
"""
backoff_s = self._compute_backoff_s(attempt)
await asyncio.sleep(backoff_s)

def _update_router_queue_len_gauge(
self, replica_id: ReplicaID, queue_len: int, *, force: bool = False
) -> None:
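For reviewers, a small standalone sketch of the schedule `_compute_backoff_s` produces; the parameter values below are illustrative placeholders, not Serve's configured defaults:

```python
# Illustrative parameters only; the real values come from the router's
# initial_backoff_s, backoff_multiplier, and max_backoff_s settings.
initial_backoff_s = 0.05
backoff_multiplier = 2
max_backoff_s = 1.0

# Same formula as _compute_backoff_s: exponential growth, capped at the max.
for attempt in range(6):
    backoff_s = min(initial_backoff_s * (backoff_multiplier**attempt), max_backoff_s)
    print(attempt, backoff_s)
# attempt: 0 -> 0.05s, 1 -> 0.1s, 2 -> 0.2s, 3 -> 0.4s, 4 -> 0.8s, 5 -> 1.0s (capped)
```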
@@ -645,6 +670,19 @@ def initialize_state(self, **kwargs):
"""
pass

@property
def supports_rejection_protocol(self) -> bool:
"""Whether this router supports the rejection protocol.

The rejection protocol is used when replicas may reject requests because
they are at capacity. Routers that guarantee the chosen replica has
capacity should return False to skip the rejection handling.

Returns:
True if rejection protocol should be used, False otherwise.
"""
return True

@property
def _event_loop(self) -> asyncio.AbstractEventLoop:
if self._lazily_fetched_loop is None:
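As a hedged illustration of how the new property is meant to be used (not part of this diff), a custom router that reserves capacity up front could opt out of the rejection path by overriding it. The subclass below is hypothetical and assumes `RequestRouter` is the base class defined in this file:

```python
# Hypothetical subclass for illustration; only the property override matters here.
class CapacityReservingRouter(RequestRouter):
    """Router that only dispatches to replicas with pre-reserved capacity."""

    @property
    def supports_rejection_protocol(self) -> bool:
        # Capacity is reserved before a request is sent, so replicas never
        # reject; skip the rejection-handling path entirely.
        return False
```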
@@ -1126,11 +1164,7 @@ async def _choose_replicas_with_backoff(
)
else:
# Only backoff after the first retry.
backoff_s = min(
self.initial_backoff_s * self.backoff_multiplier**attempt,
self.max_backoff_s,
)
await asyncio.sleep(backoff_s)
await self._backoff(attempt)
attempt += 1
finally:
if entered_backoff:
@@ -1339,3 +1373,21 @@ def on_request_routed(
after a response is generated.
"""
pass

def on_request_completed(
self,
replica_id: ReplicaID,
internal_request_id: str,
):
"""Called when a request to a replica has completed.

This lifecycle hook is called after a request finishes (successfully or
with an error). It can be used by request routers that need to perform
cleanup after a request completes, such as releasing capacity tokens.

Args:
replica_id: The ID of the replica that handled the request.
internal_request_id: The internal unique identifier for the request
(from RequestMetadata.internal_request_id).
"""
pass
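To show how a router might use the new hook (hypothetical, not from this PR): a queue- or token-based router could release a per-replica capacity token here. The bookkeeping below is a sketch, again assuming `RequestRouter` is the base class in this file:

```python
from collections import defaultdict

# Hypothetical token-based router; names and bookkeeping are illustrative only.
class TokenBasedRouter(RequestRouter):
    def initialize_state(self, tokens_per_replica: int = 2, **kwargs):
        self._tokens_per_replica = tokens_per_replica
        self._tokens_in_use = defaultdict(int)  # ReplicaID -> in-flight requests

    def on_request_completed(self, replica_id, internal_request_id):
        # Release the capacity token acquired when the request was routed.
        if self._tokens_in_use[replica_id] > 0:
            self._tokens_in_use[replica_id] -= 1
```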
36 changes: 24 additions & 12 deletions python/ray/serve/_private/router.py
@@ -734,11 +734,16 @@ async def _resolve_request_arguments(
def _process_finished_request(
self,
replica_id: ReplicaID,
parent_request_id: str,
response_id: str,
internal_request_id: str,
result: Union[Any, RayError],
):
self._metrics_manager.dec_num_running_requests_for_replica(replica_id)
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
self._metrics_manager.dec_num_running_requests_for_replica(replica_id)

# Notify request router that request completed (for cleanup, e.g., token release)
if self.request_router:
self.request_router.on_request_completed(replica_id, internal_request_id)

if isinstance(result, ActorDiedError):
# Replica has died but controller hasn't notified the router yet.
# Don't consider this replica for requests in the future, and retry
@@ -769,6 +774,7 @@ async def _route_and_send_request_once(
) -> Optional[ReplicaResult]:
result: Optional[ReplicaResult] = None
replica: Optional[RunningReplica] = None
callback_registered = False
try:
# Resolve request arguments BEFORE incrementing queued requests.
# This ensures that queue metrics reflect actual pending work,
@@ -797,18 +803,18 @@

# Keep track of requests that have been sent out to replicas
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
_request_context = ray.serve.context._get_serve_request_context()
request_id: str = _request_context.request_id
self._metrics_manager.inc_num_running_requests_for_replica(
replica.replica_id
)
callback = partial(
self._process_finished_request,
replica.replica_id,
request_id,
response_id,
)
result.add_done_callback(callback)
# Always register callback to notify router when request completes
# (needed for token release in queue-based routing, metrics tracking, etc.)
callback = partial(
self._process_finished_request,
replica.replica_id,
pr.metadata.internal_request_id,
)
result.add_done_callback(callback)
callback_registered = True

if not with_rejection:
return result
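For clarity on the callback wiring above: `functools.partial` pre-binds the replica and request IDs, and the completed request's result (or error) arrives as the final positional argument. A standalone sketch of the same pattern, with placeholder IDs:

```python
from functools import partial

def _process_finished_request(replica_id: str, internal_request_id: str, result) -> None:
    # In the router this decrements running-request metrics and calls
    # request_router.on_request_completed(replica_id, internal_request_id).
    print(f"{internal_request_id} on {replica_id} finished with {result!r}")

# partial pre-binds the first two arguments; whatever invokes the callback
# later only needs to supply the result.
callback = partial(_process_finished_request, "replica-1", "req-123")
callback("ok")  # roughly what add_done_callback does once the request finishes
```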
@@ -846,6 +852,12 @@
if replica is not None:
self.request_router.on_replica_actor_unavailable(replica.replica_id)
logger.warning(f"{replica.replica_id} is temporarily unavailable.")
finally:
# Only release if callback wasn't registered (callback handles release).
if replica is not None and not callback_registered:
self.request_router.on_request_completed(
replica.replica_id, pr.metadata.internal_request_id
)

return None
