Skip to content

Commit 660c46a

Browse files
authored
Merge pull request #6721 from hjoliver/fix-xtrigger-restart-datastore
Update datastore xtriggers on restart.
2 parents 46f933f + 605c816 commit 660c46a

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

changes.d/6721.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a bug that could mark satisfied xtriggers as unsatisfied after a restart, in task queries.

cylc/flow/xtrigger_mgr.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,8 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]):
574574
LOG.info("LOADING satisfied xtriggers")
575575
sig, results = row
576576
self.sat_xtrig[sig] = json.loads(results)
577+
# Tell the datastore this xtrigger is satisfied.
578+
self.data_store_mgr.delta_task_xtrigger(sig, True)
577579

578580
def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False,
579581
sigs_only: bool = False):

tests/integration/test_xtrigger_mgr.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import asyncio
1919
from pathlib import Path
2020
from textwrap import dedent
21+
from typing import cast, Iterable
2122

23+
from cylc.flow.data_messages_pb2 import PbTaskProxy
24+
from cylc.flow.data_store_mgr import TASK_PROXIES
2225
from cylc.flow.pathutil import get_workflow_run_dir
2326
from cylc.flow.scheduler import Scheduler
2427

@@ -127,7 +130,7 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, mocker):
127130
# loop doesn't run in this test.
128131

129132

130-
async def test_xtriggers_restart(flow, start, scheduler, db_select):
133+
async def test_xtriggers_restart(flow, start, scheduler, db_select, capsys):
131134
"""It should write xtrigger results to the DB and load them on restart."""
132135
# define a workflow which uses a custom xtrigger
133136
id_ = flow({
@@ -186,6 +189,25 @@ def mytrig(*args, **kwargs):
186189
# (so no xtriggers should be scheduled to run)
187190
assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0
188191

192+
# the xtrigger should be recorded as satisfied in the datastore task
193+
# instance, after the restart
194+
await schd.update_data_structure()
195+
[xtrig] = [
196+
p
197+
for t in cast(
198+
'Iterable[PbTaskProxy]',
199+
schd.data_store_mgr.data[
200+
schd.data_store_mgr.workflow_id
201+
][
202+
TASK_PROXIES
203+
].values()
204+
)
205+
for p in t.xtriggers.values()
206+
]
207+
assert xtrig.id == "mytrig()"
208+
assert xtrig.label == "mytrig"
209+
assert xtrig.satisfied
210+
189211
# check the DB to ensure no additional entries have been created
190212
assert db_select(schd, True, 'xtriggers') == db_xtriggers
191213

0 commit comments

Comments
 (0)