Skip to content

Commit 254c730

Browse files
smile-luobinsarabala1979
authored andcommitted
fix: Fix bugs, unable to resolve tasks aggregated outputs in dag outputs. Fixes argoproj#6684 (argoproj#6692)
Signed-off-by: smile-luobin <[email protected]>
1 parent 9653099 commit 254c730

File tree

4 files changed

+276
-2
lines changed

4 files changed

+276
-2
lines changed

workflow/controller/dag.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,19 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
276276
// Can happen when dag.target was specified
277277
continue
278278
}
279-
woc.buildLocalScope(scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)
279+
280+
prefix := fmt.Sprintf("tasks.%s", task.Name)
281+
if taskNode.Type == wfv1.NodeTypeTaskGroup {
282+
childNodes := make([]wfv1.NodeStatus, len(taskNode.Children))
283+
for i, childID := range taskNode.Children {
284+
childNodes[i] = woc.wf.Status.Nodes[childID]
285+
}
286+
err := woc.processAggregateNodeOutputs(scope, prefix, childNodes)
287+
if err != nil {
288+
return nil, errors.InternalWrapError(err)
289+
}
290+
}
291+
woc.buildLocalScope(scope, prefix, taskNode)
280292
woc.addOutputsToGlobalScope(taskNode.Outputs)
281293
}
282294
outputs, err := getTemplateOutputsFromScope(tmpl, scope)

