|
18 | 18 | import asyncio
|
19 | 19 | from pathlib import Path
|
20 | 20 | from textwrap import dedent
|
| 21 | +from typing import cast, Iterable |
21 | 22 |
|
| 23 | +from cylc.flow.data_messages_pb2 import PbTaskProxy |
| 24 | +from cylc.flow.data_store_mgr import TASK_PROXIES |
22 | 25 | from cylc.flow.pathutil import get_workflow_run_dir
|
23 | 26 | from cylc.flow.scheduler import Scheduler
|
24 | 27 |
|
@@ -127,7 +130,7 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker):
|
127 | 130 | # loop doesn't run in this test.
|
128 | 131 |
|
129 | 132 |
|
130 |
| -async def test_xtriggers_restart(flow, start, scheduler, db_select): |
| 133 | +async def test_xtriggers_restart(flow, start, scheduler, db_select, capsys): |
131 | 134 | """It should write xtrigger results to the DB and load them on restart."""
|
132 | 135 | # define a workflow which uses a custom xtrigger
|
133 | 136 | id_ = flow({
|
@@ -186,6 +189,25 @@ def mytrig(*args, **kwargs):
|
186 | 189 | # (so no xtriggers should be scheduled to run)
|
187 | 190 | assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0
|
188 | 191 |
|
| 192 | + # the xtrigger should be recorded as satisfied in the datastore task |
| 193 | + # instance, after the restart |
| 194 | + await schd.update_data_structure() |
| 195 | + [xtrig] = [ |
| 196 | + p |
| 197 | + for t in cast( |
| 198 | + 'Iterable[PbTaskProxy]', |
| 199 | + schd.data_store_mgr.data[ |
| 200 | + schd.data_store_mgr.workflow_id |
| 201 | + ][ |
| 202 | + TASK_PROXIES |
| 203 | + ].values() |
| 204 | + ) |
| 205 | + for p in t.xtriggers.values() |
| 206 | + ] |
| 207 | + assert xtrig.id == "mytrig()" |
| 208 | + assert xtrig.label == "mytrig" |
| 209 | + assert xtrig.satisfied |
| 210 | + |
189 | 211 | # check the DB to ensure no additional entries have been created
|
190 | 212 | assert db_select(schd, True, 'xtriggers') == db_xtriggers
|
191 | 213 |
|
|
0 commit comments