Skip to content

Commit 6954221

Browse files
authored
Merge pull request #6940 from MetRonnie/crash
Fix scheduler crash after broadcasting `run mode = skip`
2 parents 03b2dde + 52d3a86 commit 6954221

File tree

3 files changed

+63
-18
lines changed

3 files changed

+63
-18
lines changed

changes.d/6940.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a scheduler crash that could occur after re-running a task in skip mode.

cylc/flow/task_events_mgr.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ def _get_remote_conf(self, itask, key):
548548
itask.platform[key]
549549
)
550550

551-
def _get_workflow_platforms_conf(self, itask, key):
551+
def _get_workflow_platforms_conf(self, itask: 'TaskProxy', key: str):
552552
"""Return top level [runtime] items that default to platforms."""
553553
overrides = self.broadcast_mgr.get_broadcast(itask.tokens)
554554
return (
@@ -1844,7 +1844,7 @@ def _get_handler_template_variables(
18441844
}
18451845
# fmt: on
18461846

1847-
def _reset_job_timers(self, itask):
1847+
def _reset_job_timers(self, itask: 'TaskProxy'):
18481848
"""Set up poll timer and timeout for task."""
18491849

18501850
if itask.transient:
@@ -1898,18 +1898,18 @@ def _reset_job_timers(self, itask):
18981898
itask.poll_timer = TaskActionTimer(ctx=ctx, delays=delays)
18991899
# Log timeout and polling schedule
19001900
message = f"health: {timeout_key}={timeout_str}"
1901-
# Attempt to group identical consecutive delays as N*DELAY,...
19021901
if itask.poll_timer.delays:
1903-
items = [] # [(number of item - 1, item), ...]
1902+
# Group identical consecutive delays as N*DELAY,...
1903+
items: List[List[float]] = [] # [[number of item, item], ...]
19041904
for delay in itask.poll_timer.delays:
19051905
if items and items[-1][1] == delay:
19061906
items[-1][0] += 1
19071907
else:
1908-
items.append([0, delay])
1908+
items.append([1, delay])
19091909
message += ', polling intervals='
19101910
for num, item in items:
1911-
if num:
1912-
message += '%d*' % (num + 1)
1911+
if num > 1:
1912+
message += f'{num}*'
19131913
message += '%s,' % intvl_as_str(item)
19141914
message += '...'
19151915
LOG.debug(f"[{itask}] {message}")
@@ -1920,7 +1920,7 @@ def _reset_job_timers(self, itask):
19201920
def process_execution_polling_intervals(
19211921
polling_intervals: List[float],
19221922
time_limit: float,
1923-
time_limit_polling_intervals: List[float]
1923+
time_limit_polling_intervals: Optional[List[float]]
19241924
) -> List[float]:
19251925
"""Create a list of polling times.
19261926
@@ -1952,6 +1952,11 @@ def process_execution_polling_intervals(
19521952
>>> this([], 10, [5])
19531953
[15, 5]
19541954
1955+
# There are no execution time limit polling intervals set - just
1956+
# repeat the execution polling interval until the time limit:
1957+
>>> this([10], 25, None)
1958+
[10, 10]
1959+
19551960
# We have a list of execution time limit polling intervals,
19561961
>>> this([10], 25, [5, 6, 7, 8])
19571962
[10, 10, 10, 6, 7, 8]
@@ -1968,17 +1973,19 @@ def process_execution_polling_intervals(
19681973
size = int((time_limit - sum(delays)) / delays[-1])
19691974
delays.extend([delays[-1]] * size)
19701975

1971-
# After the last delay before the execution time limit add the
1972-
# delay to get to the execution_time_limit
1973-
if len(time_limit_polling_intervals) == 1:
1974-
time_limit_polling_intervals.append(
1975-
time_limit_polling_intervals[0]
1976-
)
1977-
time_limit_polling_intervals[0] += time_limit - sum(delays)
1976+
if time_limit_polling_intervals:
1977+
# After the last delay before the execution time limit add the
1978+
# delay to get to the execution_time_limit
1979+
if len(time_limit_polling_intervals) == 1:
1980+
time_limit_polling_intervals.append(
1981+
time_limit_polling_intervals[0]
1982+
)
1983+
time_limit_polling_intervals[0] += time_limit - sum(delays)
1984+
1985+
# After the execution time limit, poll at the
1986+
# execution time limit polling intervals.
1987+
delays += time_limit_polling_intervals
19781988

1979-
# After the execution time limit poll at execution time limit polling
1980-
# intervals.
1981-
delays += time_limit_polling_intervals
19821989
return delays
19831990

19841991
def add_event_timer(self, id_key: EventKey, event_timer) -> None:

tests/integration/run_modes/test_skip.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
"""
1818

1919
from cylc.flow.cycling.integer import IntegerPoint
20+
from cylc.flow.scheduler import Scheduler
21+
from cylc.flow.task_outputs import TASK_OUTPUT_FAILED
22+
from cylc.flow.task_state import (
23+
TASK_STATUS_FAILED,
24+
TASK_STATUS_SUCCEEDED,
25+
)
2026

2127

2228
async def test_settings_override_from_broadcast(
@@ -237,3 +243,34 @@ async def test_outputs_can_be_changed(
237243
['1'], ['one'], [{'skip': {'outputs': 'succeeded'}}]
238244
)
239245
schd.submit_task_jobs(schd.pool.get_tasks())
246+
247+
248+
async def test_rerun_after_skip_mode_broadcast(
249+
flow, one_conf, scheduler, start
250+
):
251+
"""Test re-running a task after it has been set to skip.
252+
253+
See https://github.com/cylc/cylc-flow/pull/6940
254+
"""
255+
id_ = flow({
256+
**one_conf,
257+
"runtime": {
258+
"one": {
259+
"execution time limit": "PT1M",
260+
},
261+
},
262+
})
263+
schd: Scheduler = scheduler(id_, run_mode='live')
264+
async with start(schd):
265+
itask = schd.pool.get_tasks()[0]
266+
schd.submit_task_jobs([itask])
267+
schd.task_events_mgr.process_message(
268+
itask, 'CRITICAL', TASK_OUTPUT_FAILED
269+
)
270+
assert itask.state(TASK_STATUS_FAILED)
271+
272+
schd.broadcast_mgr.put_broadcast(
273+
['1'], ['root'], [{'run mode': 'skip'}]
274+
)
275+
schd.submit_task_jobs([itask])
276+
assert itask.state(TASK_STATUS_SUCCEEDED)

0 commit comments

Comments
 (0)