Skip to content

Commit 80f06f2

Browse files
hjoliveroliver-sanderswxtim
authored
xtrigger efficiency fix. (#5908)
xtrigger efficiency fix. * Update the DB xtriggers table when the xtriggers get satisfied, not when the tasks that depend on them get satisfied. * Don't delete xtriggers table before every update. --------- Co-authored-by: Oliver Sanders <[email protected]> Co-authored-by: Tim Pillinger <[email protected]>
1 parent 2ec98dd commit 80f06f2

File tree

10 files changed

+115
-116
lines changed

10 files changed

+115
-116
lines changed

changes.d/5908.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger.

cylc/flow/rundb.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ class CylcWorkflowDAO:
298298
["prereq_output", {"is_primary_key": True}],
299299
["satisfied"],
300300
],
301+
# The xtriggers table holds the function signature and result of
302+
# already-satisfied (the scheduler no longer needs to call them).
301303
TABLE_XTRIGGERS: [
302304
["signature", {"is_primary_key": True}],
303305
["results"],

cylc/flow/scheduler.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ async def initialise(self):
377377
self.workflow,
378378
user=self.owner,
379379
broadcast_mgr=self.broadcast_mgr,
380+
workflow_db_mgr=self.workflow_db_mgr,
380381
data_store_mgr=self.data_store_mgr,
381382
proc_pool=self.proc_pool,
382383
workflow_run_dir=self.workflow_run_dir,
@@ -1705,14 +1706,8 @@ async def main_loop(self) -> None:
17051706
await self.process_command_queue()
17061707
self.proc_pool.process()
17071708

1708-
# Tasks in the main pool that are waiting but not queued must be
1709-
# waiting on external dependencies, i.e. xtriggers or ext_triggers.
1710-
# For these tasks, call any unsatisfied xtrigger functions, and
1711-
# queue tasks that have become ready. (Tasks do not appear in the
1712-
# main pool at all until all other-task deps are satisfied, and are
1713-
# queued immediately on release from runahead limiting if they are
1714-
# not waiting on external deps).
1715-
housekeep_xtriggers = False
1709+
# Unqueued tasks with satisfied prerequisites must be waiting on
1710+
# xtriggers or ext_triggers. Check these and queue tasks if ready.
17161711
for itask in self.pool.get_tasks():
17171712
if (
17181713
not itask.state(TASK_STATUS_WAITING)
@@ -1725,28 +1720,19 @@ async def main_loop(self) -> None:
17251720
itask.state.xtriggers
17261721
and not itask.state.xtriggers_all_satisfied()
17271722
):
1728-
# Call unsatisfied xtriggers if not already in-process.
1729-
# Results are returned asynchronously.
17301723
self.xtrigger_mgr.call_xtriggers_async(itask)
1731-
# Check for satisfied xtriggers, and queue if ready.
1732-
if self.xtrigger_mgr.check_xtriggers(
1733-
itask, self.workflow_db_mgr.put_xtriggers):
1734-
housekeep_xtriggers = True
1735-
if all(itask.is_ready_to_run()):
1736-
self.pool.queue_task(itask)
1737-
1738-
# Check for satisfied ext_triggers, and queue if ready.
1724+
17391725
if (
17401726
itask.state.external_triggers
17411727
and not itask.state.external_triggers_all_satisfied()
1742-
and self.broadcast_mgr.check_ext_triggers(
1743-
itask, self.ext_trigger_queue)
1744-
and all(itask.is_ready_to_run())
17451728
):
1729+
self.broadcast_mgr.check_ext_triggers(
1730+
itask, self.ext_trigger_queue)
1731+
1732+
if all(itask.is_ready_to_run()):
17461733
self.pool.queue_task(itask)
17471734

1748-
if housekeep_xtriggers:
1749-
# (Could do this periodically?)
1735+
if self.xtrigger_mgr.do_housekeeping:
17501736
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
17511737

17521738
self.pool.set_expired_tasks()

cylc/flow/workflow_db_mgr.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,6 @@ def put_task_event_timers(self, task_events_mgr):
404404

405405
def put_xtriggers(self, sat_xtrig):
406406
"""Put statements to update external triggers table."""
407-
self.db_deletes_map[self.TABLE_XTRIGGERS].append({})
408407
for sig, res in sat_xtrig.items():
409408
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
410409
"signature": sig,

cylc/flow/xtrigger_mgr.py

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,22 @@
2020
import re
2121
from copy import deepcopy
2222
from time import time
23-
from typing import Any, Dict, List, Optional, Tuple, Callable
23+
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING
2424

2525
from cylc.flow import LOG
2626
from cylc.flow.exceptions import XtriggerConfigError
2727
import cylc.flow.flags
2828
from cylc.flow.hostuserutil import get_user
29+
from cylc.flow.subprocpool import get_func
2930
from cylc.flow.xtriggers.wall_clock import wall_clock
3031

31-
from cylc.flow.subprocctx import SubFuncContext
32-
from cylc.flow.broadcast_mgr import BroadcastMgr
33-
from cylc.flow.data_store_mgr import DataStoreMgr
34-
from cylc.flow.subprocpool import SubProcPool
35-
from cylc.flow.task_proxy import TaskProxy
36-
from cylc.flow.subprocpool import get_func
32+
if TYPE_CHECKING:
33+
from cylc.flow.broadcast_mgr import BroadcastMgr
34+
from cylc.flow.data_store_mgr import DataStoreMgr
35+
from cylc.flow.subprocctx import SubFuncContext
36+
from cylc.flow.subprocpool import SubProcPool
37+
from cylc.flow.task_proxy import TaskProxy
38+
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
3739

3840

3941
class TemplateVariables(Enum):
@@ -185,6 +187,7 @@ class XtriggerManager:
185187
Args:
186188
workflow: workflow name
187189
user: workflow owner
190+
workflow_db_mgr: the DB Manager
188191
broadcast_mgr: the Broadcast Manager
189192
proc_pool: pool of Subprocesses
190193
workflow_run_dir: workflow run directory
@@ -195,9 +198,10 @@ class XtriggerManager:
195198
def __init__(
196199
self,
197200
workflow: str,
198-
broadcast_mgr: BroadcastMgr,
199-
data_store_mgr: DataStoreMgr,
200-
proc_pool: SubProcPool,
201+
broadcast_mgr: 'BroadcastMgr',
202+
workflow_db_mgr: 'WorkflowDatabaseManager',
203+
data_store_mgr: 'DataStoreMgr',
204+
proc_pool: 'SubProcPool',
201205
user: Optional[str] = None,
202206
workflow_run_dir: Optional[str] = None,
203207
workflow_share_dir: Optional[str] = None,
@@ -230,11 +234,15 @@ def __init__(
230234
}
231235

232236
self.proc_pool = proc_pool
237+
self.workflow_db_mgr = workflow_db_mgr
233238
self.broadcast_mgr = broadcast_mgr
234239
self.data_store_mgr = data_store_mgr
240+
self.do_housekeeping = False
235241

236242
@staticmethod
237-
def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
243+
def validate_xtrigger(
244+
label: str, fctx: 'SubFuncContext', fdir: str
245+
) -> None:
238246
"""Validate an Xtrigger function.
239247
240248
Args:
@@ -305,7 +313,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
305313
f' {", ".join(t.value for t in deprecated_variables)}'
306314
)
307315

308-
def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
316+
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
309317
"""Add a new xtrigger function.
310318
311319
Check the xtrigger function exists here (e.g. during validation).
@@ -334,7 +342,7 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]):
334342
sig, results = row
335343
self.sat_xtrig[sig] = json.loads(results)
336344

