31 changes: 30 additions & 1 deletion python/ray/serve/_private/deployment_state.py
@@ -2274,6 +2274,12 @@ def __init__(
# time we checked.
self._request_routing_info_updated = False

# Dirty flag: set when replicas transition state (start, stop, health
# check fail, migration) or when availability-related fields change.
# When False *and* _request_routing_info_updated is False, the
# broadcast_running_replicas_if_changed() method can skip all work.
self._replicas_changed = True

self._last_broadcasted_running_replica_infos: List[RunningReplicaInfo] = []
self._last_broadcasted_availability: bool = True
self._last_broadcasted_deployment_config = None
@@ -2456,7 +2462,16 @@ def broadcast_running_replicas_if_changed(self) -> None:

The set will also be broadcast if any replicas have an updated set of
multiplexed model IDs.

Uses a dirty flag (_replicas_changed) to skip all work in steady state
when no replicas have transitioned and no routing info has been updated.
RunningReplicaInfo objects are only constructed when a broadcast may
actually be needed.
"""
# Fast path: nothing could have changed, skip entirely.
if not self._replicas_changed and not self._request_routing_info_updated:
return

running_replica_infos = self.get_running_replica_infos()
is_available = not self._terminally_failed()

@@ -2466,6 +2481,10 @@ def broadcast_running_replicas_if_changed(self) -> None:
or self._request_routing_info_updated
)
availability_changed = is_available != self._last_broadcasted_availability

# Clear the dirty flag now that we've done the comparison.
self._replicas_changed = False

if not running_replicas_changed and not availability_changed:
return

Expand Down Expand Up @@ -2661,6 +2680,7 @@ def deploy(self, deployment_info: DeploymentInfo) -> bool:
)
self._replica_constructor_retry_counter = 0
self._replica_has_started = False
self._replicas_changed = True
return True

def autoscale(self, decision_num_replicas: int) -> bool:
@@ -2787,6 +2807,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> bool:
self._target_state.version
):
replicas_changed = True
self._replicas_changed = True
# Get current rank for the replica
current_rank = self._rank_manager.get_replica_rank(
replica.replica_id.unique_id
@@ -2796,6 +2817,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> bool:
)
if actor_updating:
self._replicas.add(ReplicaState.UPDATING, replica)
self._replicas_changed = True
else:
self._replicas.add(ReplicaState.RUNNING, replica)
# We don't allow going from STARTING, PENDING_MIGRATION to UPDATING.
@@ -2966,6 +2988,8 @@ def check_curr_status(self) -> Tuple[bool, bool]:
# leave it to the controller to fully scale to target
# number of replicas and only return as completed once
# reached target replica count
if not self._replica_has_started:
self._replicas_changed = True
self._replica_has_started = True
elif self._replica_startup_failing():
self._curr_status_info = self._curr_status_info.handle_transition(
@@ -3039,6 +3063,7 @@ def _check_startup_replicas(
# This replica should now be added to the handle's replica
# set.
self._replicas.add(ReplicaState.RUNNING, replica)
self._replicas_changed = True
self._deployment_scheduler.on_replica_running(
replica.replica_id, replica.actor_node_id
)
@@ -3124,8 +3149,10 @@ def record_replica_startup_failure(self, error_msg: str):
if self._target_state.target_num_replicas == 0:
return

# Increase startup failure counter
# Increase startup failure counter (may change _terminally_failed()
# result, which affects broadcasted availability).
self._replica_constructor_retry_counter += 1
self._replicas_changed = True
self._replica_constructor_error_msg = error_msg

# Update the deployment message only if replicas are failing during
@@ -3162,6 +3189,7 @@ def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
logger.debug(f"Adding STOPPING to replica: {replica.replica_id}.")
replica.stop(graceful=graceful_stop)
self._replicas.add(ReplicaState.STOPPING, replica)
self._replicas_changed = True
self._deployment_scheduler.on_replica_stopping(replica.replica_id)
self.health_check_gauge.set(
0,
@@ -3513,6 +3541,7 @@ def _stop_one_running_replica_for_testing(self):
replica_to_stop = running_replicas.pop()
replica_to_stop.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica_to_stop)
self._replicas_changed = True
for replica in running_replicas:
self._replicas.add(ReplicaState.RUNNING, replica)

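For orientation, here is a minimal standalone sketch of the dirty-flag fast path introduced in this file. The Broadcaster class, its counters, and add_replica() are illustrative stand-ins for the controller machinery, not Ray Serve API; only the two flags mirror the real fields.

from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class RunningReplicaInfo:
    replica_id: str


class Broadcaster:
    """Simplified model of DeploymentState's broadcast bookkeeping."""

    def __init__(self):
        # Dirty flag: set on any replica state transition or
        # availability-related change; cleared after each comparison.
        self._replicas_changed = True
        self._request_routing_info_updated = False
        self._last_broadcasted: List[RunningReplicaInfo] = []
        self._running: List[RunningReplicaInfo] = []
        self.comparisons = 0
        self.broadcasts = 0

    def add_replica(self, replica_id: str) -> None:
        self._running.append(RunningReplicaInfo(replica_id))
        self._replicas_changed = True

    def broadcast_if_changed(self) -> None:
        # Fast path: nothing could have changed, skip entirely.
        if not self._replicas_changed and not self._request_routing_info_updated:
            return

        # Slow path: build and compare replica info only when a change is possible.
        self.comparisons += 1
        changed = (
            self._running != self._last_broadcasted
            or self._request_routing_info_updated
        )
        # Clear the flags now that the comparison has been done.
        self._replicas_changed = False
        self._request_routing_info_updated = False
        if not changed:
            return

        self._last_broadcasted = list(self._running)
        self.broadcasts += 1  # stands in for the long-poll notification


b = Broadcaster()
b.add_replica("r1")
b.broadcast_if_changed()  # comparison performed, broadcast fired
b.broadcast_if_changed()  # steady state: fast path, no comparison at all
assert (b.comparisons, b.broadcasts) == (1, 1)

In steady state the second call returns before constructing or comparing any replica info, which is exactly the work the real broadcast_running_replicas_if_changed() now skips.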
186 changes: 186 additions & 0 deletions python/ray/serve/tests/unit/test_deployment_state.py
@@ -5948,5 +5948,191 @@ def test_only_considers_replicas_matching_target_version(
assert ds.get_outbound_deployments() == [d3]


def test_broadcast_skips_work_when_replicas_unchanged(mock_deployment_state_manager):
"""Test that broadcast_running_replicas_if_changed() skips all work in
steady state when _replicas_changed is False and
_request_routing_info_updated is False."""
create_dsm, _, _, _ = mock_deployment_state_manager
dsm: DeploymentStateManager = create_dsm()

info_1, v1 = deployment_info()
dsm.deploy(TEST_DEPLOYMENT_ID, info_1)
ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]

# _replicas_changed should be True after deploy.
assert ds._replicas_changed is True

# Bring deployment to healthy state.
dsm.update()
ds._replicas.get()[0]._actor.set_ready()
dsm.update()
check_counts(ds, total=1, by_state=[(ReplicaState.RUNNING, 1, None)])
assert ds.curr_status_info.status == DeploymentStatus.HEALTHY

# After update(), the broadcast should have cleared the flag.
assert ds._replicas_changed is False
assert ds._request_routing_info_updated is False

# Call broadcast again — should be a no-op (early return).
# We verify by patching get_running_replica_infos; if the fast path
# works, the patched method will NOT be called.
with patch.object(ds, "get_running_replica_infos") as mock_get_infos:
ds.broadcast_running_replicas_if_changed()
mock_get_infos.assert_not_called()

# Now stop a replica (sets _replicas_changed = True).
ds._stop_one_running_replica_for_testing()
assert ds._replicas_changed is True

# broadcast should now do the full check.
ds.broadcast_running_replicas_if_changed()
# Flag should be cleared after the broadcast.
assert ds._replicas_changed is False


def test_broadcast_runs_when_routing_info_updated(mock_deployment_state_manager):
"""Test that broadcast_running_replicas_if_changed() runs when
_request_routing_info_updated is True even if _replicas_changed is False."""
create_dsm, _, _, _ = mock_deployment_state_manager
dsm: DeploymentStateManager = create_dsm()

info_1, v1 = deployment_info()
dsm.deploy(TEST_DEPLOYMENT_ID, info_1)
ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]

# Bring deployment to healthy state.
dsm.update()
ds._replicas.get()[0]._actor.set_ready()
dsm.update()
assert ds._replicas_changed is False

# Simulate routing info update.
ds._request_routing_info_updated = True

# broadcast should NOT take the fast path.
with patch.object(
ds, "get_running_replica_infos", wraps=ds.get_running_replica_infos
) as mock_get_infos:
ds.broadcast_running_replicas_if_changed()
mock_get_infos.assert_called_once()

# Both flags should be cleared after broadcast.
assert ds._replicas_changed is False
assert ds._request_routing_info_updated is False


def test_replicas_changed_flag_set_on_state_transitions(mock_deployment_state_manager):
"""Test that _replicas_changed is set correctly during replica state
transitions."""
create_dsm, _, _, _ = mock_deployment_state_manager
dsm: DeploymentStateManager = create_dsm()

info_1, v1 = deployment_info(num_replicas=2)
dsm.deploy(TEST_DEPLOYMENT_ID, info_1)
ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]

# Flag should be True after deploy (_set_target_state sets it).
assert ds._replicas_changed is True

# After update, replicas are STARTING.
dsm.update()
check_counts(ds, total=2, by_state=[(ReplicaState.STARTING, 2, None)])

# broadcast clears the flag.
assert ds._replicas_changed is False

# Set replicas ready — this transitions them to RUNNING in _check_startup_replicas.
for r in ds._replicas.get():
r._actor.set_ready()
dsm.update()
check_counts(ds, total=2, by_state=[(ReplicaState.RUNNING, 2, None)])

# Flag should be cleared again after broadcast.
assert ds._replicas_changed is False

# Now fail a health check (sets _replicas_changed via _stop_replica).
ds._replicas.get()[0]._actor.set_unhealthy()
dsm.update()
# Flag set by _stop_replica then cleared by broadcast.
assert ds._replicas_changed is False
# Verify a replica was stopped.
assert ds._replicas.count(states=[ReplicaState.STOPPING]) >= 1


def test_replicas_changed_flag_set_on_lightweight_broadcast_config_update(
mock_deployment_state_manager,
):
"""Regression test: when a config change requires a long-poll broadcast
(e.g. max_ongoing_requests changed) but does NOT require an actor restart
or reconfigure, the _replicas_changed flag must still be set by the
requires_long_poll_broadcast path so the broadcast is not skipped.

This guards against a future scenario where a broadcast-affecting field
is changed to a lighter update type that doesn't trigger actor_updating.
"""
create_dsm, _, _, _ = mock_deployment_state_manager
dsm: DeploymentStateManager = create_dsm()

# Deploy v1 and bring to healthy steady state.
b_info_1, v1 = deployment_info(version="1")
dsm.deploy(TEST_DEPLOYMENT_ID, b_info_1)
ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]

dsm.update()
ds._replicas.get()[0]._actor.set_ready()
dsm.update()
check_counts(ds, total=1, by_state=[(ReplicaState.RUNNING, 1, v1)])
assert ds.curr_status_info.status == DeploymentStatus.HEALTHY
assert ds._replicas_changed is False

# Deploy an update with the same code version ("1") but a different max_ongoing_requests.
b_info_2, v2 = deployment_info(version="1", max_ongoing_requests=42)
dsm.deploy(TEST_DEPLOYMENT_ID, b_info_2)

# _set_target_state also sets _replicas_changed. Clear it so we can
# isolate whether _stop_or_update_outdated_version_replicas sets the
# flag via the requires_long_poll_broadcast path.
ds._replicas_changed = False

# Patch the running replica's mock actor so reconfigure() returns False
# (simulating a broadcast-needed but no-actor-update scenario).
replica = ds._replicas.get()[0]
original_reconfigure = replica._actor.reconfigure

def patched_reconfigure(version, rank=None):
# Perform the version/rank bookkeeping but report no actor update.
original_reconfigure(version, rank=rank)
return False

replica._actor.reconfigure = patched_reconfigure

# Confirm preconditions: the version change requires a broadcast.
assert v1.requires_long_poll_broadcast(v2)

# Directly call _stop_or_update_outdated_version_replicas (the method
# that checks requires_long_poll_broadcast) so we can inspect the flag
# before broadcast clears it.
ds._stop_or_update_outdated_version_replicas()

# The replica should stay RUNNING (no actor restart, no UPDATING state)
# because our patched reconfigure() returns False.
check_counts(ds, total=1, by_state=[(ReplicaState.RUNNING, 1, v2)])

# Key assertion: _replicas_changed was set by the
# requires_long_poll_broadcast path, NOT by actor_updating.
assert ds._replicas_changed is True

# Now broadcast and verify it fires (clearing the flag).
ds.broadcast_running_replicas_if_changed()
assert ds._replicas_changed is False
ds._long_poll_host.notify_changed.assert_called()

# Verify the fast path works: no further broadcast on next tick.
ds._long_poll_host.notify_changed.reset_mock()
with patch.object(ds, "get_running_replica_infos") as mock_get_infos:
ds.broadcast_running_replicas_if_changed()
mock_get_infos.assert_not_called()


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
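For local review, the new tests can also be run directly with pytest; a typical invocation from the repository root (the -k filter to select only the tests added here is optional and illustrative):

    python -m pytest python/ray/serve/tests/unit/test_deployment_state.py -v -s -k "broadcast or replicas_changed"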