Skip to content

Commit 8a94f2e

Browse files
jswxstwJoibel
authored andcommitted
fix: Set default value to output parameters if suspend node timeout. Fixes argoproj#12230 (argoproj#12960)
Signed-off-by: oninowang <[email protected]> (cherry picked from commit 3df05eb)
1 parent 1a3a5c2 commit 8a94f2e

File tree

3 files changed

+150
-11
lines changed

3 files changed

+150
-11
lines changed

test/e2e/suspend_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
//go:build functional
2+
// +build functional
3+
4+
package e2e
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
apiv1 "k8s.io/api/core/v1"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
13+
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14+
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
15+
)
16+
17+
type TestSuspendSitue struct {
18+
fixtures.E2ESuite
19+
}
20+
21+
func (s *TestSuspendSitue) TestSuspendNodeTimeoutWithoutDefaultValue() {
22+
s.Given().Workflow(`
23+
apiVersion: argoproj.io/v1alpha1
24+
kind: Workflow
25+
metadata:
26+
name: suspend-node-timeout-without-default-value
27+
spec:
28+
entrypoint: suspend
29+
templates:
30+
- name: suspend
31+
steps:
32+
- - name: approve
33+
template: approve
34+
- - name: release
35+
template: whalesay
36+
arguments:
37+
parameters:
38+
- name: message
39+
value: "{{steps.approve.outputs.parameters.message}}"
40+
- name: approve
41+
suspend:
42+
duration: 5s
43+
outputs:
44+
parameters:
45+
- name: message
46+
valueFrom:
47+
supplied: {}
48+
- name: whalesay
49+
inputs:
50+
parameters:
51+
- name: message
52+
container:
53+
image: docker/whalesay
54+
command: [cowsay]
55+
args: ["{{inputs.parameters.message}}"]
56+
`).
57+
When().
58+
SubmitWorkflow().
59+
WaitForWorkflow(fixtures.ToBeFailed).
60+
Then().
61+
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
62+
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
63+
assert.Contains(t, "raw output parameter 'message' has not been set and does not have a default value", status.Message)
64+
})
65+
}
66+
67+
func (s *TestSuspendSitue) TestSuspendNodeTimeoutWithDefaultValue() {
68+
s.Given().Workflow(`
69+
apiVersion: argoproj.io/v1alpha1
70+
kind: Workflow
71+
metadata:
72+
name: suspend-node-timeout-with-default-value
73+
spec:
74+
entrypoint: suspend
75+
templates:
76+
- name: suspend
77+
steps:
78+
- - name: approve
79+
template: approve
80+
- - name: release
81+
template: whalesay
82+
arguments:
83+
parameters:
84+
- name: message
85+
value: "{{steps.approve.outputs.parameters.message}}"
86+
- name: approve
87+
suspend:
88+
duration: 5s
89+
outputs:
90+
parameters:
91+
- name: message
92+
valueFrom:
93+
default: default message
94+
supplied: {}
95+
- name: whalesay
96+
inputs:
97+
parameters:
98+
- name: message
99+
container:
100+
image: docker/whalesay
101+
command: [cowsay]
102+
args: ["{{inputs.parameters.message}}"]
103+
`).
104+
When().
105+
SubmitWorkflow().
106+
WaitForWorkflow(fixtures.ToBeSucceeded).
107+
Then().
108+
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
109+
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
110+
assert.Equal(t, status.Progress, wfv1.Progress("2/2"))
111+
}).
112+
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
113+
return status.Name == "suspend-node-timeout-with-default-value[0].approve"
114+
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
115+
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
116+
assert.Equal(t, 1, len(status.Outputs.Parameters))
117+
assert.Equal(t, "message", status.Outputs.Parameters[0].Name)
118+
assert.Equal(t, wfv1.AnyStringPtr("default message"), status.Outputs.Parameters[0].Value)
119+
}).
120+
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
121+
return status.Name == "suspend-node-timeout-with-default-value[1].release"
122+
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
123+
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
124+
assert.Equal(t, 1, len(status.Inputs.Parameters))
125+
assert.Equal(t, "message", status.Inputs.Parameters[0].Name)
126+
assert.Equal(t, wfv1.AnyStringPtr("default message"), status.Inputs.Parameters[0].Value)
127+
})
128+
}

workflow/controller/operator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3455,6 +3455,9 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string,
34553455
if time.Now().UTC().After(suspendDeadline) {
34563456
// Suspension is expired, node can be resumed
34573457
woc.log.Infof("auto resuming node %s", nodeName)
3458+
if err := wfutil.OverrideOutputParametersWithDefault(node.Outputs); err != nil {
3459+
return node, err
3460+
}
34583461
_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
34593462
return node, nil
34603463
}

workflow/util/util.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,23 @@ func SuspendWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, workf
377377
return err
378378
}
379379

380+
func OverrideOutputParametersWithDefault(outputs *wfv1.Outputs) error {
381+
if outputs == nil {
382+
return nil
383+
}
384+
for i, param := range outputs.Parameters {
385+
if param.ValueFrom != nil && param.ValueFrom.Supplied != nil {
386+
if param.ValueFrom.Default != nil {
387+
outputs.Parameters[i].Value = param.ValueFrom.Default
388+
outputs.Parameters[i].ValueFrom = nil
389+
} else {
390+
return fmt.Errorf("raw output parameter '%s' has not been set and does not have a default value", param.Name)
391+
}
392+
}
393+
}
394+
return nil
395+
}
396+
380397
// ResumeWorkflow resumes a workflow by setting spec.suspend to nil and any suspended nodes to Successful.
381398
// Retries conflict errors
382399
func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrator hydrator.Interface, workflowName string, nodeFieldSelector string) error {
@@ -408,17 +425,8 @@ func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrat
408425
// To resume a workflow with a suspended node we simply mark the node as Successful
409426
for nodeID, node := range wf.Status.Nodes {
410427
if node.IsActiveSuspendNode() {
411-
if node.Outputs != nil {
412-
for i, param := range node.Outputs.Parameters {
413-
if param.ValueFrom != nil && param.ValueFrom.Supplied != nil {
414-
if param.ValueFrom.Default != nil {
415-
node.Outputs.Parameters[i].Value = param.ValueFrom.Default
416-
node.Outputs.Parameters[i].ValueFrom = nil
417-
} else {
418-
return false, fmt.Errorf("raw output parameter '%s' has not been set and does not have a default value", param.Name)
419-
}
420-
}
421-
}
428+
if err := OverrideOutputParametersWithDefault(node.Outputs); err != nil {
429+
return false, err
422430
}
423431
node.Phase = wfv1.NodeSucceeded
424432
if node.Message != "" {

0 commit comments

Comments
 (0)