Skip to content

Commit 978d0f6

Browse files
authored
Merge pull request #6138 from MetRonnie/xtrig-sequential
Fix xtrigger `sequential` arg validation & improve docs
2 parents f426bc4 + 0393a03 commit 978d0f6

File tree

8 files changed

+72
-38
lines changed

8 files changed

+72
-38
lines changed

cylc/flow/dbstatecheck.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def __init__(self, rund, workflow, db_path=None):
7272
if not os.path.exists(db_path):
7373
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path)
7474

75-
self.conn = sqlite3.connect(db_path, timeout=10.0)
75+
self.conn: sqlite3.Connection = sqlite3.connect(db_path, timeout=10.0)
7676

7777
# Get workflow point format.
7878
try:
@@ -84,8 +84,17 @@ def __init__(self, rund, workflow, db_path=None):
8484
self.db_point_fmt = self._get_db_point_format_compat()
8585
self.c7_back_compat_mode = True
8686
except sqlite3.OperationalError:
87+
with suppress(Exception):
88+
self.conn.close()
8789
raise exc # original error
8890

91+
def __enter__(self):
92+
return self
93+
94+
def __exit__(self, exc_type, exc_value, traceback):
95+
"""Close DB connection when leaving context manager."""
96+
self.conn.close()
97+
8998
def adjust_point_to_db(self, cycle, offset):
9099
"""Adjust a cycle point (with offset) to the DB point format.
91100

cylc/flow/task_outputs.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -419,14 +419,10 @@ def get_completed_outputs(self) -> Dict[str, str]:
419419
Replace message with "forced" if the output was forced.
420420
421421
"""
422-
def _get_msg(message):
423-
if message in self._forced:
424-
return FORCED_COMPLETION_MSG
425-
else:
426-
return message
427-
428422
return {
429-
self._message_to_trigger[message]: _get_msg(message)
423+
self._message_to_trigger[message]: (
424+
FORCED_COMPLETION_MSG if message in self._forced else message
425+
)
430426
for message, is_completed in self._completed.items()
431427
if is_completed
432428
}

cylc/flow/xtrigger_mgr.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,11 +325,16 @@ def _handle_sequential_kwarg(
325325
)
326326
fctx.func_kwargs.setdefault('sequential', sequential_param.default)
327327

328-
elif 'sequential' in fctx.func_kwargs:
329-
# xtrig marked as sequential, so add 'sequential' arg to signature
330-
sig = add_kwarg_to_sig(
331-
sig, 'sequential', fctx.func_kwargs['sequential']
332-
)
328+
if 'sequential' in fctx.func_kwargs:
329+
# xtrig marked as sequential in function call
330+
value = fctx.func_kwargs['sequential']
331+
if not isinstance(value, bool):
332+
raise XtriggerConfigError(
333+
label, fctx.func_name,
334+
f"invalid argument 'sequential={value}' - must be boolean"
335+
)
336+
if not sequential_param:
337+
sig = add_kwarg_to_sig(sig, 'sequential', value)
333338
return sig
334339

335340
@staticmethod