workflow/controller/dag_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3150,3 +3150,174 @@ func TestLeafContinueOn(t *testing.T) {
31503150
woc.operate(ctx)
31513151
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
31523152
}
3153+
3154+
var dagOutputsReferTaskAggregatedOuputs = `
3155+
apiVersion: argoproj.io/v1alpha1
3156+
kind: Workflow
3157+
metadata:
3158+
name: parameter-aggregation-dag-h8b82
3159+
spec:
3160+
3161+
entrypoint: parameter-aggregation
3162+
templates:
3163+
-
3164+
dag:
3165+
tasks:
3166+
- arguments:
3167+
parameters:
3168+
- name: num
3169+
value: '{{item}}'
3170+
name: odd-or-even
3171+
template: odd-or-even
3172+
withItems:
3173+
- 1
3174+
- 2
3175+
inputs: {}
3176+
metadata: {}
3177+
name: parameter-aggregation
3178+
outputs:
3179+
parameters:
3180+
- name: dag-nums
3181+
valueFrom:
3182+
parameter: '{{tasks.odd-or-even.outputs.parameters.num}}'
3183+
- name: dag-evenness
3184+
valueFrom:
3185+
parameter: '{{tasks.odd-or-even.outputs.parameters.evenness}}'
3186+
-
3187+
container:
3188+
args:
3189+
- |
3190+
sleep 1 &&
3191+
echo {{inputs.parameters.num}} > /tmp/num &&
3192+
if [ $(({{inputs.parameters.num}}%2)) -eq 0 ]; then
3193+
echo "even" > /tmp/even;
3194+
else
3195+
echo "odd" > /tmp/even;
3196+
fi
3197+
command:
3198+
- sh
3199+
- -c
3200+
image: alpine:latest
3201+
name: ""
3202+
resources: {}
3203+
inputs:
3204+
parameters:
3205+
- name: num
3206+
metadata: {}
3207+
name: odd-or-even
3208+
outputs:
3209+
parameters:
3210+
- name: num
3211+
valueFrom:
3212+
path: /tmp/num
3213+
- name: evenness
3214+
valueFrom:
3215+
path: /tmp/even
3216+
status:
3217+
nodes:
3218+
parameter-aggregation-dag-h8b82:
3219+
children:
3220+
- parameter-aggregation-dag-h8b82-3379492521
3221+
displayName: parameter-aggregation-dag-h8b82
3222+
finishedAt: "2020-12-09T15:37:07Z"
3223+
id: parameter-aggregation-dag-h8b82
3224+
name: parameter-aggregation-dag-h8b82
3225+
outboundNodes:
3226+
- parameter-aggregation-dag-h8b82-3175470584
3227+
- parameter-aggregation-dag-h8b82-2243926302
3228+
phase: Running
3229+
startedAt: "2020-12-09T15:36:46Z"
3230+
templateName: parameter-aggregation
3231+
templateScope: local/parameter-aggregation-dag-h8b82
3232+
type: DAG
3233+
parameter-aggregation-dag-h8b82-1440345089:
3234+
boundaryID: parameter-aggregation-dag-h8b82
3235+
displayName: odd-or-even(1:2)
3236+
finishedAt: "2020-12-09T15:36:54Z"
3237+
hostNodeName: minikube
3238+
id: parameter-aggregation-dag-h8b82-1440345089
3239+
inputs:
3240+
parameters:
3241+
- name: num
3242+
value: "2"
3243+
name: parameter-aggregation-dag-h8b82.odd-or-even(1:2)
3244+
outputs:
3245+
exitCode: "0"
3246+
parameters:
3247+
- name: num
3248+
value: "2"
3249+
valueFrom:
3250+
path: /tmp/num
3251+
- name: evenness
3252+
value: even
3253+
valueFrom:
3254+
path: /tmp/even
3255+
phase: Succeeded
3256+
startedAt: "2020-12-09T15:36:46Z"
3257+
templateName: odd-or-even
3258+
templateScope: local/parameter-aggregation-dag-h8b82
3259+
type: Pod
3260+
parameter-aggregation-dag-h8b82-3379492521:
3261+
boundaryID: parameter-aggregation-dag-h8b82
3262+
children:
3263+
- parameter-aggregation-dag-h8b82-3572919299
3264+
- parameter-aggregation-dag-h8b82-1440345089
3265+
displayName: odd-or-even
3266+
finishedAt: "2020-12-09T15:36:55Z"
3267+
id: parameter-aggregation-dag-h8b82-3379492521
3268+
name: parameter-aggregation-dag-h8b82.odd-or-even
3269+
phase: Succeeded
3270+
startedAt: "2020-12-09T15:36:46Z"
3271+
templateName: odd-or-even
3272+
templateScope: local/parameter-aggregation-dag-h8b82
3273+
type: TaskGroup
3274+
parameter-aggregation-dag-h8b82-3572919299:
3275+
boundaryID: parameter-aggregation-dag-h8b82
3276+
displayName: odd-or-even(0:1)
3277+
finishedAt: "2020-12-09T15:36:53Z"
3278+
hostNodeName: minikube
3279+
id: parameter-aggregation-dag-h8b82-3572919299
3280+
inputs:
3281+
parameters:
3282+
- name: num
3283+
value: "1"
3284+
name: parameter-aggregation-dag-h8b82.odd-or-even(0:1)
3285+
outputs:
3286+
exitCode: "0"
3287+
parameters:
3288+
- name: num
3289+
value: "1"
3290+
valueFrom:
3291+
path: /tmp/num
3292+
- name: evenness
3293+
value: odd
3294+
valueFrom:
3295+
path: /tmp/even
3296+
phase: Succeeded
3297+
startedAt: "2020-12-09T15:36:46Z"
3298+
templateName: odd-or-even
3299+
templateScope: local/parameter-aggregation-dag-h8b82
3300+
type: Pod
3301+
phase: Succeeded
3302+
startedAt: "2020-12-09T15:36:46Z"
3303+
`
3304+
3305+
func TestDAGReferTaskAggregatedOutputs(t *testing.T) {
3306+
wf := wfv1.MustUnmarshalWorkflow(dagOutputsReferTaskAggregatedOuputs)
3307+
cancel, controller := newController(wf)
3308+
defer cancel()
3309+
3310+
ctx := context.Background()
3311+
woc := newWorkflowOperationCtx(wf, controller)
3312+
woc.operate(ctx)
3313+
3314+
dagNode := woc.wf.Status.Nodes.FindByDisplayName("parameter-aggregation-dag-h8b82")
3315+
if assert.NotNil(t, dagNode) {
3316+
if assert.NotNil(t, dagNode.Outputs) {
3317+
if assert.Len(t, dagNode.Outputs.Parameters, 2) {
3318+
assert.Equal(t, `["1","2"]`, dagNode.Outputs.Parameters[0].Value.String())
3319+
assert.Equal(t, `["odd","even"]`, dagNode.Outputs.Parameters[1].Value.String())
3320+
}
3321+
}
3322+
}
3323+
}

