Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes.d/6976.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
broadcast: Broadcasts to inactive historical cycles will no longer expire while
the scheduler is paused. This, combined with the newly extended broadcast
expiry limit removes the need for the "cylc trigger --on-resume" option.
22 changes: 12 additions & 10 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1682,16 +1682,18 @@ async def _main_loop(self) -> None:
self.reset_inactivity_timer()

# auto expire broadcasts
with suppress(TimePointDumperBoundsError):
# NOTE: TimePointDumperBoundsError will be raised for negative
# cycle points, we skip broadcast expiry in this circumstance
# (pre-initial condition)
if min_point := self.pool.get_min_point():
# NOTE: the broadcast expire limit is the oldest active cycle
# MINUS the longest cycling interval
self.broadcast_mgr.expire_broadcast(
min_point - self.config.interval_of_longest_sequence
)
if not self.is_paused:
# NOTE: Don't auto-expire broadcasts whilst the scheduler is paused
with suppress(TimePointDumperBoundsError):
# NOTE: TimePointDumperBoundsError will be raised for negative
# cycle points, we skip broadcast expiry in this circumstance
# (pre-initial condition)
if min_point := self.pool.get_min_point():
# NOTE: the broadcast expire limit is the oldest active
# cycle MINUS the longest cycling interval
self.broadcast_mgr.expire_broadcast(
min_point - self.config.interval_of_longest_sequence
)

self.late_tasks_check()

Expand Down
70 changes: 70 additions & 0 deletions tests/integration/test_broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

"""Tests for Broadcast Manager."""

import asyncio

import pytest

from cylc.flow import commands
from cylc.flow.cycling.integer import IntegerInterval, IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Interval, ISO8601Point
from cylc.flow.task_state import TASK_STATUS_FAILED


async def test_reject_valid_broadcast_is_remote_clash_with_config(
Expand Down Expand Up @@ -212,3 +216,69 @@ async def test_broadcast_expiry_async(

# no auto-expiry should take place
assert expires == []


async def test_broadcast_old_cycle(flow, scheduler, run, complete):
"""It should not expire broadcasts whilst the scheduler is paused.

This tests the use case of broadcasting to a historical cycle (whilst the
workflow is paused) before triggering it to run to ensure that the
broadcast is not expired before the operator is able to run the trigger
command.

For context, see https://github.com/cylc/cylc-flow/pull/6499 and
https://github.com/cylc/cylc-flow/pull/6192#issuecomment-2486785465
"""
id_ = flow({
'scheduling': {
'initial cycle point': '1',
'cycling mode': 'integer',
'graph': {
'P1': 'a[-P1] => a',
},
},
})
schd = scheduler(id_, paused_start=False)
async with run(schd):
# issue a broadcast into the first cycle
schd.broadcast_mgr.put_broadcast(
point_strings=['1'],
namespaces=['a'],
settings=[{'environment': {'ANSWER': '42'}}]
)
assert list(schd.broadcast_mgr.broadcasts) == ['1']

# the broadcast should expire after the workflow passes cycle "3"
await complete(schd, '3/a')
assert list(schd.broadcast_mgr.broadcasts) == []

# pause the workflow
await commands.run_cmd(commands.pause(schd))

# issue a broadcast into the first cycle (now behind the broadcast
# expire point)
schd.broadcast_mgr.put_broadcast(
point_strings=['1'],
namespaces=['a'],
settings=[{'simulation': {'fail cycle points': '1'}}]
)

# this should not be expired whilst the scheduler is paused
await schd._main_loop() # run one iteration of the main loop
assert list(schd.broadcast_mgr.broadcasts) == ['1']

# trigger the first cycle and resume the workflow
await commands.run_cmd(commands.force_trigger_tasks(schd, ['1'], []))
await commands.run_cmd(commands.resume(schd))

# the broadcast should still be there
await schd._main_loop() # run one iteration of the main loop
assert list(schd.broadcast_mgr.broadcasts) == ['1']

# and should take effect
a_1 = schd.pool._get_task_by_id('1/a')
async with asyncio.timeout(5):
while True:
await asyncio.sleep(0.1)
if a_1.state(TASK_STATUS_FAILED):
break
Loading