Skip to content

Commit c43f137

Browse files
broadcast: suspend auto-expiry whilst scheduler paused
* Don't auto-expire broadcasts in a paused scheduler. * This allows historical cycles to be broadcasted and triggered. * Answering the remaining use case for the `--on-resume` option - cylc/cylc-ui#2299
1 parent a3b27b3 commit c43f137

File tree

3 files changed

+85
-10
lines changed

3 files changed

+85
-10
lines changed

changes.d/6976.feat.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
broadcast: Broadcasts to inactive historical cycles will no longer expire while
2+
the scheduler is paused. This, combined with the newly extended broadcast
3+
expiry limit removes the need for the "cylc trigger --on-resume" option.

cylc/flow/scheduler.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,16 +1682,18 @@ async def _main_loop(self) -> None:
16821682
self.reset_inactivity_timer()
16831683

16841684
# auto expire broadcasts
1685-
with suppress(TimePointDumperBoundsError):
1686-
# NOTE: TimePointDumperBoundsError will be raised for negative
1687-
# cycle points, we skip broadcast expiry in this circumstance
1688-
# (pre-initial condition)
1689-
if min_point := self.pool.get_min_point():
1690-
# NOTE: the broadcast expire limit is the oldest active cycle
1691-
# MINUS the longest cycling interval
1692-
self.broadcast_mgr.expire_broadcast(
1693-
min_point - self.config.interval_of_longest_sequence
1694-
)
1685+
if not self.is_paused:
1686+
# NOTE: Don't auto-expire broadcasts whilst the scheduler is paused
1687+
with suppress(TimePointDumperBoundsError):
1688+
# NOTE: TimePointDumperBoundsError will be raised for negative
1689+
# cycle points, we skip broadcast expiry in this circumstance
1690+
# (pre-initial condition)
1691+
if min_point := self.pool.get_min_point():
1692+
# NOTE: the broadcast expire limit is the oldest active
1693+
# cycle MINUS the longest cycling interval
1694+
self.broadcast_mgr.expire_broadcast(
1695+
min_point - self.config.interval_of_longest_sequence
1696+
)
16951697

16961698
self.late_tasks_check()
16971699

tests/integration/test_broadcast_mgr.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616

1717
"""Tests for Broadcast Manager."""
1818

19+
import asyncio
1920

2021
import pytest
22+
23+
from cylc.flow import commands
2124
from cylc.flow.cycling.integer import IntegerInterval, IntegerPoint
2225
from cylc.flow.cycling.iso8601 import ISO8601Interval, ISO8601Point
26+
from cylc.flow.task_state import TASK_STATUS_FAILED
2327

2428

2529
async def test_reject_valid_broadcast_is_remote_clash_with_config(
@@ -212,3 +216,69 @@ async def test_broadcast_expiry_async(
212216

213217
# no auto-expiry should take place
214218
assert expires == []
219+
220+
221+
async def test_broadcast_old_cycle(flow, scheduler, run, complete):
222+
"""It should not expire broadcasts whilst the scheduler is paused.
223+
224+
This tests the use case of broadcasting to a historical cycle (whilst the
225+
workflow is paused) before triggering it to run to ensure that the
226+
broadcast is not expired before the operator is able to run the trigger
227+
command.
228+
229+
For context, see https://github.com/cylc/cylc-flow/pull/6499 and
230+
https://github.com/cylc/cylc-flow/pull/6192#issuecomment-2486785465
231+
"""
232+
id_ = flow({
233+
'scheduling': {
234+
'initial cycle point': '1',
235+
'cycling mode': 'integer',
236+
'graph': {
237+
'P1': 'a[-P1] => a',
238+
},
239+
},
240+
})
241+
schd = scheduler(id_, paused_start=False)
242+
async with run(schd):
243+
# issue a broadcast into the first cycle
244+
schd.broadcast_mgr.put_broadcast(
245+
point_strings=['1'],
246+
namespaces=['a'],
247+
settings=[{'environment': {'ANSWER': '42'}}]
248+
)
249+
assert list(schd.broadcast_mgr.broadcasts) == ['1']
250+
251+
# the broadcast should expire after the workflow passes cycle "3"
252+
await complete(schd, '3/a')
253+
assert list(schd.broadcast_mgr.broadcasts) == []
254+
255+
# pause the workflow
256+
await commands.run_cmd(commands.pause(schd))
257+
258+
# issue a broadcast into the first cycle (now behind the broadcast
259+
# expire point)
260+
schd.broadcast_mgr.put_broadcast(
261+
point_strings=['1'],
262+
namespaces=['a'],
263+
settings=[{'simulation': {'fail cycle points': '1'}}]
264+
)
265+
266+
# this should not be expired whilst the scheduler is paused
267+
await schd._main_loop() # run one iteration of the main loop
268+
assert list(schd.broadcast_mgr.broadcasts) == ['1']
269+
270+
# trigger the first cycle and resume the workflow
271+
await commands.run_cmd(commands.force_trigger_tasks(schd, ['1'], []))
272+
await commands.run_cmd(commands.resume(schd))
273+
274+
# the broadcast should still be there
275+
await schd._main_loop() # run one iteration of the main loop
276+
assert list(schd.broadcast_mgr.broadcasts) == ['1']
277+
278+
# and should take effect
279+
a_1 = schd.pool._get_task_by_id('1/a')
280+
async with asyncio.timeout(5):
281+
while True:
282+
await asyncio.sleep(0.1)
283+
if a_1.state(TASK_STATUS_FAILED):
284+
break

0 commit comments

Comments
 (0)