Skip to content

Commit 995f29d

Browse files
authored
Merge pull request #67 from oliver-sanders/group-trigger-tests++
improve integration test
2 parents dcee55c + 10dd26b commit 995f29d

File tree

2 files changed

+98
-19
lines changed

2 files changed

+98
-19
lines changed

tests/integration/conftest.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ async def _complete(
592592
*wait_tokens: Union[Tokens, str],
593593
stop_mode=StopMode.AUTO,
594594
timeout: int = 60,
595+
allow_paused: bool = False,
595596
) -> None:
596597
"""Wait for the workflow, or tasks within it to complete.
597598
@@ -615,9 +616,17 @@ async def _complete(
615616
616617
Note, use this timeout rather than wrapping the complete call with
617618
async_timeout (handles shutdown logic more cleanly).
619+
allow_paused:
620+
This function will raise an Exception if the scheduler is paused
621+
(because this usually means the sepecified tasks cannot complete)
622+
unless allow_paused==True.
623+
624+
Raises:
625+
AssertionError: In the event the scheduler shut down or the operation
626+
timed out.
618627
619628
"""
620-
if schd.is_paused:
629+
if schd.is_paused and not allow_paused:
621630
raise Exception("Cannot wait for completion of a paused scheduler")
622631

623632
start_time = time()
@@ -678,7 +687,7 @@ def done():
678687
msg += ", ".join(map(str, tokens_list))
679688
else:
680689
msg += "workflow to shut down"
681-
raise Exception(msg)
690+
raise AssertionError(msg)
682691

683692

684693
@pytest.fixture

tests/integration/test_force_trigger.py

Lines changed: 87 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17+
import logging
1718
from typing import (
1819
Any as Fixture,
1920
Callable
2021
)
2122

22-
import logging
23+
import pytest
2324

2425
from cylc.flow.commands import (
2526
force_trigger_tasks,
2627
reload_workflow,
2728
hold,
29+
resume,
2830
run_cmd,
2931
set_prereqs_and_outputs,
3032
)
@@ -99,12 +101,44 @@ async def test_trigger_workflow_paused(
99101
)
100102

101103

104+
async def test_trigger_group_whilst_paused(flow, scheduler, run, complete):
105+
"""Only group start-tasks should run whilst the scheduler is paused.
106+
107+
When multiple tasks are triggered, only tasks with no dependencies within
108+
the group should run whilst the scheduler is paused.
109+
110+
The remaining tasks will run according to normal prerequsitie satisfaction
111+
once the workflow is resumed.
112+
"""
113+
id_ = flow(
114+
{
115+
'scheduling': {
116+
'graph': {'R1': 'a => b => c => d'},
117+
},
118+
}
119+
)
120+
schd = scheduler(id_)
121+
async with run(schd):
122+
# trigger the chain a => c
123+
await run_cmd(force_trigger_tasks(schd, ['1/a'], ['all']))
124+
125+
# 1/a should run whilst the workflow is paused (group start-task)
126+
await complete(schd, '1/a', allow_paused=True, timeout=1)
127+
128+
# 1/b should *not* run whilst the workflow is paused
129+
with pytest.raises(AssertionError):
130+
await complete(schd, '1/b', allow_paused=True, timeout=2)
131+
132+
# 1/b and 1/c should run once the workflow is resumed
133+
await run_cmd(resume(schd))
134+
await complete(schd, '1/c')
135+
136+
102137
async def test_trigger_on_resume(
103138
flow: 'Fixture',
104139
scheduler: 'Fixture',
105140
start: 'Fixture',
106141
capture_submission: 'Fixture',
107-
log_filter: Callable
108142
):
109143
"""
110144
Test manual triggering on-resume option when the workflow is paused.
@@ -373,7 +407,23 @@ async def test_trigger_active_task_in_group(
373407
}
374408

375409

376-
async def test_trigger_group_in_flow(flow, scheduler, run, complete, reflog):
410+
async def test_trigger_group_in_flow(
411+
flow,
412+
scheduler,
413+
run,
414+
complete,
415+
reflog,
416+
db_select,
417+
):
418+
"""It should remove tasks from the triggered flow(s).
419+
420+
Tests the following statement from the proposal:
421+
422+
> Use the same flow numbers, as determined by the trigger command in the
423+
> usual way, throughout the operation
424+
>
425+
> -- https://cylc.github.io/cylc-admin/proposal-group-trigger.html#details
426+
"""
377427
id_ = flow({
378428
'scheduling': {
379429
'graph': {
@@ -383,29 +433,49 @@ async def test_trigger_group_in_flow(flow, scheduler, run, complete, reflog):
383433
})
384434
schd = scheduler(id_, paused_start=False)
385435
async with run(schd):
436+
# prevent shutdown after 1/c completes
386437
await run_cmd(hold(schd, ['1/d']))
387438

388-
triggers = reflog(schd)
439+
# run the chain, merge in flow "2" part way through
440+
triggers = reflog(schd, flow_nums=True)
389441
await complete(schd, '1/a')
390442
await run_cmd(force_trigger_tasks(schd, ['1/b'], ['2']))
391443
await complete(schd, '1/c')
392444
assert triggers == {
393-
('1/a', None),
394-
('1/b', ('1/a',)),
395-
('1/c', ('1/b',)),
445+
# (task, flow_nums, triggered_off_of)
446+
('1/a', '[1]', None),
447+
('1/b', '[1, 2]', ('1/a',)), # flow "2" merged in
448+
('1/c', '[1, 2]', ('1/b',)), # flow "2" merged in
396449
}
397450

398-
# state is now:
399-
# * a - succeeded flow=1
400-
# * b - succeeded flow=1,2
401-
# * c - succedded flow=1,2
402-
# * d - waiting (held)
403-
404-
triggers = reflog(schd)
451+
# re-run the chain in flow "2"
452+
triggers = reflog(schd, flow_nums=True)
405453
await run_cmd(force_trigger_tasks(schd, ['1/a', '1/b', '1/c'], ['2']))
406454
await complete(schd, '1/c', timeout=10)
407455
assert triggers == {
408-
('1/a', None),
409-
('1/b', ('1/a',)),
410-
('1/c', ('1/b',)),
456+
# (task, flow_nums, triggered_off_of)
457+
('1/a', '[2]', None),
458+
('1/b', '[2]', ('1/a',)),
459+
('1/c', '[2]', ('1/b',)),
460+
}
461+
462+
# ensure that flow "2" was removed from the tasks in the original run
463+
# by the group-trigger
464+
assert set(db_select(
465+
schd,
466+
True,
467+
'task_outputs',
468+
'name',
469+
'flow_nums',
470+
)) == {
471+
# original run
472+
('a', '[1]'),
473+
('b', '[1]'), # flow "2" has been removed
474+
('c', '[1]'), # flow "2" has been removed
475+
('d', '[1, 2]'),
476+
477+
# subsequent run
478+
('a', '[2]'),
479+
('b', '[2]'),
480+
('c', '[2]'),
411481
}

0 commit comments

Comments
 (0)