Skip to content

Commit 1876f34

Browse files
authored
Merge pull request #6691 from hjoliver/set-bad-pre
set command: back out of bad prereqs
2 parents bcc30eb + c8a551e commit 1876f34

File tree

7 files changed

+248
-63
lines changed

7 files changed

+248
-63
lines changed

changes.d/6691.fix.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix bug in the `cylc set` command: attempting to set invalid prerequisites on
2+
a future task could prevent it from spawning later on.

cylc/flow/prerequisite.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
ItemsView,
2424
Iterable,
2525
Iterator,
26+
KeysView,
2627
NamedTuple,
2728
Optional,
2829
Set,
@@ -172,6 +173,9 @@ def __iter__(self) -> Iterator[PrereqTuple]:
172173
def items(self) -> ItemsView[PrereqTuple, SatisfiedState]:
173174
return self._satisfied.items()
174175

176+
def keys(self) -> KeysView[PrereqTuple]:
177+
return self._satisfied.keys()
178+
175179
def get_raw_conditional_expression(self):
176180
"""Return a representation of this prereq as a string.
177181

cylc/flow/task_pool.py

Lines changed: 118 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from cylc.flow.id_cli import contains_fnmatch
6161
from cylc.flow.id_match import filter_ids
6262
from cylc.flow.platforms import get_platform
63+
from cylc.flow.prerequisite import PrereqTuple
6364
from cylc.flow.run_modes import RunMode
6465
from cylc.flow.run_modes.skip import process_outputs as get_skip_mode_outputs
6566
from cylc.flow.task_action_timer import (
@@ -1862,30 +1863,23 @@ def _get_task_proxy_db_outputs(
18621863
self._load_historical_outputs(itask)
18631864
return itask
18641865

1865-
def _standardise_prereqs(
1866-
self, prereqs: 'List[str]'
1867-
) -> 'Dict[Tokens, str]':
1868-
"""Convert prerequisites to a map of task messages: outputs.
1869-
1870-
(So satsify_me logs failures)
1871-
1872-
"""
1873-
_prereqs = {}
1866+
def _standardise_prereqs(self, prereqs: 'List[str]') -> 'Set[PrereqTuple]':
1867+
"""Convert trigger prerequisites to task messages."""
1868+
_prereqs = set()
18741869
for prereq in prereqs:
18751870
pre = Tokens(prereq, relative=True)
1876-
# add implicit "succeeded"; convert "succeed" to "succeeded" etc.
1871+
# Convert "succeed" to "succeeded" etc.
18771872
output = TaskTrigger.standardise_name(
18781873
pre['task_sel'] or TASK_OUTPUT_SUCCEEDED)
18791874
# Convert outputs to task messages.
18801875
try:
18811876
msg = self.config.get_taskdef(
1882-
pre['task']
1877+
str(pre['task'])
18831878
).outputs[output][0]
18841879
cycle = standardise_point_string(pre['cycle'])
18851880
except KeyError:
1886-
# The task does not have this output.
18871881
LOG.warning(
1888-
f"output {pre.relative_id_with_selectors} not found")
1882+
f"Output {pre.relative_id_with_selectors} not found")
18891883
continue
18901884
except WorkflowConfigError as exc:
18911885
LOG.warning(
@@ -1894,7 +1888,7 @@ def _standardise_prereqs(
18941888
LOG.warning(
18951889
f'Invalid prerequisite cycle point:\n{exc.args[0]}')
18961890
else:
1897-
_prereqs[pre.duplicate(task_sel=msg, cycle=cycle)] = prereq
1891+
_prereqs.add(PrereqTuple(str(cycle), str(pre["task"]), msg))
18981892
return _prereqs
18991893

19001894
def _standardise_outputs(
@@ -1909,11 +1903,26 @@ def _standardise_outputs(
19091903
msg = tdef.outputs[output][0]
19101904
except KeyError:
19111905
LOG.warning(
1912-
f"output {point}/{tdef.name}:{output} not found")
1906+
f"Output {point}/{tdef.name}:{output} not found")
19131907
continue
19141908
_outputs.append(msg)
19151909
return _outputs
19161910

1911+
def _get_prereq_params(
1912+
self, prereqs: 'List[str]', tdef: 'TaskDef', point: 'PointBase'
1913+
) -> Tuple[bool, 'Iterable[Tokens]']:
1914+
"""Convert input prerequisites to Tokens of just the valid ones.
1915+
1916+
And convert the "['all']" prerequisite shortcut to a bool.
1917+
"""
1918+
if prereqs != ['all']:
1919+
set_all = False
1920+
valid_prereqs = self._get_valid_prereqs(prereqs, tdef, point)
1921+
else:
1922+
set_all = True
1923+
valid_prereqs = []
1924+
return set_all, valid_prereqs
1925+
19171926
def set_prereqs_and_outputs(
19181927
self,
19191928
items: Iterable[str],
@@ -1931,17 +1940,17 @@ def set_prereqs_and_outputs(
19311940
- spawn the task (if not spawned)
19321941
- update its prerequisites
19331942
1943+
Prerequisite format: "cycle/task:output" or "all".
1944+
1945+
Prerequisite validity is checked via the taskdef prior to spawning
1946+
so we can easily back out it if no valid prerequisites are given.
1947+
19341948
Set outputs:
19351949
- update task outputs in the DB
19361950
- (implied outputs are handled by the event manager)
19371951
- spawn children of the outputs (if not spawned)
19381952
- update the child prerequisites
19391953
1940-
Task matching restrictions (for now):
1941-
- globs (cycle and name) only match in the pool
1942-
- inactive tasks must be specified individually
1943-
- family names are not expanded to members
1944-
19451954
Uses a transient task proxy to spawn children. (Even if parent was
19461955
previously spawned in this flow its children might not have been).
19471956
@@ -1953,20 +1962,21 @@ def set_prereqs_and_outputs(
19531962
19541963
Args:
19551964
items: task ID match patterns
1956-
prereqs: prerequisites to set
1965+
prereqs: prerequisites to set ([pre1, pre2,...], ['all'] or [])
19571966
outputs: outputs to set
19581967
flow: flow numbers for spawned or merged tasks
19591968
flow_wait: wait for flows to catch up before continuing
19601969
flow_descr: description of new flow
19611970
19621971
"""
19631972
# Get matching pool tasks and inactive task definitions.
1964-
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
1973+
itasks, inactive_tasks, _ = self.filter_task_proxies(
19651974
items,
19661975
inactive=True,
19671976
warn_no_active=False,
19681977
)
19691978

