Skip to content

Commit 57fe4ae

Browse files
wxtimMetRonnieoliver-sanders
authored
Don't record satisfied wall clock triggers in the DB (#5923)
Don't record satisfied xtriggers in the DB * Closes #5911 * Test that no wall_clock xtriggers are saved to the DB for a retry. --------- Co-authored-by: Ronnie Dutta <[email protected]> Co-authored-by: Oliver Sanders <[email protected]>
1 parent f4e2853 commit 57fe4ae

File tree

3 files changed

+147
-12
lines changed

3 files changed

+147
-12
lines changed

cylc/flow/workflow_db_mgr.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,9 +405,10 @@ def put_task_event_timers(self, task_events_mgr):
405405
def put_xtriggers(self, sat_xtrig):
406406
"""Put statements to update external triggers table."""
407407
for sig, res in sat_xtrig.items():
408-
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
409-
"signature": sig,
410-
"results": json.dumps(res)})
408+
if not sig.startswith('wall_clock('):
409+
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
410+
"signature": sig,
411+
"results": json.dumps(res)})
411412

412413
def put_update_task_state(self, itask):
413414
"""Update task_states table for current state of itask.

tests/integration/conftest.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
from pathlib import Path
2121
import pytest
2222
from shutil import rmtree
23+
from time import time
2324
from typing import List, TYPE_CHECKING, Set, Tuple, Union
2425

2526
from cylc.flow.config import WorkflowConfig
2627
from cylc.flow.option_parsers import Options
27-
from cylc.flow.network.client import WorkflowRuntimeClient
2828
from cylc.flow.pathutil import get_cylc_run_dir
2929
from cylc.flow.rundb import CylcWorkflowDAO
3030
from cylc.flow.scripts.validate import ValidateOptions
@@ -34,6 +34,7 @@
3434
)
3535
from cylc.flow.wallclock import get_current_time_string
3636
from cylc.flow.workflow_files import infer_latest_run_from_id
37+
from cylc.flow.workflow_status import StopMode
3738

3839
from .utils import _rm_if_empty
3940
from .utils.flow_tools import (
@@ -47,6 +48,7 @@
4748
if TYPE_CHECKING:
4849
from cylc.flow.scheduler import Scheduler
4950
from cylc.flow.task_proxy import TaskProxy
51+
from cylc.flow.network.client import WorkflowRuntimeClient
5052

5153

5254
InstallOpts = Options(install_gop())
@@ -323,7 +325,7 @@ def _inner(
323325
def gql_query():
324326
"""Execute a GraphQL query given a workflow runtime client."""
325327
async def _gql_query(
326-
client: WorkflowRuntimeClient, query_str: str
328+
client: 'WorkflowRuntimeClient', query_str: str
327329
) -> object:
328330
ret = await client.async_request(
329331
'graphql', {
@@ -473,3 +475,83 @@ def _inner(source, **kwargs):
473475
workflow_id = infer_latest_run_from_id(workflow_id)
474476
return workflow_id
475477
yield _inner
478+
479+
480+
@pytest.fixture
481+
def complete():
482+
"""Wait for the workflow, or tasks within it to complete.
483+
484+
Args:
485+
schd:
486+
The scheduler to await.
487+
tokens_list:
488+
If specified, this will wait for the tasks represented by these
489+
tokens to be marked as completed by the task pool.
490+
stop_mode:
491+
If tokens_list is not provided, this will wait for the scheduler
492+
to be shutdown with the specified mode (default = AUTO, i.e.
493+
workflow completed normally).
494+
timeout:
495+
Max time to wait for the condition to be met.
496+
497+
Note, if you need to increase this, you might want to rethink your
498+
test.
499+
500+
Note, use this timeout rather than wrapping the complete call with
501+
async_timeout (handles shutdown logic more cleanly).
502+
503+
"""
504+
async def _complete(
505+
schd,
506+
*tokens_list,
507+
stop_mode=StopMode.AUTO,
508+
timeout=60,
509+
):
510+
start_time = time()
511+
tokens_list = [tokens.task for tokens in tokens_list]
512+
513+
# capture task completion
514+
remove_if_complete = schd.pool.remove_if_complete
515+
516+
def _remove_if_complete(itask):
517+
ret = remove_if_complete(itask)
518+
if ret and itask.tokens.task in tokens_list:
519+
tokens_list.remove(itask.tokens.task)
520+
return ret
521+
522+
schd.pool.remove_if_complete = _remove_if_complete
523+
524+
# capture workflow shutdown
525+
set_stop = schd._set_stop
526+
has_shutdown = False
527+
528+
def _set_stop(mode=None):
529+
nonlocal has_shutdown, stop_mode
530+
if mode == stop_mode:
531+
has_shutdown = True
532+
return set_stop(mode)
533+
else:
534+
set_stop(mode)
535+
raise Exception(f'Workflow bailed with stop mode = {mode}')
536+
537+
schd._set_stop = _set_stop
538+
539+
# determine the completion condition
540+
if tokens_list:
541+
condition = lambda: bool(tokens_list)
542+
else:
543+
condition = lambda: bool(not has_shutdown)
544+
545+
# wait for the condition to be met
546+
while condition():
547+
# allow the main loop to advance
548+
await asyncio.sleep(0)
549+
if time() - start_time > timeout:
550+
raise Exception(
551+
f'Timeout waiting for {", ".join(map(str, tokens_list))}'
552+
)
553+
554+
# restore regular shutdown logic
555+
schd._set_stop = set_stop
556+
557+
return _complete

tests/integration/test_workflow_db_mgr.py

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17+
import asyncio
1718
import pytest
1819
import sqlite3
20+
from typing import TYPE_CHECKING
1921

20-
from cylc.flow.scheduler import Scheduler
22+
from cylc.flow.cycling.iso8601 import ISO8601Point
23+
24+
if TYPE_CHECKING:
25+
from cylc.flow.scheduler import Scheduler
2126

2227

2328
async def test_restart_number(
@@ -29,7 +34,7 @@ async def test_restart_number(
2934
async def test(expected_restart_num: int, do_reload: bool = False):
3035
"""(Re)start the workflow and check the restart number is as expected.
3136
"""
32-
schd: Scheduler = scheduler(id_, paused_start=True)
37+
schd: 'Scheduler' = scheduler(id_, paused_start=True)
3338
async with start(schd) as log:
3439
if do_reload:
3540
schd.command_reload_workflow()
@@ -52,7 +57,7 @@ async def test(expected_restart_num: int, do_reload: bool = False):
5257
await test(expected_restart_num=3)
5358

5459

55-
def db_remove_column(schd: Scheduler, table: str, column: str) -> None:
60+
def db_remove_column(schd: 'Scheduler', table: str, column: str) -> None:
5661
"""Remove a column from a scheduler DB table.
5762
5863
ALTER TABLE DROP COLUMN is not supported by sqlite yet, so we have to copy
@@ -82,23 +87,23 @@ async def test_db_upgrade_pre_803(
8287
id_ = flow(one_conf)
8388

8489
# Run a scheduler to create a DB.
85-
schd: Scheduler = scheduler(id_, paused_start=True)
90+
schd: 'Scheduler' = scheduler(id_, paused_start=True)
8691
async with start(schd):
8792
assert ('n_restart', '0') in db_select(schd, False, 'workflow_params')
8893

8994
# Remove task_states:is_manual_submit to fake a pre-8.0.3 DB.
9095
db_remove_column(schd, "task_states", "is_manual_submit")
9196
db_remove_column(schd, "task_jobs", "flow_nums")
9297

93-
schd: Scheduler = scheduler(id_, paused_start=True)
98+
schd: 'Scheduler' = scheduler(id_, paused_start=True)
9499

95100
# Restart should fail due to the missing column.
96101
with pytest.raises(sqlite3.OperationalError):
97102
async with start(schd):
98103
pass
99104
assert ('n_restart', '1') in db_select(schd, False, 'workflow_params')
100105

101-
schd: Scheduler = scheduler(id_, paused_start=True)
106+
schd: 'Scheduler' = scheduler(id_, paused_start=True)
102107

103108
# Run the DB upgrader for version 8.0.2
104109
# (8.0.2 requires upgrade)
@@ -117,7 +122,7 @@ async def test_workflow_param_rapid_toggle(
117122
118123
https://github.com/cylc/cylc-flow/issues/5593
119124
"""
120-
schd: Scheduler = scheduler(flow(one_conf), paused_start=False)
125+
schd: 'Scheduler' = scheduler(flow(one_conf), paused_start=False)
121126
async with run(schd):
122127
assert schd.is_paused is False
123128
schd.pause_workflow()
@@ -127,3 +132,50 @@ async def test_workflow_param_rapid_toggle(
127132

128133
w_params = dict(schd.workflow_db_mgr.pri_dao.select_workflow_params())
129134
assert w_params['is_paused'] == '0'
135+
136+
137+
async def test_record_only_non_clock_triggers(
138+
flow, run, scheduler, complete, db_select
139+
):
140+
"""Database does not record wall_clock xtriggers.
141+
142+
https://github.com/cylc/cylc-flow/issues/5911
143+
144+
Includes:
145+
- Not in DB: A normal wall clock xtrigger (wall_clock).
146+
- In DB: An xrandom mis-labelled as wall_clock trigger DB).
147+
- Not in DB: An execution retry xtrigger.
148+
149+
@TODO: Refactor to use simulation mode to speedup after Simulation
150+
mode upgrade bugfixes: This should speed this test up considerably.
151+
"""
152+
rawpoint = '1348'
153+
id_ = flow({
154+
"scheduler": {
155+
'cycle point format': '%Y',
156+
'allow implicit tasks': True
157+
},
158+
"scheduling": {
159+
"initial cycle point": rawpoint,
160+
"xtriggers": {
161+
"another": "xrandom(100)",
162+
"wall_clock": "xrandom(100, _=Not a real wall clock trigger)",
163+
"real_wall_clock": "wall_clock()"
164+
},
165+
"graph": {
166+
"R1": """
167+
@another & @wall_clock & @real_wall_clock => foo
168+
@real_wall_clock => bar
169+
"""
170+
}
171+
},
172+
})
173+
schd = scheduler(id_, paused_start=False, run_mode='simulation')
174+
175+
async with run(schd):
176+
await complete(schd, timeout=20)
177+
178+
# Assert that (only) the real clock trigger is not in the db:
179+
assert db_select(schd, False, 'xtriggers', 'signature') == [
180+
('xrandom(100)',),
181+
('xrandom(100, _=Not a real wall clock trigger)',)]

0 commit comments

Comments
 (0)