Skip to content

Commit a89fba8

Browse files
Merge pull request #7148 from MetRonnie/trigger-warm-flow-on
Prevent flow-on after triggering pre-startcp tasks after a warm start
2 parents 9c9e9be + c9b8b06 commit a89fba8

File tree

14 files changed

+223
-120
lines changed

14 files changed

+223
-120
lines changed

changes.d/7101.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed manual group triggering of tasks before the start cycle point not obeying prerequisites within the group.

changes.d/7148.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a bug where tasks would flow on from tasks manually triggered before the start cycle point.

cylc/flow/commands.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def _remove_matched_tasks(
161161
schd: 'Scheduler',
162162
ids: Set[TaskTokens],
163163
flow_nums: 'FlowNums',
164+
warn_unremovable: bool = True,
164165
):
165166
"""Remove matched tasks."""
166167
# Mapping of *relative* task IDs to removed flow numbers:
@@ -267,15 +268,14 @@ def _remove_matched_tasks(
267268
)
268269
LOG.info(f"Removed tasks: {', '.join(sorted(tasks_str_list))}")
269270

270-
if not_removed:
271+
if warn_unremovable and not_removed:
271272
fnums_str = (
272273
repr_flow_nums(flow_nums, full=True) if flow_nums else ''
273274
)
274275
tasks_str = ', '.join(
275276
sorted(tokens.relative_id for tokens in not_removed)
276277
)
277-
# This often does not indicate an error - e.g. for group trigger.
278-
LOG.debug(f"Task(s) not removable: {tasks_str} {fnums_str}")
278+
LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}")
279279