1979+
no_op = True
19701980
flow_nums = self._get_flow_nums(flow, flow_descr)
19711981

19721982
# Set existing task proxies.
@@ -1977,40 +1987,96 @@ def set_prereqs_and_outputs(
19771987
f" {repr_flow_nums(itask.flow_nums, full=True)}"
19781988
)
19791989
continue
1980-
self.merge_flows(itask, flow_nums)
1990+
19811991
if prereqs:
1982-
self._set_prereqs_itask(itask, prereqs, flow_nums)
1992+
set_all, valid_prereqs = (
1993+
self._get_prereq_params(prereqs, itask.tdef, itask.point)
1994+
)
1995+
if not (set_all or valid_prereqs):
1996+
continue
1997+
self.merge_flows(itask, flow_nums)
1998+
self._set_prereqs_itask(itask, valid_prereqs, set_all)
1999+
no_op = False
19832000
else:
2001+
# Outputs (may be empty list)
19842002
# Spawn as if seq xtrig of parentless task was satisfied,
19852003
# with associated task producing these outputs.
2004+
self.merge_flows(itask, flow_nums)
19862005
self.check_spawn_psx_task(itask)
19872006
self._set_outputs_itask(itask, outputs)
2007+
no_op = False
19882008

1989-
# Spawn and set inactive tasks.
19902009
if not flow:
19912010
# default: assign to all active flows
19922011
flow_nums = self._get_active_flow_nums()
2012+
2013+
# Spawn and set inactive tasks.
19932014
for tdef, point in inactive_tasks:
19942015
if prereqs:
2016+
set_all, valid_prereqs = (
2017+
self._get_prereq_params(prereqs, tdef, point)
2018+
)
2019+
if not (set_all or valid_prereqs):
2020+
continue
19952021
self._set_prereqs_tdef(
1996-
point, tdef, prereqs, flow_nums, flow_wait)
2022+
point, tdef, valid_prereqs, flow_nums, flow_wait, set_all)
2023+
no_op = False
19972024
else:
2025+
# Outputs (may be empty list)
19982026
trans = self._get_task_proxy_db_outputs(
19992027
point, tdef, flow_nums,
20002028
flow_wait=flow_wait, transient=True
20012029
)
20022030
if trans is not None:
20032031
self._set_outputs_itask(trans, outputs)
2032+
no_op = False
20042033

2005-
if self.compute_runahead():
2034+
if not no_op and self.compute_runahead():
20062035
self.release_runahead_tasks()
20072036

