Skip to content

Commit 3337b5e

Browse files
ntnyarpechenin
andauthored
- fix(launcher): missing executorInput parameter values caused by {{$}} evaluation order (kubeflow#11925)
Signed-off-by: arpechenin <[email protected]> Co-authored-by: arpechenin <[email protected]>
1 parent 67f9b7d commit 3337b5e

File tree

2 files changed

+101
-16
lines changed

2 files changed

+101
-16
lines changed

backend/src/v2/component/launcher_v2.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -364,26 +364,16 @@ func executeV2(
364364
}
365365

366366
// Fill in placeholders with runtime values.
367-
placeholders, err := getPlaceholders(executorInputWithDefault)
367+
compiledCmd, compiledArgs, err := compileCmdAndArgs(executorInputWithDefault, cmd, args)
368368
if err != nil {
369369
return nil, nil, err
370370
}
371-
for placeholder, replacement := range placeholders {
372-
cmd = strings.ReplaceAll(cmd, placeholder, replacement)
373-
}
374-
for i := range args {
375-
arg := args[i]
376-
for placeholder, replacement := range placeholders {
377-
arg = strings.ReplaceAll(arg, placeholder, replacement)
378-
}
379-
args[i] = arg
380-
}
381371

382372
executorOutput, err := execute(
383373
ctx,
384374
executorInput,
385-
cmd,
386-
args,
375+
compiledCmd,
376+
compiledArgs,
387377
bucket,
388378
bucketConfig,
389379
namespace,
@@ -737,6 +727,31 @@ func fetchNonDefaultBuckets(
737727

738728
}
739729

730+
func compileCmdAndArgs(executorInput *pipelinespec.ExecutorInput, cmd string, args []string) (string, []string, error) {
731+
placeholders, err := getPlaceholders(executorInput)
732+
733+
executorInputJSON, err := protojson.Marshal(executorInput)
734+
if err != nil {
735+
return "", nil, fmt.Errorf("failed to convert ExecutorInput into JSON: %w", err)
736+
}
737+
executorInputJSONKey := "{{$}}"
738+
executorInputJSONString := string(executorInputJSON)
739+
740+
compiledCmd := strings.ReplaceAll(cmd, executorInputJSONKey, executorInputJSONString)
741+
compiledArgs := make([]string, 0, len(args))
742+
for placeholder, replacement := range placeholders {
743+
cmd = strings.ReplaceAll(cmd, placeholder, replacement)
744+
}
745+
for _, arg := range args {
746+
compiledArgTemplate := strings.ReplaceAll(arg, executorInputJSONKey, executorInputJSONString)
747+
for placeholder, replacement := range placeholders {
748+
compiledArgTemplate = strings.ReplaceAll(compiledArgTemplate, placeholder, replacement)
749+
}
750+
compiledArgs = append(compiledArgs, compiledArgTemplate)
751+
}
752+
return compiledCmd, compiledArgs, nil
753+
}
754+
740755
// Add executor input placeholders to provided map.
741756
func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders map[string]string, err error) {
742757
defer func() {
@@ -745,11 +760,9 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
745760
}
746761
}()
747762
placeholders = make(map[string]string)
748-
executorInputJSON, err := protojson.Marshal(executorInput)
749763
if err != nil {
750764
return nil, fmt.Errorf("failed to convert ExecutorInput into JSON: %w", err)
751765
}
752-
placeholders["{{$}}"] = string(executorInputJSON)
753766

754767
// Read input artifact metadata.
755768
for name, artifactList := range executorInput.GetInputs().GetArtifacts() {

backend/src/v2/component/launcher_v2_test.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@ package component
1515

1616
import (
1717
"context"
18+
"encoding/json"
1819
"io"
1920
"os"
2021
"testing"
2122

23+
"google.golang.org/protobuf/encoding/protojson"
24+
2225
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
2326
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
2427
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"
@@ -106,7 +109,7 @@ func Test_executeV2_Parameters(t *testing.T) {
106109
}
107110
}
108111

109-
func Test_executeV2_publishLogss(t *testing.T) {
112+
func Test_executeV2_publishLogs(t *testing.T) {
110113
tests := []struct {
111114
name string
112115
executorInput *pipelinespec.ExecutorInput
@@ -167,6 +170,75 @@ func Test_executeV2_publishLogss(t *testing.T) {
167170
}
168171
}
169172

173+
func Test_executorInput_compileCmdAndArgs(t *testing.T) {
174+
executorInputJSON := `{
175+
"inputs": {
176+
"parameterValues": {
177+
"config": {
178+
"category_ids": "{{$.inputs.parameters['pipelinechannel--category_ids']}}",
179+
"dump_filename": "{{$.inputs.parameters['pipelinechannel--dump_filename']}}",
180+
"sphinx_host": "{{$.inputs.parameters['pipelinechannel--sphinx_host']}}",
181+
"sphinx_port": "{{$.inputs.parameters['pipelinechannel--sphinx_port']}}"
182+
},
183+
"pipelinechannel--category_ids": "116",
184+
"pipelinechannel--dump_filename": "dump_filename_test.txt",
185+
"pipelinechannel--sphinx_host": "sphinx-default-host.ru",
186+
"pipelinechannel--sphinx_port": 9312
187+
}
188+
},
189+
"outputs": {
190+
"artifacts": {
191+
"dataset": {
192+
"artifacts": [{
193+
"type": {
194+
"schemaTitle": "system.Dataset",
195+
"schemaVersion": "0.0.1"
196+
},
197+
"uri": "s3://aviflow-stage-kfp-artifacts/debug-component-pipeline/ae02034e-bd96-4b8a-a06b-55c99fe9eccb/sayhello/c98ac032-2448-4637-bf37-3ad1e13a112c/dataset"
198+
}]
199+
}
200+
},
201+
"outputFile": "/tmp/kfp_outputs/output_metadata.json"
202+
}
203+
}`
204+
205+
executorInput := &pipelinespec.ExecutorInput{}
206+
err := protojson.Unmarshal([]byte(executorInputJSON), executorInput)
207+
208+
assert.NoError(t, err)
209+
210+
cmd := "sh"
211+
args := []string{
212+
"--executor_input", "{{$}}",
213+
"--function_to_execute", "sayHello",
214+
}
215+
cmd, args, err = compileCmdAndArgs(executorInput, cmd, args)
216+
217+
assert.NoError(t, err)
218+
219+
var actualExecutorInput string
220+
for i := 0; i < len(args)-1; i++ {
221+
if args[i] == "--executor_input" {
222+
actualExecutorInput = args[i+1]
223+
break
224+
}
225+
}
226+
assert.NotEmpty(t, actualExecutorInput, "--executor_input not found")
227+
228+
var parsed map[string]any
229+
err = json.Unmarshal([]byte(actualExecutorInput), &parsed)
230+
assert.NoError(t, err)
231+
232+
inputs := parsed["inputs"].(map[string]any)
233+
paramValues := inputs["parameterValues"].(map[string]any)
234+
config := paramValues["config"].(map[string]any)
235+
236+
assert.Equal(t, "116", config["category_ids"])
237+
assert.Equal(t, "dump_filename_test.txt", config["dump_filename"])
238+
assert.Equal(t, "sphinx-default-host.ru", config["sphinx_host"])
239+
assert.Equal(t, "9312", config["sphinx_port"])
240+
}
241+
170242
func Test_get_log_Writer(t *testing.T) {
171243
old := osCreateFunc
172244
defer func() { osCreateFunc = old }()

0 commit comments

Comments
 (0)