280280
if removed and schd.pool.compute_runahead():
281281
schd.pool.release_runahead_tasks()
@@ -806,7 +806,19 @@ def _force_trigger_tasks(
806806
# Remove all inactive and selected active group members.
807807
if flow != [FLOW_NONE]:
808808
# (No need to remove tasks if triggering with no-flow).
809-
_remove_matched_tasks(schd, {*active_to_remove, *inactive}, flow_nums)
809+
_remove_matched_tasks(
810+
schd,
811+
ids := {*active_to_remove, *inactive},
812+
flow_nums,
813+
warn_unremovable=False,
814+
)
815+
# Record pre-startcp tasks, as these would not normally be spawned -
816+
# see https://github.com/cylc/cylc-flow/pull/7148
817+
schd.pool.pre_start_tasks_to_trigger.update(
818+
(tokens['task'], point)
819+
for tokens in ids
820+
if (point := get_point(tokens['cycle'])) < schd.config.start_point
821+
)
810822

811823
# trigger should override the held state, however, in-group tasks may
812824
# have previously been held and active in-group tasks will become
@@ -827,7 +839,7 @@ def _force_trigger_tasks(
827839
icycle = get_point(id_['cycle'])
828840
in_flow_prereqs = False
829841
jtask: Optional[TaskProxy] = None
830-
if tdef.is_parentless(icycle):
842+
if tdef.is_parentless(icycle, cutoff=schd.config.initial_point):
831843
# Parentless: set all prereqs to spawn the task.
832844
jtask = schd.pool._set_prereqs_tdef(
833845
icycle, tdef,

cylc/flow/cycling/integer.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,22 @@ def __lt__(self, other: 'IntegerSequence') -> bool:
588588
return True
589589
return False
590590

591+
def __repr__(self) -> str:
592+
"""
593+
>>> IntegerSequence('R6/3/P1', '2')
594+
<IntegerSequence start=3, stop=8, step=P1, self.p_context_start=2,
595+
i_offset=P0>
596+
"""
597+
ret = f"start={self.p_start}, stop={self.p_stop}, step={self.i_step}"
598+
if self.p_context_start not in {self.p_start, None}:
599+
ret += f", {self.p_context_start=}"
600+
if self.p_context_stop not in {self.p_stop, None}:
601+
ret += f", {self.p_context_stop=}"
602+
for attr in ('i_offset', 'exclusions'):
603+
if (value := getattr(self, attr)) is not None:
604+
ret += f", {attr}={value}"
605+
return f"<{type(self).__name__} {ret}>"
606+
591607

592608
def init_from_cfg(_):
593609
"""Placeholder function required by all cycling modules."""

cylc/flow/task_pool.py

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,9 @@ def __init__(
211211
self.config.runtime['descendants']
212212
)
213213

214-
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
215-
self.tasks_to_trigger_now: Set['TaskProxy'] = set()
214+
self.tasks_to_hold: set[tuple[str, 'PointBase']] = set()
215+
self.tasks_to_trigger_now: set['TaskProxy'] = set()
216+
self.pre_start_tasks_to_trigger: set[tuple[str, 'PointBase']] = set()
216217

217218
def set_stop_task(self, task_id):
218219
"""Set stop after a task."""
@@ -837,9 +838,11 @@ def spawn_to_rh_limit(
837838
longer parentless, and/or hit the runahead limit.
838839
839840
"""
840-
if not flow_nums or point is None:
841-
# Force-triggered no-flow task.
842-
# Or called with an invalid next_point.
841+
if (
842+
not flow_nums # Force-triggered no-flow task
843+
or point is None # Reached end of sequence?
844+
or point < self.config.start_point # Warm start
845+
):
843846
return
844847
if self.runahead_limit_point is None:
845848
self.compute_runahead()
@@ -848,7 +851,7 @@ def spawn_to_rh_limit(
848851

849852
is_xtrig_sequential = False
850853
while point is not None and (point <= self.runahead_limit_point):
851-
if tdef.is_parentless(point):
854+
if tdef.is_parentless(point, cutoff=self.config.start_point):
852855
ntask, is_in_pool, is_xtrig_sequential = (
853856
self.get_or_spawn_task(point, tdef, flow_nums)
854857
)
@@ -866,7 +869,11 @@ def spawn_to_rh_limit(
866869

867870
def spawn_if_parentless(self, tdef, point, flow_nums):
868871
"""Spawn a task if parentless, regardless of runahead limit."""
869-
if flow_nums and point is not None and tdef.is_parentless(point):
872+
if (
873+
flow_nums
874+
and point is not None
875+
and tdef.is_parentless(point, cutoff=self.config.start_point)
876+
):
870877
ntask, is_in_pool, _ = self.get_or_spawn_task(
871878
point, tdef, flow_nums
872879
)
@@ -906,6 +913,9 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
906913
pass
907914
else:
908915
self.tasks_to_trigger_now.discard(itask)
916+
self.pre_start_tasks_to_trigger.discard(
917+
(itask.tdef.name, itask.point)
918+
)
909919
self.tasks_removed = True
910920
self.active_tasks_changed = True
911921
if not self.active_tasks[itask.point]:
@@ -1727,8 +1737,8 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:
17271737
return True
17281738

17291739
def _get_task_history(
1730-
self, name: str, point: 'PointBase', flow_nums: Set[int]
1731-
) -> Tuple[int, Optional[str], bool]:
1740+
self, name: str, point: 'PointBase', flow_nums: 'FlowNums'
1741+
) -> tuple[int, str | None, bool]:
17321742
"""Get submit_num, status, flow_wait for point/name in flow_nums.
17331743
17341744
Args:
@@ -1766,7 +1776,10 @@ def _get_task_history(
17661776
return submit_num, status, flow_wait
17671777

17681778
def _load_historical_outputs(self, itask: 'TaskProxy') -> None:
1769-
"""Load a task's historical outputs from the DB."""
1779+
"""Load a task's historical outputs from the DB.
1780+
1781+
NOTE this creates a task_states/task_outputs DB entry if not present.
1782+
"""
17701783
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
17711784
itask.tdef.name, str(itask.point))
17721785
if not info:
@@ -1803,9 +1816,9 @@ def spawn_task(
18031816
self,
18041817
name: str,
18051818
point: 'PointBase',
1806-
flow_nums: Set[int],
1819+
flow_nums: 'FlowNums',
18071820
flow_wait: bool = False,
1808-
) -> Optional[TaskProxy]:
1821+
) -> TaskProxy | None:
18091822
"""Return a new task proxy for the given flow if possible.
18101823
18111824
We need to hit the DB for:
@@ -1826,8 +1839,18 @@ def spawn_task(
18261839
self._get_task_history(name, point, flow_nums)
18271840
)
18281841

1842+
if (
1843+
not prev_status
1844+
and point < self.config.start_point
1845+
and flow_nums.issuperset({1})
1846+
# Warm start - treat pre-startcp tasks as already run in flow=1,
1847+
# unless manually triggered:
1848+
and (name, point) not in self.pre_start_tasks_to_trigger
1849+
):
1850+
return None
1851+
18291852
# Create the task proxy with any completed outputs loaded.
1830-
itask = self._get_task_proxy_db_outputs(
1853+
itask = self._load_db_task_proxy(
18311854
point,
18321855
self.config.get_taskdef(name),
18331856
flow_nums,
@@ -1925,7 +1948,7 @@ def _spawn_after_flow_wait(self, itask: TaskProxy) -> None:
19251948
self.workflow_db_mgr.put_update_task_flow_wait(itask)
19261949
return None
19271950

1928-
def _get_task_proxy_db_outputs(
1951+
def _load_db_task_proxy(
19291952
self,
19301953
point: 'PointBase',
19311954
taskdef: 'TaskDef',
@@ -1935,8 +1958,11 @@ def _get_task_proxy_db_outputs(
19351958
transient: bool = False,
19361959
is_manual_submit: bool = False,
19371960
submit_num: int = 0,
1938-
) -> Optional['TaskProxy']:
1939-
"""Spawn a task, update outputs from DB."""
1961+
) -> 'TaskProxy | None':
1962+
"""Spawn a task, update outputs from DB.
1963+
1964+
NOTE this creates a task_states/task_outputs DB entry if not present.
1965+
"""
19401966

19411967
if not self.can_be_spawned(taskdef.name, point):
19421968
return None
@@ -2140,7 +2166,7 @@ def set_prereqs_and_outputs(
21402166
no_op = False
21412167
else:
21422168
# Outputs (may be empty list)
2143-
trans = self._get_task_proxy_db_outputs(
2169+
trans = self._load_db_task_proxy(
21442170
icycle, tdef, flow_nums,
21452171
flow_wait=flow_wait, transient=True
21462172
)

cylc/flow/task_proxy.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ def __init__(
218218
self,
219219
scheduler_tokens: 'Tokens',
220220
tdef: 'TaskDef',
221-
start_point: 'PointBase',
222-
flow_nums: Optional['FlowNums'] = None,
221+
point: 'PointBase',
222+
flow_nums: 'FlowNums | None' = None,
223223
status: str = TASK_STATUS_WAITING,
224224
is_held: bool = False,
225225
submit_num: int | None = 0,
@@ -242,7 +242,7 @@ def __init__(
242242
# (don't share flow_nums ref with parent task)
243243
self.flow_nums = flow_nums.copy()
244244
self.flow_wait = flow_wait
245-
self.point = start_point
245+
self.point = point
246246
self.tokens: TaskTokens = scheduler_tokens.duplicate(
247247
cycle=str(self.point),
248248
task=self.tdef.name,
@@ -294,7 +294,9 @@ def __init__(
294294
# Set xtrigger checking type, which effects parentless spawning.
295295
self.is_xtrigger_sequential = bool(
296296
sequential_xtrigger_labels
297-
and self.tdef.is_parentless(start_point)
297+
and self.tdef.is_parentless(
298+
self.point, cutoff=self.tdef.initial_point
299+
)
298300
and sequential_xtrigger_labels.intersection(self.state.xtriggers)
299301
)
300302

cylc/flow/task_trigger.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ def get_prerequisite(
239239

240240
# Loop over TaskTrigger instances.
241241
for task_trigger in self.task_triggers:
242+
key = (
243+
task_trigger.get_point(point),
244+
task_trigger.task_name,
245+
task_trigger.output,
246+
)
242247
if task_trigger.cycle_point_offset is not None:
243248
# Compute trigger cycle point from offset.
244249
if task_trigger.offset_is_from_icp:
@@ -247,30 +252,26 @@ def get_prerequisite(
247252
else:
248253
prereq_offset_point = get_point_relative(
249254
task_trigger.cycle_point_offset, point)
255+
if prereq_offset_point < tdef.initial_point:
256+
# Pre-initial dependency - treat as satisfied
257+
cpre[key] = True
258+
continue
250259
if prereq_offset_point > point:
251260
# Update tdef.max_future_prereq_offset.
252261
prereq_offset = prereq_offset_point - point
253-
if (tdef.max_future_prereq_offset is None or
254-
(prereq_offset >
255-
tdef.max_future_prereq_offset)):
256-
tdef.max_future_prereq_offset = (
257-
prereq_offset)
258-
cpre[(
259-
task_trigger.get_point(point),
260-
task_trigger.task_name,
261-
task_trigger.output
262-
)] = (
263-
(prereq_offset_point < tdef.start_point) &
264-
(point >= tdef.start_point)
262+
if (
263+
tdef.max_future_prereq_offset is None
264+
or prereq_offset > tdef.max_future_prereq_offset
265+
):
266+
tdef.max_future_prereq_offset = prereq_offset
267+
cpre[key] = (
268+
prereq_offset_point < tdef.start_point
269+
and point >= tdef.start_point
265270
)
266271
else:
267272
# Trigger is within the same cycle point.
268273
# Register task message with Prerequisite object.
269-
cpre[(
270-
task_trigger.get_point(point),
271-
task_trigger.task_name,
272-
task_trigger.output,
273-
)] = False
274+
cpre[key] = False
274275
cpre.set_conditional_expr(self.get_expression(point))
275276
return cpre
276277

@@ -299,6 +300,14 @@ def __str__(self):
299300
ret.append('( %s )' % str(item))
300301
return ' '.join(ret)
301302

303+
def __repr__(self) -> str:
304+
"""
305+
>>> from unittest.mock import Mock
306+
>>> Dependency(exp=[Mock()], task_triggers=[Mock()], suicide=False)
307+
<Dependency ...>
308+
"""
309+
return f"<{type(self).__name__} {self}>"
310+
302311
@classmethod
303312
def _stringify_list(cls, nested_expr, point):
304313
"""Stringify a nested list of TaskTrigger objects."""

cylc/flow/taskdef.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,18 +419,24 @@ def next_point(self, point):
419419
p_next = min(adjusted)
420420
return p_next
421421

422-
def is_parentless(self, point):
423-
"""Return True if task has no parents at point.
422+
def is_parentless(self, point: 'PointBase', cutoff: 'PointBase') -> bool:
423+
"""Return True if task has no parents at the given point.
424424
425425
Tasks are considered parentless if they have:
426426
- no parents at all
427-
- all parents < initial cycle point
427+
- all parents < cutoff cycle point
428428
- only absolute triggers
429429
430430
Absolute-triggered tasks are auto-spawned like true parentless tasks,
431431
(once the trigger is satisfied they are effectively parentless) but
432432
with a prerequisite that gets satisfied when the absolute output is
433433
completed at runtime.
434+
435+
Args:
436+
point: The cycle point to check.
437+
cutoff: This should be the start cycle point for the startup
438+
spawning, or the initial cycle point for manually triggered
439+
tasks.
434440
"""
435441
if not self.graph_parents:
436442
# No parents at any point
@@ -441,7 +447,7 @@ def is_parentless(self, point):
441447
parent_points = self.get_parent_points(point)
442448
return (
443449
not parent_points
444-
or all(x < self.start_point for x in parent_points)
450+
or all(x < cutoff for x in parent_points)
445451
or self.has_only_abs_triggers(point)
446452
)
447453

0 commit comments

Comments
 (0)