Skip to content

Commit 52d3a86

Browse files
committed
Fix scheduler crash after broadcasting run mode = skip
1 parent 33fccbe commit 52d3a86

File tree

3 files changed

+55
-10
lines changed

3 files changed

+55
-10
lines changed

changes.d/6940.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a scheduler crash that could occur after re-running a task in skip mode.

cylc/flow/task_events_mgr.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1952,6 +1952,11 @@ def process_execution_polling_intervals(
19521952
>>> this([], 10, [5])
19531953
[15, 5]
19541954
1955+
# There are no execution time limit polling intervals set - just
1956+
# repeat the execution polling interval until the time limit:
1957+
>>> this([10], 25, None)
1958+
[10, 10]
1959+
19551960
# We have a list of execution time limit polling intervals,
19561961
>>> this([10], 25, [5, 6, 7, 8])
19571962
[10, 10, 10, 6, 7, 8]
@@ -1968,17 +1973,19 @@ def process_execution_polling_intervals(
19681973
size = int((time_limit - sum(delays)) / delays[-1])
19691974
delays.extend([delays[-1]] * size)
19701975

1971-
# After the last delay before the execution time limit add the
1972-
# delay to get to the execution_time_limit
1973-
if len(time_limit_polling_intervals) == 1:
1974-
time_limit_polling_intervals.append(
1975-
time_limit_polling_intervals[0]
1976-
)
1977-
time_limit_polling_intervals[0] += time_limit - sum(delays)
1976+
if time_limit_polling_intervals:
1977+
# After the last delay before the execution time limit add the
1978+
# delay to get to the execution_time_limit
1979+
if len(time_limit_polling_intervals) == 1:
1980+
time_limit_polling_intervals.append(
1981+
time_limit_polling_intervals[0]
1982+
)
1983+
time_limit_polling_intervals[0] += time_limit - sum(delays)
1984+
1985+
# After the execution time limit, poll at the
1986+
# execution time limit polling intervals.
1987+
delays += time_limit_polling_intervals
19781988

1979-
# After the execution time limit poll at execution time limit polling
1980-
# intervals.
1981-
delays += time_limit_polling_intervals
19821989
return delays
19831990

19841991
def add_event_timer(self, id_key: EventKey, event_timer) -> None:

tests/integration/run_modes/test_skip.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
"""
1818

1919
from cylc.flow.cycling.integer import IntegerPoint
20+
from cylc.flow.scheduler import Scheduler
21+
from cylc.flow.task_outputs import TASK_OUTPUT_FAILED
22+
from cylc.flow.task_state import (
23+
TASK_STATUS_FAILED,
24+
TASK_STATUS_SUCCEEDED,
25+
)
2026

2127

2228
async def test_settings_override_from_broadcast(
@@ -237,3 +243,34 @@ async def test_outputs_can_be_changed(
237243
['1'], ['one'], [{'skip': {'outputs': 'succeeded'}}]
238244
)
239245
schd.submit_task_jobs(schd.pool.get_tasks())
246+
247+
248+
async def test_rerun_after_skip_mode_broadcast(
249+
flow, one_conf, scheduler, start
250+
):
251+
"""Test re-running a task after it has been set to skip.
252+
253+
See https://github.com/cylc/cylc-flow/pull/6940
254+
"""
255+
id_ = flow({
256+
**one_conf,
257+
"runtime": {
258+
"one": {
259+
"execution time limit": "PT1M",
260+
},
261+
},
262+
})
263+
schd: Scheduler = scheduler(id_, run_mode='live')
264+
async with start(schd):
265+
itask = schd.pool.get_tasks()[0]
266+
schd.submit_task_jobs([itask])
267+
schd.task_events_mgr.process_message(
268+
itask, 'CRITICAL', TASK_OUTPUT_FAILED
269+
)
270+
assert itask.state(TASK_STATUS_FAILED)
271+
272+
schd.broadcast_mgr.put_broadcast(
273+
['1'], ['root'], [{'run mode': 'skip'}]
274+
)
275+
schd.submit_task_jobs([itask])
276+
assert itask.state(TASK_STATUS_SUCCEEDED)

0 commit comments

Comments
 (0)