Commit 6398c2c

Author: yicheng
[core] Call SendPendingTasks after cancelling queued actor task
Signed-off-by: yicheng <[email protected]>
1 parent 44be7c5 commit 6398c2c

2 files changed, 54 additions and 0 deletions
python/ray/tests/test_actor_cancel.py

Lines changed: 52 additions & 0 deletions
@@ -597,5 +597,57 @@ def is_canceled(self):
     assert not ray.get(actor.is_canceled.remote())


+def test_cancel_head_unblocks_queue(shutdown_only):
+    """
+    Test that when the head task of an actor's queue is cancelled,
+    subsequent tasks with resolved dependencies can proceed.
+
+    Scenario:
+    - Task A depends on a long-running task (unresolved dependency)
+    - Task B has no dependencies (resolved immediately)
+    - Task B is queued behind Task A
+    - Cancel Task A
+    - Task B should now execute
+    """
+
+    @ray.remote
+    class Actor:
+        def process(self, data):
+            return f"processed: {data}"
+
+    @ray.remote
+    def long_running_task():
+        time.sleep(3600)
+
+    ray.init()
+    actor = Actor.remote()
+
+    # Task A with unresolved dependency
+    ref_long = long_running_task.remote()
+    ref_a = actor.process.remote(ref_long)
+
+    # Task B with no dependencies, queued behind A
+    ref_b = actor.process.remote("ready_value")
+
+    # Wait for Task B's dependency resolution to complete. After resolution,
+    # Task B won't trigger SendPendingTasks again (it only happens in the
+    # resolution callback). So if CancelTask doesn't call SendPendingTasks
+    # properly, Task B will remain stuck in the queue forever.
+    # TODO(Yicheng-Lu-llll): Use a deterministic approach if an API becomes
+    # available to query whether a task's dependencies are resolved while
+    # still queued on the client side.
+    time.sleep(1)
+
+    # Cancel Task A
+    ray.cancel(ref_a)
+
+    # Task B should complete
+    result = ray.get(ref_b, timeout=5)
+    assert result == "processed: ready_value"
+
+    # Cleanup
+    ray.cancel(ref_long, force=True)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))

src/ray/core_worker/task_submission/actor_task_submitter.cc

Lines changed: 2 additions & 0 deletions
@@ -990,6 +990,8 @@ void ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursive)
         RAY_LOG(DEBUG).WithField(task_id)
             << "Task was queued. Mark a task is canceled from a queue.";
         queue->second.actor_submit_queue_->MarkTaskCanceled(send_pos);
+        queue->second.cur_pending_calls_--;
+        SendPendingTasks(actor_id);
       }
     }

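The failure mode that the test comments describe can be illustrated with a toy model. The sketch below is not Ray's implementation: `ToySubmitQueue`, `QueuedTask`, and `run` are hypothetical names, and the model assumes only what the test comments state, namely that sending is normally triggered from the dependency-resolution callback and that tasks leave the queue in order. It does not model the `cur_pending_calls_` counter that the commit also decrements.

```python
from collections import OrderedDict
from dataclasses import dataclass


@dataclass
class QueuedTask:
    name: str
    deps_resolved: bool = False
    canceled: bool = False


class ToySubmitQueue:
    """Hypothetical, simplified stand-in for an in-order actor submit queue."""

    def __init__(self):
        self.tasks = OrderedDict()  # send position -> QueuedTask
        self.sent = []              # names of tasks that were "sent" to the actor

    def submit(self, pos, name):
        self.tasks[pos] = QueuedTask(name)

    def on_dependencies_resolved(self, pos):
        # Assumption from the test comments: this callback is the only
        # place where sending is triggered for an already-queued task.
        self.tasks[pos].deps_resolved = True
        self.send_pending()

    def cancel(self, pos, resend):
        self.tasks[pos].canceled = True
        if resend:
            # Analogue of the SendPendingTasks call added by this commit.
            self.send_pending()

    def send_pending(self):
        # Drain from the head in order: drop canceled entries, and stop at
        # the first live task whose dependencies are still unresolved.
        while self.tasks:
            pos, task = next(iter(self.tasks.items()))
            if task.canceled:
                del self.tasks[pos]
                continue
            if not task.deps_resolved:
                break
            self.sent.append(task.name)
            del self.tasks[pos]


def run(resend):
    queue = ToySubmitQueue()
    queue.submit(0, "A")               # head task, dependency never resolves
    queue.submit(1, "B")               # queued behind A
    queue.on_dependencies_resolved(1)  # B is ready but blocked behind A
    queue.cancel(0, resend=resend)     # cancel the head task
    return queue.sent
```

In this model, `run(resend=False)` leaves B in the queue because nothing triggers another send after A is canceled, while `run(resend=True)` sends B as soon as the canceled head is dropped.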
