Skip to content

Commit 33c5035

Browse files
authored
Merge pull request #6910 from hjoliver/group-trigger-followup
Group trigger: re-use already-completed outputs of active group start tasks.
1 parent 9352f61 commit 33c5035

File tree

4 files changed

+115
-42
lines changed

4 files changed

+115
-42
lines changed

changes.d/6910.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Trigger command: automatically handle already-completed outputs of triggered tasks.

cylc/flow/commands.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,8 @@ def _force_trigger_tasks(
735735

736736
warnings_flow_none = []
737737
warnings_has_job = []
738+
active_completed_outputs = {}
739+
738740
for itask in active:
739741
# Find active group start tasks (parentless, or with only off-group
740742
# prerequisites) and set all prerequisites (to trigger them now).
@@ -764,6 +766,12 @@ def _force_trigger_tasks(
764766
)
765767
continue
766768

769+
if itask.state(*TASK_STATUSES_ACTIVE):
770+
for (label, msg, completed) in itask.state.outputs:
771+
if completed:
772+
active_completed_outputs[
773+
(str(itask.point), itask.tdef.name)] = (label, msg)
774+
767775
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
768776
# This is a live active group start task
769777
warnings_has_job.append(str(itask))
@@ -826,7 +834,8 @@ def _force_trigger_tasks(
826834
set_all=True # prerequisites
827835
)
828836
else:
829-
off_flow_prereqs = {
837+
# Off-flow prereqs to satisfy, for the triggered flow:
838+
prereqs_to_set = {
830839
PrereqTuple(str(key.point), str(key.task), key.output)
831840
for pre in tdef.get_prereqs(point)
832841
for key in pre.keys()
@@ -838,15 +847,24 @@ def _force_trigger_tasks(
838847
for key in pre.keys()
839848
if (key.task, str(key.point)) in group_ids
840849
)
850+
# Prereqs to satisfy, from already-completed outputs of active
851+
# group start tasks, for the triggered flow.
852+
prereqs_to_set.update({
853+
PrereqTuple(str(key.point), str(key.task), key.output)
854+
for pre in tdef.get_prereqs(point)
855+
for key in pre.keys()
856+
if (str(key.point), key.task) in active_completed_outputs
857+
})
858+
841859
if (
842-
off_flow_prereqs
860+
prereqs_to_set
843861
or tdef.get_xtrigs(point)
844862
or tdef.external_triggers
845863
):
846864
# Satisfy any off-group prereqs or ext/xtriggers to spawn task.
847865
jtask = schd.pool._set_prereqs_tdef(
848866
point, tdef,
849-
off_flow_prereqs,
867+
prereqs_to_set,
850868
{"all": True}, # xtriggers
851869
flow_nums,
852870
flow_wait,

cylc/flow/scripts/trigger.py

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,20 @@
1717

1818
"""cylc trigger [OPTIONS] ARGS
1919
20-
Manually trigger tasks, respecting dependencies among them.
21-
22-
Triggering individual tasks:
23-
* Triggering an unqueued task queues it, triggering a queued task runs it.
24-
So you many need to trigger an unqueued task twice to run it immediately.
25-
* Live tasks (preparing, submitted, or running) can't be triggered.
26-
* Triggered tasks can run even if the workflow is paused.
27-
28-
Triggering a group of tasks at once (e.g. members of a sub-graph):
29-
Cylc will automatically:
30-
* Erase the run history of members, so they can re-run in the same flow.
31-
* Identify group start tasks, and trigger them to start the flow.
32-
* Identify off-group dependencies, and satisfy them to avoid a stall.
33-
* Leave in-group dependencies to be satisfied by the triggered flow.
34-
35-
If the workflow is paused, group start tasks will trigger immediately; the
36-
flow will continue downstream of them when you resume the workflow.
37-
38-
Beware of triggering live (preparing, submitted, or running) tasks:
39-
* Live in-group tasks will be killed and their run history erased, to allow
40-
them to re-run in the triggered flow.
41-
* Live group-start tasks are left to run; they don't need to be retriggered.
42-
WARNING: if they already completed outputs that other group members depend
43-
on, you must manually satisfy those prerequisites again for the triggered
44-
flow (run history erasure wipes out the original satisfied prerequisites).
45-
46-
Flow number assignment in triggered tasks:
47-
Active tasks (n=0) already have flows assigned; inactive tasks (n>0) do not.
20+
Trigger a group of one or more tasks, respecting dependencies among them.
21+
22+
Prerequisites on tasks outside of the group will be satisfied automatically.
23+
24+
Tasks will be removed if necessary to allow re-run without intervention, so
25+
triggered tasks that are preparing, submitted, or running may be killed.
26+
27+
Tasks that lead into a group will run immediately even if the workflow is
28+
paused; activity will flow on from them once the workflow is resumed.
29+
30+
Triggering an unqueued task queues it; triggering a queued task runs it.
31+
32+
How flow numbers are assigned to triggered tasks:
33+
Active tasks (n=0) already have assigned flows; inactive tasks (n>0) do not.
4834
* If an interdependent group of triggered tasks includes active tasks, the
4935
flow will be assigned the existing flow numbers of those active tasks.
5036
* Otherwise the flow will be assigned all current active flow numbers.
@@ -63,16 +49,6 @@
6349
# rerun sub-graph "a => b & c" in the same flow, ignoring "off => b"
6450
$ cylc trigger test //1/a //1/b //1/c
6551
66-
Flow numbers of triggered tasks are determined as follows:
67-
Active tasks (n=0) already have existing flow numbers.
68-
* default: merge active and existing flow numbers
69-
* --flow=INT or "new": merge given and existing flow numbers
70-
* --flow="none": ERROR (not valid for already-active tasks)
71-
Inactive tasks (n>0) do not have flow numbers assigned:
72-
* default: run with all active flow numbers
73-
* --flow=INT or "new": run with the given flow numbers
74-
* --flow="none": run as no-flow (activity will not flow on downstream)
75-
7652
"""
7753

7854
from functools import partial

tests/integration/test_force_trigger.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@
3737
set_prereqs_and_outputs,
3838
)
3939
from cylc.flow.cycling.integer import IntegerPoint
40-
from cylc.flow.task_state import TASK_STATUS_WAITING
40+
from cylc.flow.task_state import (
41+
TASK_STATUS_WAITING,
42+
TASK_STATUS_RUNNING
43+
)
4144

4245

4346
async def test_trigger_workflow_paused(
@@ -625,6 +628,81 @@ async def test_trigger_n0_tasks(
625628
}
626629

627630

631+
async def test_replay_outputs(flow, scheduler, start, complete, log_filter):
632+
"""Triggered group start tasks re-emit (kind of) earlier outputs.
633+
634+
https://github.com/cylc/cylc-flow/issues/6858
635+
636+
Example graph:
637+
a:started => b => end
638+
k:kustom => l => end
639+
k:kustom => offg
640+
641+
If I trigger a, b, k, l AFTER a:started and k:kustom have completed:
642+
cylc trigger workflow //1/a //1/b //1/k //1/l
643+
644+
Then I should expect outputs `k:kustom` and `a:started` to be re-used
645+
to satify b and l in the triggered flow, but NOT off-group task offg.
646+
"""
647+
msg_prereq = '[1/{}:waiting(runahead)] prerequisite force-satisfied: 1/{}'
648+
msg_spawned = "[1/{}:waiting(runahead)] => waiting"
649+
msg_removed = "Removed tasks: 1/{}"
650+
651+
wid = flow({
652+
'scheduling': {
653+
'graph': {
654+
'R1': """
655+
a:started => b => end
656+
k:kustom => l => end
657+
k:kustom => offg
658+
"""
659+
}
660+
},
661+
'runtime': {
662+
'a': {},
663+
'k': {
664+
'outputs': {'kustom': 'custom message'}
665+
}
666+
}
667+
})
668+
schd = scheduler(wid, paused_start=True)
669+
async with start(schd):
670+
# Set initial tasks a and k to "running" so they are recognized as
671+
# live during the forthcoming trigger operation.
672+
673+
# Complete the a:started and k:kustom outputs.
674+
schd.pool.set_prereqs_and_outputs(
675+
['1/a'], ['started'], [], []
676+
)
677+
schd.pool.set_prereqs_and_outputs(
678+
['1/k'], ['kustom'], [], []
679+
)
680+
# It should spawn b, l, and offg.
681+
for task in ['b', 'l', 'offg']:
682+
assert log_filter(contains=msg_spawned.format(task))
683+
684+
# Set a and k as running so they're recognized as live start tasks
685+
# by the trigger operation.
686+
for itask in schd.pool.get_tasks():
687+
itask.state_reset(TASK_STATUS_RUNNING)
688+
689+
# Now trigger the group.
690+
await run_cmd(
691+
force_trigger_tasks(schd, ['1/a', '1/b', '1/k', '1/l'], [])
692+
)
693+
# It should remove b and l (in-group tasks)
694+
for task in ['b', 'l']:
695+
assert log_filter(contains=msg_removed.format(task))
696+
697+
# But they will be respawned immediately by re-satisfying dependence
698+
# on the earlier outputs of a and k for in-group tasks:
699+
assert log_filter(contains=msg_prereq.format('b', 'a:started'))
700+
assert log_filter(contains=msg_prereq.format('l', 'k:custom message'))
701+
# But not for the off-group task offg:
702+
assert not log_filter(
703+
contains=msg_prereq.format('offg', 'k:custom message'))
704+
705+
628706
async def test_trigger_with_sequential_task(flow, scheduler, run, log_filter):
629707
"""It should trigger a failed sequential task.
630708

0 commit comments

Comments
 (0)