2037+
def _get_valid_prereqs(
2038+
self, prereqs: List[str], tdef: 'TaskDef', point: 'PointBase'
2039+
) -> 'Iterable[Tokens]':
2040+
"""Validate prerequisite triggers and return associated task messages.
2041+
2042+
To set prerequisites, the user gives triggers, but we need to use the
2043+
associated task messages to satisfy the prerequisites of target tasks.
2044+
2045+
Args:
2046+
prereqs:
2047+
list of string prerequisites of the form "point/task:output"
2048+
Returns:
2049+
set of tokens {(cycle, task, task_message),}
2050+
2051+
"""
2052+
valid = {key for pre in tdef.get_prereqs(point) for key in pre.keys()}
2053+
2054+
# Get prerequisite tuples in terms of task messages not triggers.
2055+
requested = self._standardise_prereqs(prereqs)
2056+
2057+
for prereq in requested - valid:
2058+
# But log bad ones with triggers, not messages.
2059+
trg = self.config.get_taskdef(
2060+
prereq.task
2061+
).get_output(prereq.output)
2062+
LOG.warning(
2063+
f'{point}/{tdef.name} does not depend on '
2064+
f'"{prereq.get_id()}:{trg}"'
2065+
)
2066+
return {
2067+
Tokens(cycle=pre.point, task=pre.task, task_sel=pre.output)
2068+
for pre in valid & requested
2069+
}
2070+
20082071
def _set_outputs_itask(
20092072
self,
20102073
itask: 'TaskProxy',
20112074
outputs: Iterable[str],
20122075
) -> None:
2013-
"""Set requested outputs on a task proxy and spawn children."""
2076+
"""Set requested outputs on a task proxy and spawn children.
2077+
2078+
Designated flows should already be merged to the task proxy.
2079+
"""
20142080
if not outputs:
20152081
outputs = itask.state.outputs.iter_required_messages()
20162082
else:
@@ -2050,51 +2116,42 @@ def _set_outputs_itask(
20502116
def _set_prereqs_itask(
20512117
self,
20522118
itask: 'TaskProxy',
2053-
prereqs: 'List[str]',
2054-
flow_nums: 'Set[int]',
2055-
) -> bool:
2119+
prereqs: 'Iterable[Tokens]',
2120+
set_all: bool
2121+
) -> None:
20562122
"""Set prerequisites on a task proxy.
20572123
2058-
Prerequisite format: "cycle/task:output" or "all".
2059-
2060-
Return True if any prereqs are valid, else False.
2061-
2124+
Designated flows should already be merged to the task proxy.
20622125
"""
2063-
if prereqs == ["all"]:
2126+
if set_all:
20642127
itask.state.set_prerequisites_all_satisfied()
20652128
else:
2066-
# Attempt to set the given presrequisites.
2067-
# Log any that aren't valid for the task.
2068-
presus = self._standardise_prereqs(prereqs)
2069-
unmatched = itask.satisfy_me(presus.keys(), forced=True)
2070-
for task_msg in unmatched:
2071-
LOG.warning(
2072-
f"{itask.identity} does not depend on"
2073-
f' "{presus[task_msg]}"'
2074-
)
2075-
if len(unmatched) == len(prereqs):
2076-
# No prereqs matched.
2077-
return False
2129+
itask.satisfy_me(prereqs, forced=True)
20782130
if (
20792131
self.runahead_limit_point is not None
20802132
and itask.point <= self.runahead_limit_point
20812133
):
20822134
self.rh_release_and_queue(itask)
20832135
self.data_store_mgr.delta_task_prerequisite(itask)
2084-
return True
20852136

20862137
def _set_prereqs_tdef(
2087-
self, point, taskdef, prereqs, flow_nums, flow_wait
2138+
self,
2139+
point: 'PointBase',
2140+
taskdef: 'TaskDef',
2141+
prereqs: 'Iterable[Tokens]',
2142+
flow_nums: 'FlowNums',
2143+
flow_wait: bool,
2144+
set_all: bool
20882145
):
20892146
"""Spawn an inactive task and set prerequisites on it."""
2090-
20912147
itask = self.spawn_task(
20922148
taskdef.name, point, flow_nums, flow_wait=flow_wait
20932149
)
20942150
if itask is None:
20952151
return
2096-
if self._set_prereqs_itask(itask, prereqs, flow_nums):
2097-
self.add_to_pool(itask)
2152+
2153+
self._set_prereqs_itask(itask, prereqs, set_all)
2154+
self.add_to_pool(itask)
20982155

20992156
def _get_active_flow_nums(self) -> 'FlowNums':
21002157
"""Return all active flow numbers.
@@ -2380,7 +2437,14 @@ def filter_task_proxies(
23802437
warn_no_active: bool = True,
23812438
inactive: bool = False,
23822439
) -> 'Tuple[List[TaskProxy], Set[Tuple[TaskDef, PointBase]], List[str]]':
2383-
"""Return task proxies that match names, points, states in items.
2440+
"""Return task proxies and inactive tasks that match ids.
2441+
2442+
(TODO: method should be renamed to "filter_tasks").
2443+
2444+
Restrictions (for now):
2445+
- globs (cycle and name) only match in the pool
2446+
- inactive tasks must be specified individually
2447+
- family names are not expanded to members
23842448
23852449
Args:
23862450
ids:

cylc/flow/taskdef.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,20 @@ def get_parent_points(self, point):
325325
parent_points.add(trig.get_parent_point(point))
326326
return parent_points
327327

328+
def get_prereqs(self, point):
329+
"""Return my prereqs, at point."""
330+
prereqs = set()
331+
for seq in self.sequences:
332+
if not seq.is_valid(point):
333+
continue
334+
if seq in self.dependencies:
335+
# task has prereqs in this sequence
336+
for dep in self.dependencies[seq]:
337+
if dep.suicide:
338+
continue
339+
prereqs.add(dep.get_prerequisite(point, self))
340+
return prereqs
341+
328342
def has_only_abs_triggers(self, point):
329343
"""Return whether I have only absolute triggers at point."""
330344
if not self.has_abs_triggers:

0 commit comments

Comments
 (0)