Skip to content

Commit beae62f

Browse files
authored
feat(backend) implement retryStrategy for nested pipelines (kubeflow#11908)
Signed-off-by: agoins <[email protected]>
1 parent 53bb3a0 commit beae62f

16 files changed

+1885
-72
lines changed

backend/src/v2/compiler/argocompiler/argo_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ func Test_argo_compiler(t *testing.T) {
101101
argoYAMLPath: "testdata/hello_world_cache_disabled.yaml",
102102
compilerOptions: argocompiler.Options{CacheDisabled: true},
103103
},
104+
// retry set at pipeline level only.
105+
{
106+
jobPath: "../testdata/nested_pipeline_pipeline_retry.json",
107+
platformSpecPath: "",
108+
argoYAMLPath: "testdata/nested_pipeline_pipeline_retry.yaml",
109+
},
110+
// retry set at component level only.
111+
{
112+
jobPath: "../testdata/nested_pipeline_sub_component_retry.json",
113+
platformSpecPath: "",
114+
argoYAMLPath: "testdata/nested_pipeline_sub_component_retry.yaml",
115+
},
116+
// retry set at both component and pipeline level.
117+
{
118+
jobPath: "../testdata/nested_pipeline_all_level_retry.json",
119+
platformSpecPath: "",
120+
argoYAMLPath: "testdata/nested_pipeline_all_level_retry.yaml",
121+
},
104122
}
105123
for _, tt := range tests {
106124
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {

backend/src/v2/compiler/argocompiler/container.go

Lines changed: 59 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -260,41 +260,47 @@ type containerExecutorInputs struct {
260260
// name: argo workflows DAG task name
261261
// The other arguments are argo workflows task parameters, they can be either a
262262
// string or a placeholder.
263-
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs, refName string) *wfapi.DAGTask {
263+
func (c *workflowCompiler) containerExecutorTask(name string, inputs containerExecutorInputs, task *pipelinespec.PipelineTaskSpec) *wfapi.DAGTask {
264264
when := ""
265265
if inputs.condition != "" {
266266
when = inputs.condition + " != false"
267267
}
268-
task := &wfapi.DAGTask{
268+
dagTask := &wfapi.DAGTask{
269269
Name: name,
270-
Template: c.addContainerExecutorTemplate(name, refName),
270+
Template: c.addContainerExecutorTemplate(task),
271271
When: when,
272272
Arguments: wfapi.Arguments{
273273
Parameters: append([]wfapi.Parameter{
274274
{Name: paramPodSpecPatch, Value: wfapi.AnyStringPtr(inputs.podSpecPatch)},
275-
{Name: paramCachedDecision, Value: wfapi.AnyStringPtr(inputs.cachedDecision), Default: wfapi.AnyStringPtr("false")}},
276-
c.getTaskRetryParametersWithValues(name)...,
275+
{Name: paramCachedDecision, Value: wfapi.AnyStringPtr(inputs.cachedDecision), Default: wfapi.AnyStringPtr("false")},
276+
},
277+
c.getTaskRetryParametersWithValues(task)...,
277278
),
278279
},
279280
}
280281

281-
addExitTask(task, inputs.exitTemplate, inputs.hookParentDagID)
282+
addExitTask(dagTask, inputs.exitTemplate, inputs.hookParentDagID)
282283

283-
return task
284+
return dagTask
284285
}
285286

286287
// addContainerExecutorTemplate adds a generic container executor template for
287288
// any container component task.
288289
// During runtime, it's expected that pod-spec-patch will specify command, args
289290
// and resources etc, that are different for different tasks.
290-
func (c *workflowCompiler) addContainerExecutorTemplate(name string, refName string) string {
291+
func (c *workflowCompiler) addContainerExecutorTemplate(task *pipelinespec.PipelineTaskSpec) string {
291292
// container template is parent of container implementation template
292293
nameContainerExecutor := "system-container-executor"
293294
nameContainerImpl := "system-container-impl"
294-
taskRetrySpec := c.getTaskRetryPolicySpec(name)
295+
taskRetrySpec := task.GetRetryPolicy()
296+
refName := task.GetComponentRef().GetName()
295297
if taskRetrySpec != nil {
296-
nameContainerExecutor = "retry-" + nameContainerExecutor
297-
nameContainerImpl = "retry-" + nameContainerImpl
298+
task := c.spec.Components[refName]
299+
// retry-system-container-executor/impl is used if retry is set at the component level (if task is not a DAG)
300+
if task != nil && task.GetDag() == nil {
301+
nameContainerExecutor = "retry-" + nameContainerExecutor
302+
nameContainerImpl = "retry-" + nameContainerImpl
303+
}
298304
}
299305
_, ok := c.templates[nameContainerExecutor]
300306
if ok {
@@ -305,8 +311,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(name string, refName str
305311
Inputs: wfapi.Inputs{
306312
Parameters: append([]wfapi.Parameter{
307313
{Name: paramPodSpecPatch},
308-
{Name: paramCachedDecision, Default: wfapi.AnyStringPtr("false")}},
309-
c.addParameterDefault(c.getTaskRetryParameters(name), "0")...,
314+
{Name: paramCachedDecision, Default: wfapi.AnyStringPtr("false")},
315+
},
316+
c.addParameterDefault(c.getTaskRetryParameters(task), "0")...,
310317
),
311318
},
312319
DAG: &wfapi.DAGTemplate{
@@ -316,8 +323,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(name string, refName str
316323
Arguments: wfapi.Arguments{
317324
Parameters: append([]wfapi.Parameter{{
318325
Name: paramPodSpecPatch,
319-
Value: wfapi.AnyStringPtr(inputParameter(paramPodSpecPatch))}},
320-
c.addParameterInputPath(c.getTaskRetryParameters(name))...,
326+
Value: wfapi.AnyStringPtr(inputParameter(paramPodSpecPatch))},
327+
},
328+
c.addParameterInputPath(c.getTaskRetryParameters(task))...,
321329
),
322330
},
323331
// When cached decision is true, the container
@@ -347,8 +355,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(name string, refName str
347355
Name: nameContainerImpl,
348356
Inputs: wfapi.Inputs{
349357
Parameters: append([]wfapi.Parameter{
350-
{Name: paramPodSpecPatch}},
351-
c.getTaskRetryParameters(name)...),
358+
{Name: paramPodSpecPatch},
359+
},
360+
c.getTaskRetryParameters(task)...),
352361
},
353362
// PodSpecPatch input param is where actual image, command and
354363
// args come from. It is treated as a strategic merge patch on
@@ -530,34 +539,23 @@ func (c *workflowCompiler) addContainerExecutorTemplate(name string, refName str
530539
return nameContainerExecutor
531540
}
532541

533-
func (c *workflowCompiler) getTaskRetryPolicySpec(name string) *pipelinespec.PipelineTaskSpec_RetryPolicy {
534-
if c.spec == nil || c.spec.Root == nil || c.spec.Root.GetDag() == nil {
535-
return nil
536-
}
537-
rootDag := c.spec.Root.GetDag()
538-
taskSpec := rootDag.Tasks[name]
539-
if taskSpec == nil {
540-
return nil
541-
}
542-
return taskSpec.RetryPolicy
543-
}
544-
545-
func (c *workflowCompiler) getTaskRetryParameters(name string) []wfapi.Parameter {
546-
retryPolicy := c.getTaskRetryPolicySpec(name)
547-
if retryPolicy == nil {
542+
func (c *workflowCompiler) getTaskRetryParameters(task *pipelinespec.PipelineTaskSpec) []wfapi.Parameter {
543+
if task != nil && task.RetryPolicy != nil {
544+
// Create slice of parameters with "Name" field set.
545+
parameters := []wfapi.Parameter{
546+
{Name: paramRetryMaxCount},
547+
{Name: paramRetryBackOffDuration},
548+
{Name: paramRetryBackOffFactor},
549+
{Name: paramRetryBackOffMaxDuration},
550+
}
551+
return parameters
552+
} else {
548553
return nil
549554
}
550-
// Create slice of parameters with "Name" field set.
551-
parameters := []wfapi.Parameter{
552-
{Name: paramRetryMaxCount},
553-
{Name: paramRetryBackOffDuration},
554-
{Name: paramRetryBackOffFactor},
555-
{Name: paramRetryBackOffMaxDuration}}
556-
return parameters
557555
}
558556

559-
func (c *workflowCompiler) getTaskRetryParametersWithValues(name string) []wfapi.Parameter {
560-
retryPolicy := c.getTaskRetryPolicySpec(name)
557+
func (c *workflowCompiler) getTaskRetryParametersWithValues(task *pipelinespec.PipelineTaskSpec) []wfapi.Parameter {
558+
retryPolicy := task.GetRetryPolicy()
561559
if retryPolicy == nil {
562560
return []wfapi.Parameter{}
563561
}
@@ -609,6 +607,26 @@ func (c *workflowCompiler) addParameterInputPath(parameters []wfapi.Parameter) [
609607
return parameters
610608
}
611609

610+
func (c *workflowCompiler) getTaskRetryStrategyFromInput(maxCount string, backOffDuration string, backOffFactor string,
611+
backOffMaxDuration string) *wfapi.RetryStrategy {
612+
backoff := &wfapi.Backoff{
613+
Factor: &intstr.IntOrString{
614+
Type: intstr.String,
615+
StrVal: backOffFactor,
616+
},
617+
MaxDuration: backOffMaxDuration,
618+
Duration: backOffDuration,
619+
}
620+
621+
return &wfapi.RetryStrategy{
622+
Limit: &intstr.IntOrString{
623+
Type: intstr.String,
624+
StrVal: maxCount,
625+
},
626+
Backoff: backoff,
627+
}
628+
}
629+
612630
// Extends the PodMetadata to include Kubernetes-specific executor config.
613631
// Although the current podMetadata object is always empty, this function
614632
// doesn't overwrite the existing podMetadata because for security reasons
@@ -638,26 +656,6 @@ func extendPodMetadata(
638656
}
639657
}
640658

641-
func (c *workflowCompiler) getTaskRetryStrategyFromInput(maxCount string, backOffDuration string, backOffFactor string,
642-
backOffMaxDuration string) *wfapi.RetryStrategy {
643-
backoff := &wfapi.Backoff{
644-
Factor: &intstr.IntOrString{
645-
Type: intstr.String,
646-
StrVal: backOffFactor,
647-
},
648-
MaxDuration: backOffMaxDuration,
649-
Duration: backOffDuration,
650-
}
651-
652-
return &wfapi.RetryStrategy{
653-
Limit: &intstr.IntOrString{
654-
Type: intstr.String,
655-
StrVal: maxCount,
656-
},
657-
Backoff: backoff,
658-
}
659-
}
660-
661659
// Extends metadata map values; highPriorityMap should overwrite lowPriorityMap values.
662660
// The original Map inputs should have higher priority since they are defined by the admin.
663661
// TODO: Use maps.Copy after moving to go 1.21+

backend/src/v2/compiler/argocompiler/container_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"os"
1919
"testing"
2020

21+
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
22+
2123
wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
2224
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
2325
"github.com/stretchr/testify/assert"
@@ -59,7 +61,7 @@ func TestAddContainerExecutorTemplate(t *testing.T) {
5961
},
6062
}
6163

62-
c.addContainerExecutorTemplate("test-ref", "comp-test-ref")
64+
c.addContainerExecutorTemplate(&pipelinespec.PipelineTaskSpec{ComponentRef: &pipelinespec.ComponentRef{Name: "comp-test-ref"}})
6365
assert.NotEmpty(t, "system-container-impl", "Template name should not be empty")
6466

6567
executorTemplate, exists := c.templates["system-container-impl"]

backend/src/v2/compiler/argocompiler/dag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
311311
condition: driverOutputs.condition,
312312
exitTemplate: inputs.exitTemplate,
313313
hookParentDagID: inputs.parentDagID,
314-
}, task.GetComponentRef().GetName())
314+
}, task)
315315
executor.Depends = depends([]string{driverTaskName})
316316
return []wfapi.DAGTask{*driver, *executor}, nil
317317
case *pipelinespec.PipelineDeploymentConfig_ExecutorSpec_Importer:

0 commit comments

Comments
 (0)