workflow/validate/validate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,8 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
11991199
resolvedTemplates[task.Name] = resolvedTmpl
12001200

12011201
prefix := fmt.Sprintf("tasks.%s", task.Name)
1202-
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, false, false)
1202+
aggregate := len(task.WithItems) > 0 || task.WithParam != ""
1203+
ctx.addOutputsToScope(resolvedTmpl, prefix, scope, aggregate, false)
12031204

12041205
err = common.ValidateTaskResults(&task)
12051206
if err != nil {

workflow/validate/validate_dag_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,3 +943,93 @@ func TestDAGWithDigitNameNoDepends(t *testing.T) {
943943
_, err := validate(dagWithDigitNoDepends)
944944
assert.NoError(t, err)
945945
}
946+
947+
var dagOutputsResolveTaskAggregatedOutputs = `
948+
apiVersion: argoproj.io/v1alpha1
949+
kind: Workflow
950+
metadata:
951+
generateName: loops-
952+
spec:
953+
serviceAccountName: argo
954+
entrypoint: dag
955+
templates:
956+
- name: dag
957+
dag:
958+
tasks:
959+
- name: fanout
960+
template: fanout
961+
arguments:
962+
parameters:
963+
- name: input
964+
value: "[1, 2]"
965+
- name: dag-process
966+
template: sub-dag
967+
depends: fanout
968+
arguments:
969+
parameters:
970+
- name: item
971+
value: '{{item}}'
972+
- name: input
973+
value: '{{tasks.fanout.outputs.parameters.output}}'
974+
withParam: "{{tasks.fanout.outputs.parameters.output}}"
975+
976+
- name: sub-dag
977+
inputs:
978+
parameters:
979+
- name: input
980+
- name: item
981+
outputs:
982+
parameters:
983+
- name: output
984+
valueFrom:
985+
parameter: "{{tasks.process.outputs.parameters}}"
986+
dag:
987+
tasks:
988+
- name: fanout
989+
template: fanout
990+
arguments:
991+
parameters:
992+
- name: input
993+
value: '{{inputs.parameters.input}}'
994+
- name: process
995+
template: process
996+
depends: fanout
997+
arguments:
998+
parameters:
999+
- name: item
1000+
value: '{{item}}'
1001+
withParam: "{{tasks.fanout.outputs.parameters.output}}"
1002+
1003+
- name: fanout
1004+
inputs:
1005+
parameters:
1006+
- name: input
1007+
container:
1008+
image: docker/whalesay:latest
1009+
command: [sh, -c]
1010+
args: ["echo {{inputs.parameters.input}} | tee /tmp/output"]
1011+
outputs:
1012+
parameters:
1013+
- name: output
1014+
valueFrom:
1015+
path: /tmp/output
1016+
1017+
- name: process
1018+
inputs:
1019+
parameters:
1020+
- name: item
1021+
container:
1022+
image: docker/whalesay:latest
1023+
command: [sh, -c]
1024+
args: ["echo {{inputs.parameters.item}} | tee /tmp/output"]
1025+
outputs:
1026+
parameters:
1027+
- name: output
1028+
valueFrom:
1029+
path: /tmp/output
1030+
`
1031+
1032+
func TestDAGOutputsResolveTaskAggregatedOutputs(t *testing.T) {
1033+
_, err := validate(dagOutputsResolveTaskAggregatedOutputs)
1034+
assert.NoError(t, err)
1035+
}

0 commit comments

Comments
 (0)