|
16 | 16 |
|
17 | 17 | """Tests for Broadcast Manager."""
|
18 | 18 |
|
| 19 | +import asyncio |
19 | 20 |
|
20 | 21 | import pytest
|
| 22 | + |
| 23 | +from cylc.flow import commands |
21 | 24 | from cylc.flow.cycling.integer import IntegerInterval, IntegerPoint
|
22 | 25 | from cylc.flow.cycling.iso8601 import ISO8601Interval, ISO8601Point
|
| 26 | +from cylc.flow.task_state import TASK_STATUS_FAILED |
23 | 27 |
|
24 | 28 |
|
25 | 29 | async def test_reject_valid_broadcast_is_remote_clash_with_config(
|
@@ -212,3 +216,69 @@ async def test_broadcast_expiry_async(
|
212 | 216 |
|
213 | 217 | # no auto-expiry should take place
|
214 | 218 | assert expires == []
|
| 219 | + |
| 220 | + |
| 221 | +async def test_broadcast_old_cycle(flow, scheduler, run, complete): |
| 222 | + """It should not expire broadcasts whilst the scheduler is paused. |
| 223 | +
|
| 224 | + This tests the use case of broadcasting to a historical cycle (whilst the |
| 225 | + workflow is paused) before triggering it to run to ensure that the |
| 226 | + broadcast is not expired before the operator is able to run the trigger |
| 227 | + command. |
| 228 | +
|
| 229 | + For context, see https://github.com/cylc/cylc-flow/pull/6499 and |
| 230 | + https://github.com/cylc/cylc-flow/pull/6192#issuecomment-2486785465 |
| 231 | + """ |
| 232 | + id_ = flow({ |
| 233 | + 'scheduling': { |
| 234 | + 'initial cycle point': '1', |
| 235 | + 'cycling mode': 'integer', |
| 236 | + 'graph': { |
| 237 | + 'P1': 'a[-P1] => a', |
| 238 | + }, |
| 239 | + }, |
| 240 | + }) |
| 241 | + schd = scheduler(id_, paused_start=False) |
| 242 | + async with run(schd): |
| 243 | + # issue a broadcast into the first cycle |
| 244 | + schd.broadcast_mgr.put_broadcast( |
| 245 | + point_strings=['1'], |
| 246 | + namespaces=['a'], |
| 247 | + settings=[{'environment': {'ANSWER': '42'}}] |
| 248 | + ) |
| 249 | + assert list(schd.broadcast_mgr.broadcasts) == ['1'] |
| 250 | + |
| 251 | + # the broadcast should expire after the workflow passes cycle "3" |
| 252 | + await complete(schd, '3/a') |
| 253 | + assert list(schd.broadcast_mgr.broadcasts) == [] |
| 254 | + |
| 255 | + # pause the workflow |
| 256 | + await commands.run_cmd(commands.pause(schd)) |
| 257 | + |
| 258 | + # issue a broadcast into the first cycle (now behind the broadcast |
| 259 | + # expire point) |
| 260 | + schd.broadcast_mgr.put_broadcast( |
| 261 | + point_strings=['1'], |
| 262 | + namespaces=['a'], |
| 263 | + settings=[{'simulation': {'fail cycle points': '1'}}] |
| 264 | + ) |
| 265 | + |
| 266 | + # this should not be expired whilst the scheduler is paused |
| 267 | + await schd._main_loop() # run one iteration of the main loop |
| 268 | + assert list(schd.broadcast_mgr.broadcasts) == ['1'] |
| 269 | + |
| 270 | + # trigger the first cycle and resume the workflow |
| 271 | + await commands.run_cmd(commands.force_trigger_tasks(schd, ['1'], [])) |
| 272 | + await commands.run_cmd(commands.resume(schd)) |
| 273 | + |
| 274 | + # the broadcast should still be there |
| 275 | + await schd._main_loop() # run one iteration of the main loop |
| 276 | + assert list(schd.broadcast_mgr.broadcasts) == ['1'] |
| 277 | + |
| 278 | + # and should take effect |
| 279 | + a_1 = schd.pool._get_task_by_id('1/a') |
| 280 | + async with asyncio.timeout(5): |
| 281 | + while True: |
| 282 | + await asyncio.sleep(0.1) |
| 283 | + if a_1.state(TASK_STATUS_FAILED): |
| 284 | + break |
0 commit comments