Skip to content

Commit 4afec54

Browse files
committed
Address review comments.
1 parent 8dabb69 commit 4afec54

File tree

2 files changed

+30
-20
lines changed

2 files changed

+30
-20
lines changed

cylc/flow/prerequisite.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
ItemsView,
2424
Iterable,
2525
Iterator,
26+
KeysView,
2627
NamedTuple,
2728
Optional,
2829
Set,
@@ -172,7 +173,7 @@ def __iter__(self) -> Iterator[PrereqTuple]:
172173
def items(self) -> ItemsView[PrereqTuple, SatisfiedState]:
173174
return self._satisfied.items()
174175

175-
def get_tuples(self) -> Iterable[PrereqTuple]:
176+
def keys(self) -> KeysView[PrereqTuple]:
176177
return self._satisfied.keys()
177178

178179
def get_raw_conditional_expression(self):

cylc/flow/task_pool.py

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from cylc.flow.id_cli import contains_fnmatch
6161
from cylc.flow.id_match import filter_ids
6262
from cylc.flow.platforms import get_platform
63+
from cylc.flow.prerequisite import PrereqTuple
6364
from cylc.flow.run_modes import RunMode
6465
from cylc.flow.run_modes.skip import process_outputs as get_skip_mode_outputs
6566
from cylc.flow.task_action_timer import (
@@ -1862,9 +1863,7 @@ def _get_task_proxy_db_outputs(
18621863
self._load_historical_outputs(itask)
18631864
return itask
18641865

1865-
def _standardise_prereqs(
1866-
self, prereqs: 'List[str]'
1867-
) -> 'Set[Tuple[str,str,str]]':
1866+
def _standardise_prereqs(self, prereqs: 'List[str]') -> 'Set[PrereqTuple]':
18681867
"""Convert trigger prerequisites to task messages."""
18691868
_prereqs = set()
18701869
for prereq in prereqs:
@@ -1889,7 +1888,7 @@ def _standardise_prereqs(
18891888
LOG.warning(
18901889
f'Invalid prerequisite cycle point:\n{exc.args[0]}')
18911890
else:
1892-
_prereqs.add((cycle, pre["task"], msg))
1891+
_prereqs.add(PrereqTuple(str(cycle), str(pre.task), msg))
18931892
return _prereqs
18941893

18951894
def _standardise_outputs(
@@ -1943,17 +1942,15 @@ def set_prereqs_and_outputs(
19431942
19441943
Prerequisite format: "cycle/task:output" or "all".
19451944
1945+
Prerequisite validity is checked via the taskdef prior to spawning
1946+
so we can easily back out it if no valid prerequisites are given.
1947+
19461948
Set outputs:
19471949
- update task outputs in the DB
19481950
- (implied outputs are handled by the event manager)
19491951
- spawn children of the outputs (if not spawned)
19501952
- update the child prerequisites
19511953
1952-
Task matching restrictions (for now):
1953-
- globs (cycle and name) only match in the pool
1954-
- inactive tasks must be specified individually
1955-
- family names are not expanded to members
1956-
19571954
Uses a transient task proxy to spawn children. (Even if parent was
19581955
previously spawned in this flow its children might not have been).
19591956
@@ -2052,22 +2049,22 @@ def _get_valid_prereqs(
20522049
set of tokens {(cycle, task, task_message),}
20532050
20542051
"""
2055-
valid: 'Set[Tuple[str, str, str]]' = set()
2056-
for pre in tdef.get_prereqs(point):
2057-
valid.update(list(pre.get_tuples()))
2052+
valid = {pre.keys() for pre in tdef.get_prereqs(point)}
20582053

20592054
# Get prerequisite tuples in terms of task messages not triggers.
20602055
requested = self._standardise_prereqs(prereqs)
20612056

20622057
for prereq in requested - valid:
20632058
# But log bad ones in using triggers, not messages.
2064-
trg = self.config.get_taskdef(str(prereq[1])).get_output(prereq[2])
2059+
trg = self.config.get_taskdef(
2060+
prereq.task
2061+
).get_output(prereq.output)
20652062
LOG.warning(
20662063
f'{point}/{tdef.name} does not depend on '
2067-
f'"{prereq[0]}/{prereq[1]}:{trg}"'
2064+
f'"{prereq.get_id()}:{trg}"'
20682065
)
20692066
return {
2070-
Tokens(cycle=pre[0], task=pre[1], task_sel=pre[2])
2067+
Tokens(cycle=pre.cycle, task=pre.task, task_sel=pre.output)
20712068
for pre in valid & requested
20722069
}
20732070

@@ -2076,7 +2073,10 @@ def _set_outputs_itask(
20762073
itask: 'TaskProxy',
20772074
outputs: Iterable[str],
20782075
) -> None:
2079-
"""Set requested outputs on a task proxy and spawn children."""
2076+
"""Set requested outputs on a task proxy and spawn children.
2077+
2078+
Designated flows should already be merged to the task proxy.
2079+
"""
20802080
if not outputs:
20812081
outputs = itask.state.outputs.iter_required_messages()
20822082
else:
@@ -2119,8 +2119,10 @@ def _set_prereqs_itask(
21192119
prereqs: 'Iterable[Tokens]',
21202120
set_all: bool
21212121
):
2122-
"""Set prerequisites on a task proxy."""
2123-
# (No need to check for unmatched - prereqs already validated)
2122+
"""Set prerequisites on a task proxy.
2123+
2124+
Designated flows should already be merged to the task proxy.
2125+
"""
21242126
if set_all:
21252127
itask.state.set_prerequisites_all_satisfied()
21262128
else:
@@ -2436,7 +2438,14 @@ def filter_task_proxies(
24362438
warn_no_active: bool = True,
24372439
inactive: bool = False,
24382440
) -> 'Tuple[List[TaskProxy], Set[Tuple[TaskDef, PointBase]], List[str]]':
2439-
"""Return task proxies that match names, points, states in items.
2441+
"""Return task proxies and inactive tasks that match ids.
2442+
2443+
(TODO: method should be renamed to "filter_tasks").
2444+
2445+
Restrictions (for now):
2446+
- globs (cycle and name) only match in the pool
2447+
- inactive tasks must be specified individually
2448+
- family names are not expanded to members
24402449
24412450
Args:
24422451
ids:

0 commit comments

Comments
 (0)