337-
def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
345+
def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False,
338346
sigs_only: bool = False):
339347
"""(Internal helper method.)
340348
@@ -361,7 +369,9 @@ def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
361369
res.append((label, sig, ctx, satisfied))
362370
return res
363371

364-
def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
372+
def get_xtrig_ctx(
373+
self, itask: 'TaskProxy', label: str
374+
) -> 'SubFuncContext':
365375
"""Get a real function context from the template.
366376
367377
Args:
@@ -412,7 +422,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
412422
ctx.update_command(self.workflow_run_dir)
413423
return ctx
414424

415-
def call_xtriggers_async(self, itask: TaskProxy):
425+
def call_xtriggers_async(self, itask: 'TaskProxy'):
416426
"""Call itask's xtrigger functions via the process pool...
417427
418428
...if previous call not still in-process and retry period is up.
@@ -421,16 +431,23 @@ def call_xtriggers_async(self, itask: TaskProxy):
421431
itask: task proxy to check.
422432
"""
423433
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
434+
# Special case: quick synchronous clock check:
424435
if sig.startswith("wall_clock"):
425-
# Special case: quick synchronous clock check.
426-
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
436+
if sig in self.sat_xtrig:
437+
# Already satisfied, just update the task
438+
itask.state.xtriggers[label] = True
439+
elif wall_clock(*ctx.func_args, **ctx.func_kwargs):
440+
# Newly satisfied
427441
itask.state.xtriggers[label] = True
428442
self.sat_xtrig[sig] = {}
429443
self.data_store_mgr.delta_task_xtrigger(sig, True)
444+
self.workflow_db_mgr.put_xtriggers({sig: {}})
430445
LOG.info('xtrigger satisfied: %s = %s', label, sig)
446+
self.do_housekeeping = True
431447
continue
432448
# General case: potentially slow asynchronous function call.
433449
if sig in self.sat_xtrig:
450+
# Already satisfied, just update the task
434451
if not itask.state.xtriggers[label]:
435452
itask.state.xtriggers[label] = True
436453
res = {}
@@ -445,6 +462,8 @@ def call_xtriggers_async(self, itask: TaskProxy):
445462
xtrigger_env
446463
)
447464
continue
465+
466+
# Call the function to check the unsatisfied xtrigger.
448467
if sig in self.active:
449468
# Already waiting on this result.
450469
continue
@@ -457,8 +476,10 @@ def call_xtriggers_async(self, itask: TaskProxy):
457476
self.active.append(sig)
458477
self.proc_pool.put_command(ctx, callback=self.callback)
459478

