|
13 | 13 | #
|
14 | 14 | # You should have received a copy of the GNU General Public License
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>.
|
16 |
| -"""Tests for the behaviour of xtrigger manager. |
17 |
| -""" |
| 16 | +"""Tests for the behaviour of xtrigger manager.""" |
18 | 17 |
|
19 |
| -from pytest_mock import mocker |
| 18 | +import asyncio |
| 19 | +from pathlib import Path |
| 20 | +from textwrap import dedent |
| 21 | + |
| 22 | +from cylc.flow.pathutil import get_workflow_run_dir |
20 | 23 |
|
21 | 24 | async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
|
22 | 25 | """Test that if an itask has 2 wall_clock triggers with different
|
@@ -118,4 +121,69 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
|
118 | 121 | # resulting in two calls to put_xtriggers. This test fails
|
119 | 122 | # on master, but with call count 0 (not 2) because the main
|
120 | 123 | # loop doesn't run in this test.
|
121 |
| - |
| 124 | + |
| 125 | + |
| 126 | +async def test_xtriggers_restart(flow, start, scheduler, db_select): |
| 127 | + """It should write xtrigger results to the DB and load them on restart.""" |
| 128 | + # define a workflow which uses a custom xtrigger |
| 129 | + id_ = flow({ |
| 130 | + 'scheduler': { |
| 131 | + 'allow implicit tasks': 'True' |
| 132 | + }, |
| 133 | + 'scheduling': { |
| 134 | + 'xtriggers': { |
| 135 | + 'mytrig': 'mytrig()' |
| 136 | + }, |
| 137 | + 'graph': { |
| 138 | + 'R1': '@mytrig => foo' |
| 139 | + }, |
| 140 | + } |
| 141 | + }) |
| 142 | + |
| 143 | + # add a custom xtrigger to the workflow |
| 144 | + run_dir = Path(get_workflow_run_dir(id_)) |
| 145 | + xtrig_dir = run_dir / 'lib/python' |
| 146 | + xtrig_dir.mkdir(parents=True) |
| 147 | + (xtrig_dir / 'mytrig.py').write_text(dedent(''' |
| 148 | + from random import random |
| 149 | +
|
| 150 | + def mytrig(*args, **kwargs): |
| 151 | + # return a different random number each time |
| 152 | + return True, {"x": str(random())} |
| 153 | + ''')) |
| 154 | + |
| 155 | + # start the workflow & run the xtrigger |
| 156 | + schd = scheduler(id_) |
| 157 | + async with start(schd): |
| 158 | + # run all xtriggers |
| 159 | + for task in schd.pool.get_tasks(): |
| 160 | + schd.xtrigger_mgr.call_xtriggers_async(task) |
| 161 | + # one xtrigger should have been scheduled to run |
| 162 | + assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 1 |
| 163 | + # wait for it to return |
| 164 | + for _ in range(50): |
| 165 | + await asyncio.sleep(0.1) |
| 166 | + schd.proc_pool.process() |
| 167 | + if len(schd.proc_pool.runnings) == 0: |
| 168 | + break |
| 169 | + else: |
| 170 | + raise Exception('Process pool did not clear') |
| 171 | + |
| 172 | + # the xtrigger should be written to the DB |
| 173 | + db_xtriggers = db_select(schd, True, 'xtriggers') |
| 174 | + assert len(db_xtriggers) == 1 |
| 175 | + assert db_xtriggers[0][0] == 'mytrig()' |
| 176 | + assert db_xtriggers[0][1].startswith('{"x":') |
| 177 | + |
| 178 | + # restart the workflow, the xtrigger should *not* run again |
| 179 | + schd = scheduler(id_) |
| 180 | + async with start(schd): |
| 181 | + # run all xtriggers |
| 182 | + for task in schd.pool.get_tasks(): |
| 183 | + schd.xtrigger_mgr.call_xtriggers_async(task) |
| 184 | + # the xtrigger should have been loaded from the DB |
| 185 | + # (so no xtriggers should be scheduled to run) |
| 186 | + assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0 |
| 187 | + |
| 188 | + # check the DB to ensure no additional entries have been created |
| 189 | + assert db_select(schd, True, 'xtriggers') == db_xtriggers |
0 commit comments