Skip to content

Commit 7a58152

Browse files
authored
Merge pull request cylc#6835 from hjoliver/suicide-expire
Reimpliment suicide trigger as expire trigger
1 parent c08b3da commit 7a58152

File tree

6 files changed

+175
-12
lines changed

6 files changed

+175
-12
lines changed

changes.d/6835.feat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
A new experimental feature that can be switched on in workflow config:
2+
Suicide triggers expire tasks rather than just remove them. This fixes
3+
a bug that could allow tasks to run after suicide triggering. The
4+
"expired" output will automatically be marked as optional for the
5+
task, but custom completion conditions must be adapted accordingly.

cylc/flow/cfgspec/workflow.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,40 @@ def get_script_common_text(this: str, example: Optional[str] = None):
416416
The default time zone is now ``Z`` instead of the local time of
417417
the first workflow start.
418418
''')
419+
with Conf('experimental', desc='''
420+
Activate experimental features.
421+
422+
These are preview features which will become the default in future
423+
releases.
424+
425+
.. versionadded:: 8.6.0
426+
'''):
427+
Conf('all', VDR.V_BOOLEAN, False, desc='''
428+
Activate all experimental features.
429+
430+
Encouraged for canary testing.
431+
432+
.. versionadded:: 8.6.0
433+
''')
434+
Conf('expire triggers', VDR.V_BOOLEAN, False, desc='''
435+
This reimplements "suicide triggers" as "expire triggers".
436+
437+
* When the condition is met, the task will generate the
438+
``expired`` output rather than just being removed.
439+
* The ``expired`` output will be marked as :term:`optional`
440+
for the triggered task, but a custom
441+
`flow.cylc[runtime][<namespace>]completion condition
442+
will need to be modified accordingly.
443+
* This should be functionally equivalent to "suicide triggers"
444+
in that the triggered task will not run.
445+
* However, the triggered task will now be left in the
446+
``expired`` state making it clearer in the GUI/logs that
447+
the task has been triggered in this way.
448+
* It is possible to trigger other tasks off of this ``expired``
449+
output for more advanced failure recovery.
450+
451+
.. versionadded:: 8.6.0
452+
''')
419453

420454
with Conf( # noqa: SIM117 (keep same format)
421455
'main loop',

cylc/flow/config.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import re
3535
from textwrap import wrap
3636
import traceback
37+
from types import SimpleNamespace
3738
from typing import (
3839
TYPE_CHECKING,
3940
Any,
@@ -461,6 +462,7 @@ def __init__(
461462
self.mem_log("config.py: after get(sparse=False)")
462463

463464
# These 2 must be called before call to init_cyclers(self.cfg):
465+
self.set_experimental_features()
464466
self.process_utc_mode()
465467
self.process_cycle_point_tz()
466468

@@ -614,6 +616,14 @@ def __init__(
614616

615617
skip_mode_validate(self.taskdefs)
616618

619+
def set_experimental_features(self):
620+
all_ = self.cfg['scheduler']['experimental']['all']
621+
self.experimental = SimpleNamespace(**{
622+
key.replace(' ', '_'): value or all_
623+
for key, value in self.cfg['scheduler']['experimental'].items()
624+
if key != 'all'
625+
})
626+
617627
@staticmethod
618628
def _warn_if_queues_have_implicit_tasks(
619629
config, taskdefs, max_warning_lines
@@ -1062,8 +1072,13 @@ def _set_completion_expressions(self):
10621072
for name, taskdef in self.taskdefs.items():
10631073
expr = taskdef.rtconfig['completion']
10641074
if expr:
1075+
any_suicide = any(
1076+
dep.suicide
1077+
for d in taskdef.dependencies.values()
1078+
for dep in d
1079+
)
10651080
# check the user-defined expression
1066-
self._check_completion_expression(name, expr)
1081+
self._check_completion_expression(name, expr, any_suicide)
10671082
else:
10681083
# derive a completion expression for this taskdef
10691084
expr = get_completion_expression(taskdef)
@@ -1086,14 +1101,18 @@ def _set_completion_expressions(self):
10861101
# on after the TaskDef has been created
10871102
taskdef.rtconfig['completion'] = expr
10881103

1089-
def _check_completion_expression(self, task_name: str, expr: str) -> None:
1104+
def _check_completion_expression(
1105+
self, task_name: str, expr: str, any_suicide: bool
1106+
) -> None:
10901107
"""Checks a user-defined completion expression.
10911108
10921109
Args:
10931110
task_name:
10941111
The name of the task we are checking.
10951112
expr:
10961113
The completion expression as defined in the config.
1114+
any_suicide:
1115+
Does this task have any suicide triggers
10971116
10981117
"""
10991118
# check completion expressions are not being used in compat mode
@@ -1242,12 +1261,21 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None:
12421261
and expr_opt is None
12431262
and compvar in {'submit_failed', 'expired'}
12441263
):
1245-
raise WorkflowConfigError(
1264+
msg = (
12461265
f'{task_name}:{trigger} is permitted in the graph'
1247-
' but is not referenced in the completion'
1248-
' expression (so is not permitted by it).'
1249-
f'\nTry: completion = "{expr} or {compvar}"'
1266+
' but is not referenced in the completion.'
12501267
)
1268+
if (
1269+
any_suicide
1270+
and trigger == "expired"
1271+
and self.experimental.expire_triggers
1272+
):
1273+
msg += (
1274+
"\nThis may be due to use of an expire "
1275+
"(formerly suicide) trigger."
1276+
)
1277+
msg += f'\nTry: completion = "{expr} or {compvar}"'
1278+
raise WorkflowConfigError(msg)
12511279

12521280
if (
12531281
graph_opt is False
@@ -2334,7 +2362,8 @@ def load_graph(self):
23342362
parser = GraphParser(
23352363
family_map,
23362364
self.parameters,
2337-
task_output_opt=task_output_opt
2365+
task_output_opt=task_output_opt,
2366+
expire_triggers=self.experimental.expire_triggers,
23382367
)
23392368
parser.parse_graph(graph)
23402369
task_output_opt.update(parser.task_output_opt)

cylc/flow/graph_parser.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import contextlib
2020

2121
from typing import (
22-
Set,
2322
Dict,
2423
List,
25-
Tuple,
2624
Optional,
25+
Set,
26+
Tuple,
2727
Union
2828
)
2929

@@ -264,7 +264,8 @@ def __init__(
264264
family_map: Optional[Dict[str, List[str]]] = None,
265265
parameters: Optional[Dict] = None,
266266
task_output_opt:
267-
Optional[Dict[Tuple[str, str], Tuple[bool, bool, bool]]] = None
267+
Optional[Dict[Tuple[str, str], Tuple[bool, bool, bool]]] = None,
268+
expire_triggers: bool = False,
268269
) -> None:
269270
"""Initialize the graph string parser.
270271
@@ -283,6 +284,7 @@ def __init__(
283284
self.triggers: Dict = {}
284285
self.original: Dict = {}
285286
self.workflow_state_polling_tasks: Dict = {}
287+
self.expire_triggers = expire_triggers
286288

287289
# Record task outputs as optional or required:
288290
# {(name, output): (is_optional, is_member)}
@@ -744,6 +746,10 @@ def _set_triggers(
744746
self.original.setdefault(name, {})
745747
self.original[name][expr] = orig_expr
746748

749+
if suicide and self.expire_triggers:
750+
# Make expiry optional for suicide triggered tasks.
751+
self._set_output_opt(name, TASK_OUTPUT_EXPIRED, True, False, False)
752+
747753
def _set_output_opt(
748754
self,
749755
name: str,

cylc/flow/task_pool.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1564,7 +1564,13 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
15641564
suicide.append(t)
15651565

15661566
for c_task in suicide:
1567-
self.remove(c_task, self.__class__.SUICIDE_MSG)
1567+
if self.config.experimental.expire_triggers:
1568+
self.task_queue_mgr.remove_task(c_task)
1569+
self.task_events_mgr.process_message(
1570+
c_task, logging.WARNING, TASK_OUTPUT_EXPIRED
1571+
)
1572+
else:
1573+
self.remove(c_task, self.__class__.SUICIDE_MSG)
15681574

15691575
if suicide:
15701576
# Update DB now in case of very quick respawn attempt.
@@ -1850,7 +1856,7 @@ def spawn_task(
18501856
# revive as incomplete.
18511857
msg = "incomplete"
18521858

1853-
if cylc.flow.flags.verbosity >= 1:
1859+
if LOG.level <= logging.DEBUG:
18541860
# avoid unnecessary compute when we are not in debug mode
18551861
id_ = itask.tokens.duplicate(
18561862
task_sel=prev_status

tests/integration/test_task_pool.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import pytest
3131
from pytest import param
32+
import re
3233

3334
from cylc.flow import (
3435
CYLC_LOG,
@@ -38,6 +39,7 @@
3839
from cylc.flow.cycling.iso8601 import ISO8601Point
3940
from cylc.flow.data_messages_pb2 import PbPrerequisite
4041
from cylc.flow.data_store_mgr import TASK_PROXIES
42+
from cylc.flow.exceptions import WorkflowConfigError
4143
from cylc.flow.flow_mgr import FLOW_NONE
4244
from cylc.flow.id import TaskTokens, Tokens
4345
from cylc.flow.task_events_mgr import TaskEventsManager
@@ -1982,6 +1984,87 @@ async def test_remove_active_task(
19821984
)
19831985

19841986

1987+
async def test_remove_by_expire_trigger(
1988+
flow,
1989+
validate,
1990+
scheduler,
1991+
start,
1992+
log_filter
1993+
):
1994+
"""Test task removal by suicide trigger.
1995+
1996+
* Suicide triggers should remove tasks from the pool.
1997+
* It should be possible to bring them back by manually triggering them.
1998+
* Removing a task manually (cylc remove) should work the same.
1999+
"""
2000+
def _get_id(b_completion: str = "succeeded"):
2001+
return flow({
2002+
'scheduler': {
2003+
'experimental': {
2004+
'expire triggers': 'True',
2005+
}
2006+
},
2007+
'scheduling': {
2008+
'graph': {
2009+
'R1': '''
2010+
a? => b
2011+
a:failed? => !b
2012+
'''
2013+
},
2014+
},
2015+
'runtime': {
2016+
'b': {
2017+
'completion': b_completion
2018+
}
2019+
}
2020+
})
2021+
with pytest.raises(
2022+
WorkflowConfigError,
2023+
match=re.escape(
2024+
"This may be due to use of an expire (formerly suicide) trigger"
2025+
)
2026+
):
2027+
validate(_get_id())
2028+
2029+
id_ = _get_id("succeeded or expired")
2030+
validate(id_)
2031+
schd: 'Scheduler' = scheduler(id_, paused_start=False)
2032+
2033+
async with start(schd, level=logging.DEBUG) as log:
2034+
# it should start up with 1/a
2035+
assert schd.pool.get_task_ids() == {"1/a"}
2036+
a = schd.pool.get_task(IntegerPoint("1"), "a")
2037+
2038+
# mark 1/a as failed and check that 1/b expires
2039+
schd.pool.spawn_on_output(a, TASK_OUTPUT_FAILED)
2040+
assert log_filter(regex="1/b.*=> expired")
2041+
assert schd.pool.get_task_ids() == {"1/a"}
2042+
2043+
# 1/b should not be resurrected if it becomes ready
2044+
schd.pool.set_prereqs_and_outputs(['1/b'], [], ["1/a"], [1],)
2045+
assert log_filter(regex="1/b:expired.* already finished and completed")
2046+
2047+
# but we can still resurrect 1/b by triggering it
2048+
log.clear()
2049+
await commands.run_cmd(
2050+
commands.force_trigger_tasks(schd, ['1/b'], ['1']))
2051+
assert log_filter(regex='1/b.*added to the n=0 window')
2052+
2053+
# remove 1/b with "cylc remove""
2054+
await commands.run_cmd(
2055+
commands.remove_tasks(schd, ['1/b'], [])
2056+
)
2057+
assert log_filter(
2058+
regex='1/b.*removed from the n=0 window: request',
2059+
)
2060+
2061+
# and bring 1/b back again by triggering it again
2062+
log.clear()
2063+
await commands.run_cmd(
2064+
commands.force_trigger_tasks(schd, ['1/b'], ['1']))
2065+
assert log_filter(regex='1/b.*added to the n=0 window',)
2066+
2067+
19852068
async def test_remove_by_suicide(
19862069
flow,
19872070
scheduler,

0 commit comments

Comments
 (0)