Skip to content

Commit 9ed2014

Browse files
authored
Merge pull request #6707 from cylc/8.4.x-sync
🤖 Merge 8.4.x-sync into master
2 parents 87b2c28 + 39b855c commit 9ed2014

File tree

7 files changed

+242
-60
lines changed

7 files changed

+242
-60
lines changed

changes.d/6691.fix.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix bug in the `cylc set` command: attempting to set invalid prerequisites on
2+
a future task could prevent it from spawning later on.

cylc/flow/prerequisite.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
ItemsView,
2424
Iterable,
2525
Iterator,
26+
KeysView,
2627
NamedTuple,
2728
Optional,
2829
Set,
@@ -172,6 +173,9 @@ def __iter__(self) -> Iterator[PrereqTuple]:
172173
def items(self) -> ItemsView[PrereqTuple, SatisfiedState]:
173174
return self._satisfied.items()
174175

176+
def keys(self) -> KeysView[PrereqTuple]:
177+
return self._satisfied.keys()
178+
175179
def get_raw_conditional_expression(self):
176180
"""Return a representation of this prereq as a string.
177181

cylc/flow/task_pool.py

Lines changed: 117 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from cylc.flow.id_cli import contains_fnmatch
6161
from cylc.flow.id_match import filter_ids
6262
from cylc.flow.platforms import get_platform
63+
from cylc.flow.prerequisite import PrereqTuple
6364
from cylc.flow.run_modes import RunMode
6465
from cylc.flow.run_modes.skip import process_outputs as get_skip_mode_outputs
6566
from cylc.flow.task_action_timer import (
@@ -1867,30 +1868,23 @@ def _get_task_proxy_db_outputs(
18671868
self._load_historical_outputs(itask)
18681869
return itask
18691870

1870-
def _standardise_prereqs(
1871-
self, prereqs: 'List[str]'
1872-
) -> 'Dict[Tokens, str]':
1873-
"""Convert prerequisites to a map of task messages: outputs.
1874-
1875-
(So satsify_me logs failures)
1876-
1877-
"""
1878-
_prereqs = {}
1871+
def _standardise_prereqs(self, prereqs: 'List[str]') -> 'Set[PrereqTuple]':
1872+
"""Convert trigger prerequisites to task messages."""
1873+
_prereqs = set()
18791874
for prereq in prereqs:
18801875
pre = Tokens(prereq, relative=True)
1881-
# add implicit "succeeded"; convert "succeed" to "succeeded" etc.
1876+
# Convert "succeed" to "succeeded" etc.
18821877
output = TaskTrigger.standardise_name(
18831878
pre['task_sel'] or TASK_OUTPUT_SUCCEEDED)
18841879
# Convert outputs to task messages.
18851880
try:
18861881
msg = self.config.get_taskdef(
1887-
pre['task']
1882+
str(pre['task'])
18881883
).outputs[output][0]
18891884
cycle = standardise_point_string(pre['cycle'])
18901885
except KeyError:
1891-
# The task does not have this output.
18921886
LOG.warning(
1893-
f"output {pre.relative_id_with_selectors} not found")
1887+
f"Output {pre.relative_id_with_selectors} not found")
18941888
continue
18951889
except WorkflowConfigError as exc:
18961890
LOG.warning(
@@ -1899,7 +1893,7 @@ def _standardise_prereqs(
18991893
LOG.warning(
19001894
f'Invalid prerequisite cycle point:\n{exc.args[0]}')
19011895
else:
1902-
_prereqs[pre.duplicate(task_sel=msg, cycle=cycle)] = prereq
1896+
_prereqs.add(PrereqTuple(str(cycle), str(pre["task"]), msg))
19031897
return _prereqs
19041898

19051899
def _standardise_outputs(
@@ -1914,11 +1908,26 @@ def _standardise_outputs(
19141908
msg = tdef.outputs[output][0]
19151909
except KeyError:
19161910
LOG.warning(
1917-
f"output {point}/{tdef.name}:{output} not found")
1911+
f"Output {point}/{tdef.name}:{output} not found")
19181912
continue
19191913
_outputs.append(msg)
19201914
return _outputs
19211915

1916+
def _get_prereq_params(
1917+
self, prereqs: 'List[str]', tdef: 'TaskDef', point: 'PointBase'
1918+
) -> Tuple[bool, 'Iterable[Tokens]']:
1919+
"""Convert input prerequisites to Tokens of just the valid ones.
1920+
1921+
And convert the "['all']" prerequisite shortcut to a bool.
1922+
"""
1923+
if prereqs != ['all']:
1924+
set_all = False
1925+
valid_prereqs = self._get_valid_prereqs(prereqs, tdef, point)
1926+
else:
1927+
set_all = True
1928+
valid_prereqs = []
1929+
return set_all, valid_prereqs
1930+
19221931
def set_prereqs_and_outputs(
19231932
self,
19241933
items: Iterable[str],
@@ -1936,17 +1945,17 @@ def set_prereqs_and_outputs(
19361945
- spawn the task (if not spawned)
19371946
- update its prerequisites
19381947
1948+
Prerequisite format: "cycle/task:output" or "all".
1949+
1950+
Prerequisite validity is checked via the taskdef prior to spawning
1951+
so we can easily back out it if no valid prerequisites are given.
1952+
19391953
Set outputs:
19401954
- update task outputs in the DB
19411955
- (implied outputs are handled by the event manager)
19421956
- spawn children of the outputs (if not spawned)
19431957
- update the child prerequisites
19441958
1945-
Task matching restrictions (for now):
1946-
- globs (cycle and name) only match in the pool
1947-
- inactive tasks must be specified individually
1948-
- family names are not expanded to members
1949-
19501959
Uses a transient task proxy to spawn children. (Even if parent was
19511960
previously spawned in this flow its children might not have been).
19521961
@@ -1958,20 +1967,21 @@ def set_prereqs_and_outputs(
19581967
19591968
Args:
19601969
items: task ID match patterns
1961-
prereqs: prerequisites to set
1970+
prereqs: prerequisites to set ([pre1, pre2,...], ['all'] or [])
19621971
outputs: outputs to set
19631972
flow: flow numbers for spawned or merged tasks
19641973
flow_wait: wait for flows to catch up before continuing
19651974
flow_descr: description of new flow
19661975
19671976
"""
19681977
# Get matching pool tasks and inactive task definitions.
1969-
itasks, inactive_tasks, unmatched = self.filter_task_proxies(
1978+
itasks, inactive_tasks, _ = self.filter_task_proxies(
19701979
items,
19711980
inactive=True,
19721981
warn_no_active=False,
19731982
)
19741983

1984+
no_op = True
19751985
flow_nums = self._get_flow_nums(flow, flow_descr)
19761986

19771987
# Set existing task proxies.
@@ -1982,34 +1992,87 @@ def set_prereqs_and_outputs(
19821992
f" {repr_flow_nums(itask.flow_nums, full=True)}"
19831993
)
19841994
continue
1985-
self.merge_flows(itask, flow_nums)
1995+
19861996
if prereqs:
1987-
self._set_prereqs_itask(itask, prereqs, flow_nums)
1997+
set_all, valid_prereqs = (
1998+
self._get_prereq_params(prereqs, itask.tdef, itask.point)
1999+
)
2000+
if not (set_all or valid_prereqs):
2001+
continue
2002+
self.merge_flows(itask, flow_nums)
2003+
self._set_prereqs_itask(itask, valid_prereqs, set_all)
2004+
no_op = False
19882005
else:
2006+
# Outputs (may be empty list)
19892007
# Spawn as if seq xtrig of parentless task was satisfied,
19902008
# with associated task producing these outputs.
2009+
self.merge_flows(itask, flow_nums)
19912010
self.check_spawn_psx_task(itask)
19922011
self._set_outputs_itask(itask, outputs)
2012+
no_op = False
19932013

1994-
# Spawn and set inactive tasks.
19952014
if not flow:
19962015
# default: assign to all active flows
19972016
flow_nums = self._get_active_flow_nums()
2017+
2018+
# Spawn and set inactive tasks.
19982019
for tdef, point in inactive_tasks:
19992020
if prereqs:
2021+
set_all, valid_prereqs = (
2022+
self._get_prereq_params(prereqs, tdef, point)
2023+
)
2024+
if not (set_all or valid_prereqs):
2025+
continue
20002026
self._set_prereqs_tdef(
2001-
point, tdef, prereqs, flow_nums, flow_wait)
2027+
point, tdef, valid_prereqs, flow_nums, flow_wait, set_all)
2028+
no_op = False
20022029
else:
2030+
# Outputs (may be empty list)
20032031
trans = self._get_task_proxy_db_outputs(
20042032
point, tdef, flow_nums,
20052033
flow_wait=flow_wait, transient=True
20062034
)
20072035
if trans is not None:
20082036
self._set_outputs_itask(trans, outputs)
2037+
no_op = False
20092038

