diff --git a/changes.d/6976.feat.md b/changes.d/6976.feat.md new file mode 100644 index 0000000000..bf4bedef4c --- /dev/null +++ b/changes.d/6976.feat.md @@ -0,0 +1,3 @@ +broadcast: Broadcasts to inactive historical cycles will no longer expire while +the scheduler is paused. This, combined with the newly extended broadcast +expiry limit removes the need for the "cylc trigger --on-resume" option. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index c63311990b..6731d95e98 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1682,16 +1682,20 @@ async def _main_loop(self) -> None: self.reset_inactivity_timer() # auto expire broadcasts - with suppress(TimePointDumperBoundsError): - # NOTE: TimePointDumperBoundsError will be raised for negative - # cycle points, we skip broadcast expiry in this circumstance - # (pre-initial condition) - if min_point := self.pool.get_min_point(): - # NOTE: the broadcast expire limit is the oldest active cycle - # MINUS the longest cycling interval - self.broadcast_mgr.expire_broadcast( - min_point - self.config.interval_of_longest_sequence - ) + if not self.is_paused: + # NOTE: Don't auto-expire broadcasts whilst the scheduler is paused. + # This allows broadcast-and-trigger beyond the expiry limit, by + # pausing before doing it (after which the expiry limit moves back). + with suppress(TimePointDumperBoundsError): + # NOTE: TimePointDumperBoundsError will be raised for negative + # cycle points, we skip broadcast expiry in this circumstance + # (pre-initial condition) + if min_point := self.pool.get_min_point(): + # NOTE: the broadcast expire limit is the oldest active + # cycle MINUS the longest cycling interval + self.broadcast_mgr.expire_broadcast( + min_point - self.config.interval_of_longest_sequence + ) self.late_tasks_check() diff --git a/tests/integration/test_broadcast_mgr.py b/tests/integration/test_broadcast_mgr.py index ae5e8aabde..0f60662fda 100644 --- a/tests/integration/test_broadcast_mgr.py +++ b/tests/integration/test_broadcast_mgr.py @@ -16,10 +16,14 @@ """Tests for Broadcast Manager.""" +import asyncio import pytest + +from cylc.flow import commands from cylc.flow.cycling.integer import IntegerInterval, IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Interval, ISO8601Point +from cylc.flow.task_state import TASK_STATUS_FAILED async def test_reject_valid_broadcast_is_remote_clash_with_config( @@ -212,3 +216,69 @@ async def test_broadcast_expiry_async( # no auto-expiry should take place assert expires == [] + + +async def test_broadcast_old_cycle(flow, scheduler, run, complete): + """It should not expire broadcasts whilst the scheduler is paused. + + This tests the use case of broadcasting to a historical cycle (whilst the + workflow is paused) before triggering it to run to ensure that the + broadcast is not expired before the operator is able to run the trigger + command. + + For context, see https://github.com/cylc/cylc-flow/pull/6499 and + https://github.com/cylc/cylc-flow/pull/6192#issuecomment-2486785465 + """ + id_ = flow({ + 'scheduling': { + 'initial cycle point': '1', + 'cycling mode': 'integer', + 'graph': { + 'P1': 'a[-P1] => a', + }, + }, + }) + schd = scheduler(id_, paused_start=False) + async with run(schd): + # issue a broadcast into the first cycle + schd.broadcast_mgr.put_broadcast( + point_strings=['1'], + namespaces=['a'], + settings=[{'environment': {'ANSWER': '42'}}] + ) + assert list(schd.broadcast_mgr.broadcasts) == ['1'] + + # the broadcast should expire after the workflow passes cycle "3" + await complete(schd, '3/a') + assert list(schd.broadcast_mgr.broadcasts) == [] + + # pause the workflow + await commands.run_cmd(commands.pause(schd)) + + # issue a broadcast into the first cycle (now behind the broadcast + # expire point) + schd.broadcast_mgr.put_broadcast( + point_strings=['1'], + namespaces=['a'], + settings=[{'simulation': {'fail cycle points': '1'}}] + ) + + # this should not be expired whilst the scheduler is paused + await schd._main_loop() # run one iteration of the main loop + assert list(schd.broadcast_mgr.broadcasts) == ['1'] + + # trigger the first cycle and resume the workflow + await commands.run_cmd(commands.force_trigger_tasks(schd, ['1'], [])) + await commands.run_cmd(commands.resume(schd)) + + # the broadcast should still be there + await schd._main_loop() # run one iteration of the main loop + assert list(schd.broadcast_mgr.broadcasts) == ['1'] + + # and should take effect + a_1 = schd.pool._get_task_by_id('1/a') + async with asyncio.timeout(5): + while True: + await asyncio.sleep(0.1) + if a_1.state(TASK_STATUS_FAILED): + break