Skip to content

Commit 2c4efae

Browse files
authored
Ensure heartbeats after cancelation do not raise KeyErrors (#5053)
1 parent 07fe11d commit 2c4efae

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

distributed/scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3947,7 +3947,9 @@ def heartbeat_worker(
39473947
ws._last_seen = local_now
39483948
if executing is not None:
39493949
ws._executing = {
3950-
parent._tasks[key]: duration for key, duration in executing.items()
3950+
parent._tasks[key]: duration
3951+
for key, duration in executing.items()
3952+
if key in parent._tasks
39513953
}
39523954

39533955
ws._metrics = metrics

distributed/tests/test_scheduler.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2936,3 +2936,47 @@ async def test_transition_counter(c, s, a, b):
29362936
assert s.transition_counter == 0
29372937
await c.submit(inc, 1)
29382938
assert s.transition_counter > 1
2939+
2940+
2941+
@pytest.mark.slow
@gen_cluster(
    client=True,
    nthreads=[("127.0.0.1", 1)] * 10,
    # usually done within 2-3s, but CI machines can be considerably slower
    timeout=60,
)
async def test_worker_heartbeat_after_cancel(c, s, *workers):
    """Worker heartbeats must keep succeeding after a graph is cancelled.

    Once a computation has been released/cancelled, worker and scheduler
    state can drift apart: a worker may still report metrics (duration,
    type, ...) for a task the scheduler has already forgotten. The
    heartbeat fails if the scheduler does not tolerate such unknown keys.

    The underlying race is very hard to provoke, so this test does not fail
    reliably when the bug is present. The chance of hitting it grows with
    the number of workers.

    See also https://github.com/dask/distributed/issues/4587
    """
    da = pytest.importorskip("dask.array")

    # Stop the periodic heartbeat callbacks; heartbeats are sent explicitly
    # at the end of the test instead.
    for worker in workers:
        worker.periodic_callbacks["heartbeat"].stop()

    data = da.random.random((2000000, 100), chunks=(10000, None))
    futures = c.compute(da.linalg.svd(data))

    # Wait for the scheduler to learn about the graph.
    while not s.tasks:
        await asyncio.sleep(0.001)

    # Wait until the number of executing tasks, summed over all workers,
    # reaches half the worker count.
    while 2 * sum(worker.executing_count for worker in workers) < len(workers):
        await asyncio.sleep(0.001)

    await c.cancel(futures)

    # Wait for the scheduler to forget every task.
    while s.tasks:
        await asyncio.sleep(0.001)

    # Workers may still hold tasks the scheduler no longer knows about.
    # Keep heartbeating until they are gone; a failing heartbeat propagates
    # out of gather and fails the test.
    while any(worker.tasks for worker in workers):
        await asyncio.gather(*(worker.heartbeat() for worker in workers))

0 commit comments

Comments
 (0)