cylc/flow/xtriggers/wall_clock.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ def wall_clock(offset: str = 'PT0S', sequential: bool = True):
3838
Wall-clock xtriggers are run sequentially by default.
3939
See :ref:`Sequential Xtriggers` for more details.
4040
41+
.. versionchanged:: 8.3.0
42+
43+
The ``sequential`` argument was added.
4144
"""
4245
# NOTE: This is just a placeholder for the actual implementation.
4346
# This is only used for validating the signature and for autodocs.

cylc/flow/xtriggers/workflow_state.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def workflow_state(
3636
3737
If the status or output has been achieved, return {True, result}.
3838
39-
Arg:
39+
Args:
4040
workflow_task_id:
4141
ID (workflow//point/task:selector) of the target task.
4242
offset:
@@ -62,6 +62,13 @@ def workflow_state(
6262
Dict of workflow, task, point, offset,
6363
status, message, trigger, flow_num, run_dir
6464
65+
.. versionchanged:: 8.3.0
66+
67+
The ``workflow_task_id`` argument was introduced to replace the
68+
separate ``workflow``, ``point``, ``task``, ``status``, and ``message``
69+
arguments (which are still supported for backwards compatibility).
70+
The ``flow_num`` argument was added. The ``cylc_run_dir`` argument
71+
was renamed to ``alt_cylc_run_dir``.
6572
"""
6673
poller = WorkflowPoller(
6774
workflow_task_id,

tests/integration/test_dbstatecheck.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,15 @@
2020
from asyncio import sleep
2121
import pytest
2222
from textwrap import dedent
23-
from typing import TYPE_CHECKING
2423

25-
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker as Checker
26-
27-
28-
if TYPE_CHECKING:
29-
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
24+
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
25+
from cylc.flow.scheduler import Scheduler
3026

3127

3228
@pytest.fixture(scope='module')
3329
async def checker(
3430
mod_flow, mod_scheduler, mod_run, mod_complete
35-
) -> 'CylcWorkflowDBChecker':
31+
):
3632
"""Make a real world database.
3733
3834
We could just write the database manually but this is a better
@@ -53,17 +49,17 @@ async def checker(
5349
'output': {'outputs': {'trigger': 'message'}}
5450
}
5551
})
56-
schd = mod_scheduler(wid, paused_start=False)
52+
schd: Scheduler = mod_scheduler(wid, paused_start=False)
5753
async with mod_run(schd):
5854
await mod_complete(schd)
59-
schd.pool.force_trigger_tasks(['1000/good'], [2])
55+
schd.pool.force_trigger_tasks(['1000/good'], ['2'])
6056
# Allow a cycle of the main loop to pass so that flow 2 can be
6157
# added to db
6258
await sleep(1)
63-
yield Checker(
64-
'somestring', 'utterbunkum',
65-
schd.workflow_db_mgr.pub_path
66-
)
59+
with CylcWorkflowDBChecker(
60+
'somestring', 'utterbunkum', schd.workflow_db_mgr.pub_path
61+
) as _checker:
62+
yield _checker
6763

6864

6965
def test_basic(checker):

tests/integration/test_sequential_xtriggers.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,8 @@ async def test_sequential_arg_ok(
159159
assert len(list_cycles(schd)) == expected_num_cycles
160160

161161

162-
def test_sequential_arg_bad(
163-
flow, validate
164-
):
165-
"""Test validation of 'sequential' arg for custom xtriggers"""
162+
def test_sequential_arg_bad(flow, validate):
163+
"""Test validation of 'sequential' arg for custom xtrigger function def"""
166164
wid = flow({
167165
'scheduling': {
168166
'xtriggers': {
@@ -194,6 +192,27 @@ def xtrig2(x, sequential='True'):
194192
) in str(excinfo.value)
195193

196194

195+
def test_sequential_arg_bad2(flow, validate):
196+
"""Test validation of 'sequential' arg for xtrigger calls"""
197+
wid = flow({
198+
'scheduling': {
199+
'initial cycle point': '2000',
200+
'xtriggers': {
201+
'clock': 'wall_clock(sequential=3)',
202+
},
203+
'graph': {
204+
'R1': '@clock => foo',
205+
},
206+
},
207+
})
208+
209+
with pytest.raises(XtriggerConfigError) as excinfo:
210+
validate(wid)
211+
assert (
212+
"invalid argument 'sequential=3' - must be boolean"
213+
) in str(excinfo.value)
214+
215+
197216
@pytest.mark.parametrize('is_sequential', [True, False])
198217
async def test_any_sequential(flow, scheduler, start, is_sequential: bool):
199218
"""Test that a task is marked as sequential if any of its xtriggers are."""

tests/unit/test_db_compat.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,13 @@ def test_cylc_7_db_wflow_params_table(_setup_db):
129129
rf'("cycle_point_format", "{ptformat}")'
130130
)
131131
db_file_name = _setup_db([create, insert])
132-
checker = CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name)
132+
with CylcWorkflowDBChecker('foo', 'bar', db_path=db_file_name) as checker:
133+
with pytest.raises(
134+
sqlite3.OperationalError, match="no such table: workflow_params"
135+
):
136+
checker._get_db_point_format()
133137

134-
with pytest.raises(
135-
sqlite3.OperationalError, match="no such table: workflow_params"
136-
):
137-
checker._get_db_point_format()
138-
139-
assert checker.db_point_fmt == ptformat
138+
assert checker.db_point_fmt == ptformat
140139

141140

142141
def test_pre_830_task_action_timers(_setup_db):

0 commit comments

Comments
 (0)