@@ -24,7 +24,6 @@ import (
2424 wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
2525 "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2626 "github.com/kubeflow/pipelines/backend/src/v2/compiler"
27- k8score "k8s.io/api/core/v1"
2827 "k8s.io/apimachinery/pkg/util/intstr"
2928)
3029
@@ -283,7 +282,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
283282 driverTaskName := name + "-driver"
284283 // The following call will return an empty string for tasks without kubernetes-specific annotation.
285284 kubernetesConfigPlaceholder , _ := c .useKubernetesImpl (componentName )
286- driver , driverOutputs := c .containerDriverTask (driverTaskName , containerDriverInputs {
285+ driver , driverOutputs , err := c .containerDriverTask (driverTaskName , containerDriverInputs {
287286 component : componentSpecPlaceholder ,
288287 task : taskSpecJson ,
289288 container : containerPlaceholder ,
@@ -292,6 +291,9 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
292291 kubernetesConfig : kubernetesConfigPlaceholder ,
293292 taskName : name ,
294293 })
294+ if err != nil {
295+ return nil , err
296+ }
295297 if task .GetTriggerPolicy ().GetCondition () == "" {
296298 driverOutputs .condition = ""
297299 }
@@ -531,9 +533,13 @@ func (c *workflowCompiler) dagDriverTask(name string, inputs dagDriverInputs) (*
531533 Value : wfapi .AnyStringPtr (inputs .taskName ),
532534 })
533535 }
536+ dagDriverTemplate , err := c .addDAGDriverTemplate ()
537+ if err != nil {
538+ return nil , nil , err
539+ }
534540 t := & wfapi.DAGTask {
535541 Name : name ,
536- Template : c . addDAGDriverTemplate () ,
542+ Template : dagDriverTemplate ,
537543 Arguments : wfapi.Arguments {
538544 Parameters : params ,
539545 },
@@ -545,40 +551,40 @@ func (c *workflowCompiler) dagDriverTask(name string, inputs dagDriverInputs) (*
545551 }, nil
546552}
547553
548- func (c * workflowCompiler ) addDAGDriverTemplate () string {
554+ func (c * workflowCompiler ) addDAGDriverTemplate () ( string , error ) {
549555 name := "system-dag-driver"
550556 _ , ok := c .templates [name ]
551557 if ok {
552- return name
553- }
554-
555- args := [] string {
556- "--type" , inputValue ( paramDriverType ),
557- "--pipeline_name" , c . spec . GetPipelineInfo (). GetName (),
558- "--run_id" , runID (),
559- "--run_name" , runResourceName ( ),
560- "--run_display_name" , c . job . DisplayName ,
561- "--dag_execution_id" , inputValue ( paramParentDagID ),
562- "--component" , inputValue ( paramComponent ),
563- "--task" , inputValue ( paramTask ) ,
564- "--task_name" , inputValue (paramTaskName ),
565- "--runtime_config" , inputValue (paramRuntimeConfig ),
566- "--iteration_index" , inputValue (paramIterationIndex ),
567- "--execution_id_path" , outputPath ( paramExecutionID ),
568- "--iteration_count_path" , outputPath ( paramIterationCount ),
569- "--condition_path" , outputPath ( paramCondition ),
570- "--http_proxy" , proxy . GetConfig (). GetHttpProxy ( ),
571- "--https_proxy" , proxy . GetConfig (). GetHttpsProxy ( ),
572- "--no_proxy" , proxy . GetConfig (). GetNoProxy ( ),
573- }
574- if c . cacheDisabled {
575- args = append ( args , "--cache_disabled" )
576- }
577- if value , ok := os . LookupEnv ( PipelineLogLevelEnvVar ); ok {
578- args = append ( args , "--log_level" , value )
579- }
580- if value , ok := os . LookupEnv ( PublishLogsEnvVar ); ok {
581- args = append ( args , "--publish_logs" , value )
558+ return name , nil
559+ }
560+
561+ logLevel , _ := os . LookupEnv ( PipelineLogLevelEnvVar )
562+ publishLogs , _ := os . LookupEnv ( PublishLogsEnvVar )
563+
564+ driverPlugin , err := driverPlugin ( map [ string ] interface {}{
565+ "type" : inputValue ( paramDriverType ),
566+ "pipeline_name" : c . spec . GetPipelineInfo (). GetName () ,
567+ "run_id" : runID ( ),
568+ "run_name" : runResourceName ( ),
569+ "run_display_name" : c . job . DisplayName ,
570+ "dag_execution_id" : inputValue (paramParentDagID ),
571+ "component" : inputValue (paramComponent ),
572+ "task" : inputValue (paramTask ),
573+ "task_name" : inputValue ( paramTaskName ),
574+ "runtime_config" : inputValue ( paramRuntimeConfig ),
575+ "iteration_index" : inputValue ( paramIterationIndex ),
576+ "execution_id_path" : outputPath ( paramExecutionID ),
577+ "iteration_count_path" : outputPath ( paramIterationCount ),
578+ "condition_path" : outputPath ( paramCondition ),
579+ "http_proxy" : proxy . GetConfig (). GetHttpProxy (),
580+ "https_proxy" : proxy . GetConfig (). GetHttpsProxy (),
581+ "no_proxy" : proxy . GetConfig (). GetNoProxy (),
582+ "cache_disabled" : c . cacheDisabled ,
583+ "log_level" : logLevel ,
584+ "publish_logs" : publishLogs ,
585+ })
586+ if err != nil {
587+ return "" , fmt . Errorf ( "failed to create dag driver template: %w" , err )
582588 }
583589
584590 t := & wfapi.Template {
@@ -596,22 +602,16 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
596602 },
597603 Outputs : wfapi.Outputs {
598604 Parameters : []wfapi.Parameter {
599- {Name : paramExecutionID , ValueFrom : & wfapi.ValueFrom {Path : "/tmp/outputs/ execution-id" }},
600- {Name : paramIterationCount , ValueFrom : & wfapi.ValueFrom {Path : "/tmp/outputs/ iteration-count" , Default : wfapi .AnyStringPtr ("0" )}},
601- {Name : paramCondition , ValueFrom : & wfapi.ValueFrom {Path : "/tmp/outputs/ condition" , Default : wfapi .AnyStringPtr ("true" )}},
605+ {Name : paramExecutionID , ValueFrom : & wfapi.ValueFrom {JSONPath : "$. execution-id" }},
606+ {Name : paramIterationCount , ValueFrom : & wfapi.ValueFrom {JSONPath : "$. iteration-count" , Default : wfapi .AnyStringPtr ("0" )}},
607+ {Name : paramCondition , ValueFrom : & wfapi.ValueFrom {JSONPath : "$. condition" , Default : wfapi .AnyStringPtr ("true" )}},
602608 },
603609 },
604- Container : & k8score.Container {
605- Image : c .driverImage ,
606- Command : c .driverCommand ,
607- Args : args ,
608- Resources : driverResources ,
609- Env : proxy .GetConfig ().GetEnvVars (),
610- },
610+ Plugin : driverPlugin ,
611611 }
612612 c .templates [name ] = t
613613 c .wf .Spec .Templates = append (c .wf .Spec .Templates , * t )
614- return name
614+ return name , nil
615615}
616616
617617func addImplicitDependencies (dagSpec * pipelinespec.DagSpec ) error {
0 commit comments