Skip to content

Commit 97b09aa

Browse files
committed
fix
Signed-off-by: agoins <[email protected]> # Conflicts: # samples/v2/sample_test.py
1 parent 03f470f commit 97b09aa

File tree

5 files changed

+95
-15
lines changed

5 files changed

+95
-15
lines changed

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 17 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/src/v2/component/launcher_v2.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ func executeV2(
368368
return nil, nil, err
369369
}
370370

371+
// ensure executorOutput contains custompath
371372
executorOutput, err := execute(
372373
ctx,
373374
executorInput,
@@ -545,27 +546,32 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
545546
mergeRuntimeArtifacts(list.Artifacts[0], outputArtifact)
546547
}
547548

548-
// If artifact customPath is set, upload to remote storages from this path. Otherwise, upload from local path.
549-
var artifactDir string
549+
var dir string
550+
var copyDir string
550551
var err error
552+
// If artifact customPath is set, upload to remote storages from this path. Otherwise, upload from local path (outputArtifact.URI)
551553
if *outputArtifact.CustomPath != "" {
552-
artifactDir = *outputArtifact.CustomPath
554+
dir = *outputArtifact.CustomPath
555+
copyDir = *outputArtifact.CustomPath
553556
} else {
554-
artifactDir, err = LocalPathForURI(outputArtifact.Uri)
557+
dir = outputArtifact.Uri
558+
copyDir, err = LocalPathForURI(dir)
559+
if err != nil {
560+
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, dir)
561+
}
555562
}
556-
if err != nil {
557-
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
558-
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
559-
blobKey, err := opts.bucketConfig.KeyFromURI(outputArtifact.Uri)
563+
564+
if !strings.HasPrefix(dir, "oci://") {
565+
blobKey, err := opts.bucketConfig.KeyFromURI(dir)
560566
if err != nil {
561567
return nil, fmt.Errorf("failed to upload output artifact %q: %w", name, err)
562568
}
563-
if err := objectstore.UploadBlob(ctx, opts.bucket, artifactDir, blobKey); err != nil {
569+
if err := objectstore.UploadBlob(ctx, opts.bucket, copyDir, blobKey); err != nil {
564570
// We allow components to not produce output files
565571
if errors.Is(err, os.ErrNotExist) {
566-
glog.Warningf("Local filepath %q does not exist", artifactDir)
572+
glog.Warningf("Local filepath %q does not exist", copyDir)
567573
} else {
568-
return nil, fmt.Errorf("failed to upload output artifact %q to remote storage URI %q: %w", name, outputArtifact.Uri, err)
574+
return nil, fmt.Errorf("failed to upload output artifact %q to remote storage URI %q: %w", name, dir, err)
569575
}
570576
}
571577
}

backend/src/v2/component/launcher_v2_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var addNumbersComponent = &pipelinespec.ComponentSpec{
5252

5353
// Tests that launcher correctly executes the user component and successfully writes output parameters to file.
5454
func Test_executeV2_Parameters(t *testing.T) {
55+
customLocalPath := "/tmp/custom/artifact.txt"
5556
tests := []struct {
5657
name string
5758
executorInput *pipelinespec.ExecutorInput
@@ -64,6 +65,17 @@ func Test_executeV2_Parameters(t *testing.T) {
6465
Inputs: &pipelinespec.ExecutorInput_Inputs{
6566
ParameterValues: map[string]*structpb.Value{"a": structpb.NewNumberValue(1), "b": structpb.NewNumberValue(2)},
6667
},
68+
Outputs: &pipelinespec.ExecutorInput_Outputs{
69+
Artifacts: map[string]*pipelinespec.ArtifactList{
70+
"dataset": {
71+
Artifacts: []*pipelinespec.RuntimeArtifact{
72+
{
73+
Uri: customLocalPath,
74+
},
75+
},
76+
},
77+
},
78+
},
6779
},
6880
[]string{"-c", "test {{$.inputs.parameters['a']}} -eq 1 || exit 1\ntest {{$.inputs.parameters['b']}} -eq 2 || exit 1"},
6981
false,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from sdk.python.kfp.dsl import Output
2+
from sdk.python.kfp.dsl.types.artifact_types import Artifact
3+
from sdk.python.kfp import dsl
4+
5+
@dsl.component
6+
def create_list() -> list:
7+
return [1, 2, 3, 4]
8+
9+
@dsl.component
10+
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
11+
input_list.append(digit)
12+
return input_list
13+
14+
@dsl.component
15+
def validate_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
16+
#todo: is this the correct comparison? (or should use != instead?)
17+
if input_list.path is not exp_path:
18+
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")
19+
20+
@dsl.pipeline
21+
def pipeline_with_custom_path_artifact():
22+
task1 = create_list()
23+
task1.output.set_custom_path('/etc/test/file/path')
24+
# task2 = validate_custom_path(path='/etc/test/file/path', input_list=task1.output)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from kfp.dsl import Output
2+
from kfp.dsl.types.artifact_types import Artifact
3+
from kfp.v2 import dsl
4+
5+
6+
@dsl.component
7+
def create_list() -> list:
8+
return [1, 2, 3, 4]
9+
10+
@dsl.component
11+
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
12+
input_list.append(digit)
13+
return input_list
14+
15+
@dsl.component
16+
def validate_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
17+
#todo: is this the correct comparison? (or should use != instead?)
18+
if input_list.path is not exp_path:
19+
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")
20+
21+
@dsl.pipeline
22+
def pipeline_with_custom_path_artifact():
23+
task1 = create_list()
24+
task1.output.set_custom_path('/etc/test/file/path')
25+
task2 = validate_custom_path(path='/etc/test/file/path', input_list=task1.output)

0 commit comments

Comments
 (0)