460-
def housekeep(self, itasks: List[TaskProxy]):
461-
"""Delete satisfied xtriggers no longer needed by any task.
479+
def housekeep(self, itasks):
480+
"""Forget satisfied xtriggers no longer needed by any task.
481+
482+
Check self.do_housekeeping before calling this method.
462483
463484
Args:
464485
itasks: list of all task proxies.
@@ -469,8 +490,9 @@ def housekeep(self, itasks: List[TaskProxy]):
469490
for sig in list(self.sat_xtrig):
470491
if sig not in all_xtrig:
471492
del self.sat_xtrig[sig]
493+
self.do_housekeeping = False
472494

473-
def callback(self, ctx: SubFuncContext):
495+
def callback(self, ctx: 'SubFuncContext'):
474496
"""Callback for asynchronous xtrigger functions.
475497
476498
Record satisfaction status and function results dict.
@@ -489,23 +511,9 @@ def callback(self, ctx: SubFuncContext):
489511
return
490512
LOG.debug('%s: returned %s', sig, results)
491513
if satisfied:
514+
# Newly satisfied
492515
self.data_store_mgr.delta_task_xtrigger(sig, True)
516+
self.workflow_db_mgr.put_xtriggers({sig: results})
493517
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
494518
self.sat_xtrig[sig] = results
495-
496-
def check_xtriggers(
497-
self,
498-
itask: TaskProxy,
499-
db_update_func: Callable[[dict], None]) -> bool:
500-
"""Check if all of itasks' xtriggers have become satisfied.
501-
502-
Return True if satisfied, else False
503-
504-
Args:
505-
itasks: task proxies to check
506-
db_update_func: method to update xtriggers in the DB
507-
"""
508-
if itask.state.xtriggers_all_satisfied():
509-
db_update_func(self.sat_xtrig)
510-
return True
511-
return False
519+
self.do_housekeeping = True

setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ tests =
119119
pytest-asyncio>=0.17,!=0.23.*
120120
pytest-cov>=2.8.0
121121
pytest-xdist>=2
122+
pytest-env>=0.6.2
123+
pytest-mock>=3.7
122124
pytest>=6
123125
testfixtures>=6.11.0
124126
towncrier>=23

tests/functional/xtriggers/02-persistence/flow.cylc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
[scheduling]
44
initial cycle point = 2010
55
final cycle point = 2011
6+
runahead limit = P0
67
[[xtriggers]]
78
x1 = faker(name="bob")
89
[[graph]]

tests/integration/test_xtrigger_mgr.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"""Tests for the behaviour of xtrigger manager.
1717
"""
1818

19+
from pytest_mock import mocker
1920

2021
async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
2122
"""Test that if an itask has 2 wall_clock triggers with different
@@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
6566
'clock_2': False,
6667
'clock_3': False,
6768
}
69+
70+
71+
async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
72+
"""
73+
If multiple tasks depend on the same satisfied xtrigger, the DB mgr method
74+
put_xtriggers should only be called once - when the xtrigger gets satisfied.
75+
76+
See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908)
77+
78+
"""
79+
task_point = 1588636800 # 2020-05-05
80+
ten_years_ahead = 1904169600 # 2030-05-05
81+
monkeypatch.setattr(
82+
'cylc.flow.xtriggers.wall_clock.time',
83+
lambda: ten_years_ahead - 1
84+
)
85+
id_ = flow({
86+
'scheduler': {
87+
'allow implicit tasks': True
88+
},
89+
'scheduling': {
90+
'initial cycle point': '2020-05-05',
91+
'xtriggers': {
92+
'clock_1': 'wall_clock()',
93+
},
94+
'graph': {
95+
'R1': '@clock_1 => foo & bar'
96+
}
97+
}
98+
})
99+
100+
schd = scheduler(id_)
101+
spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers')
102+
103+
async with start(schd):
104+
105+
# Call the clock trigger via its dependent tasks, to get it satisfied.
106+
for task in schd.pool.get_tasks():
107+
# (For clock triggers this is synchronous)
108+
schd.xtrigger_mgr.call_xtriggers_async(task)
109+
110+
# It should now be satisfied.
111+
assert task.state.xtriggers == {'clock_1': True}
112+
113+
# Check one put_xtriggers call only, not two.
114+
assert spy.call_count == 1
115+
116+
# Note on master prior to GH #5908 the call is made from the
117+
# scheduler main loop when the two tasks become satisified,
118+
# resulting in two calls to put_xtriggers. This test fails
119+
# on master, but with call count 0 (not 2) because the main
120+
# loop doesn't run in this test.
121+

tests/unit/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager:
199199
workflow=workflow_name,
200200
user=user,
201201
proc_pool=Mock(put_command=lambda *a, **k: True),
202+
workflow_db_mgr=Mock(housekeep=lambda *a, **k: True),
202203
broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True),
203204
data_store_mgr=DataStoreMgr(
204205
create_autospec(Scheduler, workflow=workflow_name, owner=user)

0 commit comments

Comments
 (0)