2010-
if self.compute_runahead():
2039+
if not no_op and self.compute_runahead():
20112040
self.release_runahead_tasks()
20122041

2042+
def _get_valid_prereqs(
2043+
self, prereqs: List[str], tdef: 'TaskDef', point: 'PointBase'
2044+
) -> 'Iterable[Tokens]':
2045+
"""Validate prerequisite triggers and return associated task messages.
2046+
2047+
To set prerequisites, the user gives triggers, but we need to use the
2048+
associated task messages to satisfy the prerequisites of target tasks.
2049+
2050+
Args:
2051+
prereqs:
2052+
list of string prerequisites of the form "point/task:output"
2053+
Returns:
2054+
set of tokens {(cycle, task, task_message),}
2055+
2056+
"""
2057+
valid = {key for pre in tdef.get_prereqs(point) for key in pre.keys()}
2058+
2059+
# Get prerequisite tuples in terms of task messages not triggers.
2060+
requested = self._standardise_prereqs(prereqs)
2061+
2062+
for prereq in requested - valid:
2063+
# But log bad ones with triggers, not messages.
2064+
trg = self.config.get_taskdef(
2065+
prereq.task
2066+
).get_output(prereq.output)
2067+
LOG.warning(
2068+
f'{point}/{tdef.name} does not depend on '
2069+
f'"{prereq.get_id()}:{trg}"'
2070+
)
2071+
return {
2072+
Tokens(cycle=pre.point, task=pre.task, task_sel=pre.output)
2073+
for pre in valid & requested
2074+
}
2075+
20132076
def _set_outputs_itask(
20142077
self,
20152078
itask: 'TaskProxy',
@@ -2020,8 +2083,11 @@ def _set_outputs_itask(
20202083
If no outputs were specified and the task has no required outputs to
20212084
set, set the "success pathway" outputs in the same way that skip mode
20222085
does.
2086+
2087+
Designated flows should already be merged to the task proxy.
20232088
"""
20242089
outputs = set(outputs)
2090+
20252091
if not outputs:
20262092
outputs = set(
20272093
# Set required outputs by default
@@ -2066,51 +2132,42 @@ def _set_outputs_itask(
20662132
def _set_prereqs_itask(
20672133
self,
20682134
itask: 'TaskProxy',
2069-
prereqs: 'List[str]',
2070-
flow_nums: 'Set[int]',
2071-
) -> bool:
2135+
prereqs: 'Iterable[Tokens]',
2136+
set_all: bool
2137+
) -> None:
20722138
"""Set prerequisites on a task proxy.
20732139
2074-
Prerequisite format: "cycle/task:output" or "all".
2075-
2076-
Return True if any prereqs are valid, else False.
2077-
2140+
Designated flows should already be merged to the task proxy.
20782141
"""
2079-
if prereqs == ["all"]:
2142+
if set_all:
20802143
itask.state.set_prerequisites_all_satisfied()
20812144
else:
2082-
# Attempt to set the given presrequisites.
2083-
# Log any that aren't valid for the task.
2084-
presus = self._standardise_prereqs(prereqs)
2085-
unmatched = itask.satisfy_me(presus.keys(), forced=True)
2086-
for task_msg in unmatched:
2087-
LOG.warning(
2088-
f"{itask.identity} does not depend on"
2089-
f' "{presus[task_msg]}"'
2090-
)
2091-
if len(unmatched) == len(prereqs):
2092-
# No prereqs matched.
2093-
return False
2145+
itask.satisfy_me(prereqs, forced=True)
20942146
if (
20952147
self.runahead_limit_point is not None
20962148
and itask.point <= self.runahead_limit_point
20972149
):
20982150
self.rh_release_and_queue(itask)
20992151
self.data_store_mgr.delta_task_prerequisite(itask)
2100-
return True
21012152

21022153
def _set_prereqs_tdef(
2103-
self, point, taskdef, prereqs, flow_nums, flow_wait
2154+
self,
2155+
point: 'PointBase',
2156+
taskdef: 'TaskDef',
2157+
prereqs: 'Iterable[Tokens]',
2158+
flow_nums: 'FlowNums',
2159+
flow_wait: bool,
2160+
set_all: bool
21042161
):
21052162
"""Spawn an inactive task and set prerequisites on it."""
2106-
21072163
itask = self.spawn_task(
21082164
taskdef.name, point, flow_nums, flow_wait=flow_wait
21092165
)
21102166
if itask is None:
21112167
return
2112-
if self._set_prereqs_itask(itask, prereqs, flow_nums):
2113-
self.add_to_pool(itask)
2168+
2169+
self._set_prereqs_itask(itask, prereqs, set_all)
2170+
self.add_to_pool(itask)
21142171

21152172
def _get_active_flow_nums(self) -> 'FlowNums':
21162173
"""Return all active flow numbers.
@@ -2396,7 +2453,14 @@ def filter_task_proxies(
23962453
warn_no_active: bool = True,
23972454
inactive: bool = False,
23982455
) -> 'Tuple[List[TaskProxy], Set[Tuple[TaskDef, PointBase]], List[str]]':
2399-
"""Return task proxies that match names, points, states in items.
2456+
"""Return task proxies and inactive tasks that match ids.
2457+
2458+
(TODO: method should be renamed to "filter_tasks").
2459+
2460+
Restrictions (for now):
2461+
- globs (cycle and name) only match in the pool
2462+
- inactive tasks must be specified individually
2463+
- family names are not expanded to members
24002464
24012465
Args:
24022466
ids:

cylc/flow/taskdef.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,20 @@ def get_parent_points(self, point):
325325
parent_points.add(trig.get_parent_point(point))
326326
return parent_points
327327

328+
def get_prereqs(self, point):
329+
"""Return my prereqs, at point."""
330+
prereqs = set()
331+
for seq in self.sequences:
332+
if not seq.is_valid(point):
333+
continue
334+
if seq in self.dependencies:
335+
# task has prereqs in this sequence
336+
for dep in self.dependencies[seq]:
337+
if dep.suicide:
338+
continue
339+
prereqs.add(dep.get_prerequisite(point, self))
340+
return prereqs
341+
328342
def has_only_abs_triggers(self, point):
329343
"""Return whether I have only absolute triggers at point."""
330344
if not self.has_abs_triggers:

0 commit comments

Comments
 (0)