Skip to content

Commit 4aa1556

Browse files
committed
fix
Signed-off-by: agoins <[email protected]>
1 parent e52cc56 commit 4aa1556

File tree

6 files changed

+103
-23
lines changed

6 files changed

+103
-23
lines changed

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 17 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/src/v2/component/launcher_v2.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ func executeV2(
368368
return nil, nil, err
369369
}
370370

371+
// ensure executorOutput contains custompath
371372
executorOutput, err := execute(
372373
ctx,
373374
executorInput,
@@ -545,27 +546,32 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
545546
mergeRuntimeArtifacts(list.Artifacts[0], outputArtifact)
546547
}
547548

548-
// If artifact customPath is set, upload to remote storages from this path. Otherwise, upload from local path.
549-
var artifactDir string
549+
var dir string
550+
var copyDir string
550551
var err error
552+
// If artifact customPath is set, upload to remote storages from this path. Otherwise, upload from local path (outputArtifact.URI)
551553
if *outputArtifact.CustomPath != "" {
552-
artifactDir = *outputArtifact.CustomPath
554+
dir = *outputArtifact.CustomPath
555+
copyDir = *outputArtifact.CustomPath
553556
} else {
554-
artifactDir, err = LocalPathForURI(outputArtifact.Uri)
557+
dir = outputArtifact.Uri
558+
copyDir, err = LocalPathForURI(dir)
559+
if err != nil {
560+
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, dir)
561+
}
555562
}
556-
if err != nil {
557-
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
558-
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
559-
blobKey, err := opts.bucketConfig.KeyFromURI(outputArtifact.Uri)
563+
564+
if !strings.HasPrefix(dir, "oci://") {
565+
blobKey, err := opts.bucketConfig.KeyFromURI(dir)
560566
if err != nil {
561567
return nil, fmt.Errorf("failed to upload output artifact %q: %w", name, err)
562568
}
563-
if err := objectstore.UploadBlob(ctx, opts.bucket, artifactDir, blobKey); err != nil {
569+
if err := objectstore.UploadBlob(ctx, opts.bucket, copyDir, blobKey); err != nil {
564570
// We allow components to not produce output files
565571
if errors.Is(err, os.ErrNotExist) {
566-
glog.Warningf("Local filepath %q does not exist", artifactDir)
572+
glog.Warningf("Local filepath %q does not exist", copyDir)
567573
} else {
568-
return nil, fmt.Errorf("failed to upload output artifact %q to remote storage URI %q: %w", name, outputArtifact.Uri, err)
574+
return nil, fmt.Errorf("failed to upload output artifact %q to remote storage URI %q: %w", name, dir, err)
569575
}
570576
}
571577
}

backend/src/v2/component/launcher_v2_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var addNumbersComponent = &pipelinespec.ComponentSpec{
5252

5353
// Tests that launcher correctly executes the user component and successfully writes output parameters to file.
5454
func Test_executeV2_Parameters(t *testing.T) {
55+
customLocalPath := "/tmp/custom/artifact.txt"
5556
tests := []struct {
5657
name string
5758
executorInput *pipelinespec.ExecutorInput
@@ -64,6 +65,17 @@ func Test_executeV2_Parameters(t *testing.T) {
6465
Inputs: &pipelinespec.ExecutorInput_Inputs{
6566
ParameterValues: map[string]*structpb.Value{"a": structpb.NewNumberValue(1), "b": structpb.NewNumberValue(2)},
6667
},
68+
Outputs: &pipelinespec.ExecutorInput_Outputs{
69+
Artifacts: map[string]*pipelinespec.ArtifactList{
70+
"dataset": {
71+
Artifacts: []*pipelinespec.RuntimeArtifact{
72+
{
73+
Uri: customLocalPath,
74+
},
75+
},
76+
},
77+
},
78+
},
6779
},
6880
[]string{"-c", "test {{$.inputs.parameters['a']}} -eq 1 || exit 1\ntest {{$.inputs.parameters['b']}} -eq 2 || exit 1"},
6981
false,
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from sdk.python.kfp.dsl import Output
2+
from sdk.python.kfp.dsl.types.artifact_types import Artifact
3+
from sdk.python.kfp import dsl
4+
5+
@dsl.component
6+
def create_list() -> list:
7+
return [1, 2, 3, 4]
8+
9+
@dsl.component
10+
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
11+
input_list.append(digit)
12+
return input_list
13+
14+
@dsl.component
15+
def validate_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
16+
#todo: is this the correct comparison? (or should use != instead?)
17+
if input_list.path is not exp_path:
18+
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")
19+
20+
@dsl.pipeline
21+
def pipeline_with_custom_path_artifact():
22+
task1 = create_list()
23+
task1.output.set_custom_path('/etc/test/file/path')
24+
# task2 = validate_custom_path(path='/etc/test/file/path', input_list=task1.output)

samples/v2/sample_test.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ def get_package_path(subdir: str) -> str:
6565
import pipeline_container_no_input
6666
import pipeline_with_env
6767
import pipeline_with_placeholders
68-
import pipeline_with_secret_as_env
69-
import pipeline_with_secret_as_volume
68+
# import pipeline_with_secret_as_env
69+
# import pipeline_with_secret_as_volume
7070
import producer_consumer_param
7171
import pipeline_with_retry
7272
import pipeline_with_input_status_state
@@ -177,14 +177,14 @@ class SampleTest(unittest.TestCase):
177177
'http://localhost:8888')
178178
_kfp_ui_and_port = os.getenv('KFP_UI_HOST_AND_PORT',
179179
'http://localhost:8080')
180-
180+
181181
def __init__(self, *args, **kwargs):
182182
super().__init__(*args, **kwargs)
183183
# Initialize client with token if in multi-user mode
184184
auth_token = get_authentication_token()
185185
if auth_token:
186186
self._client = kfp.Client(
187-
host=self._kfp_host_and_port,
187+
host=self._kfp_host_and_port,
188188
ui_host=self._kfp_ui_and_port,
189189
existing_token=auth_token
190190
)
@@ -225,10 +225,10 @@ def test(self):
225225
# The following tests are not working. Tracking issue: https://github.com/kubeflow/pipelines/issues/11053
226226
# TestCase(pipeline_func=pipeline_with_importer.pipeline_with_importer),
227227
# TestCase(pipeline_func=pipeline_with_volume.pipeline_with_volume),
228-
TestCase(pipeline_func=pipeline_with_secret_as_volume
229-
.pipeline_secret_volume),
230-
TestCase(
231-
pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env),
228+
# TestCase(pipeline_func=pipeline_with_secret_as_volume
229+
# .pipeline_secret_volume),
230+
# TestCase(
231+
# pipeline_func=pipeline_with_secret_as_env.pipeline_secret_env),
232232
TestCase(pipeline_func=subdagio.parameter.crust),
233233
TestCase(pipeline_func=subdagio.parameter_cache.crust),
234234
TestCase(pipeline_func=subdagio.mixed_parameters.crust),
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from kfp.dsl import Output
2+
from kfp.dsl.types.artifact_types import Artifact
3+
from kfp.v2 import dsl
4+
5+
6+
@dsl.component
7+
def create_list() -> list:
8+
return [1, 2, 3, 4]
9+
10+
@dsl.component
11+
def append_to_list(digit: int, input_list: Output[Artifact]) -> list:
12+
input_list.append(digit)
13+
return input_list
14+
15+
@dsl.component
16+
def validate_custom_path(exp_path: str, input_list: Output[Artifact]) -> bool:
17+
#todo: is this the correct comparison? (or should use != instead?)
18+
if input_list.path is not exp_path:
19+
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")
20+
21+
@dsl.pipeline
22+
def pipeline_with_custom_path_artifact():
23+
task1 = create_list()
24+
task1.output.set_custom_path('/etc/test/file/path')
25+
task2 = validate_custom_path(path='/etc/test/file/path', input_list=task1.output)

0 commit comments

Comments
 (0)