Skip to content

Commit 52d4595

Browse files
authored
feat(backend, sdk): Add custom_path field to RuntimeArtifact (#12248)
* Implement custom upload path on runtime artifact. Signed-off-by: agoins <[email protected]> * Update launcher_v2 local path helper method. Signed-off-by: agoins <[email protected]> --------- Signed-off-by: agoins <[email protected]>
1 parent c6a234b commit 52d4595

File tree

8 files changed

+128
-10
lines changed

8 files changed

+128
-10
lines changed

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 17 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/pipeline_spec.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,9 @@ message RuntimeArtifact {
967967

968968
// Properties of the Artifact.
969969
google.protobuf.Struct metadata = 6;
970+
971+
// Custom path for output artifact.
972+
optional string custom_path = 7;
970973
}
971974

972975
// Message that represents a list of artifacts.

backend/src/v2/component/launcher_v2.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
628628
}
629629

630630
// Upload artifacts from local path to remote storages.
631-
localDir, err := LocalPathForURI(outputArtifact.Uri)
631+
localDir, err := retrieveArtifactPath(outputArtifact)
632632
if err != nil {
633633
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
634634
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
@@ -994,6 +994,16 @@ func LocalPathForURI(uri string) (string, error) {
994994
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
995995
}
996996

997+
func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error) {
998+
// If artifact custom path is set, use custom path. Otherwise, use URI.
999+
customPath := artifact.CustomPath
1000+
if customPath != nil {
1001+
return *customPath, nil
1002+
} else {
1003+
return LocalPathForURI(artifact.Uri)
1004+
}
1005+
}
1006+
9971007
func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
9981008
for name, parameter := range executorInput.GetOutputs().GetParameters() {
9991009
dir := filepath.Dir(parameter.OutputFile)

backend/src/v2/component/launcher_v2_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,36 @@ func Test_NewLauncherV2(t *testing.T) {
418418
})
419419
}
420420
}
421+
422+
func Test_retrieve_artifact_path(t *testing.T) {
423+
customPath := "/var/lib/kubelet/pods/pod-uid/volumes/kubernetes.io~csi/pvc-uuid/mount"
424+
tests := []struct {
425+
name string
426+
artifact *pipelinespec.RuntimeArtifact
427+
expectedPath string
428+
}{
429+
{
430+
"Artifact with no custom path",
431+
&pipelinespec.RuntimeArtifact{
432+
Uri: "gs://bucket/path/to/artifact",
433+
},
434+
"/gcs/bucket/path/to/artifact",
435+
},
436+
{
437+
"Artifact with custom path",
438+
&pipelinespec.RuntimeArtifact{
439+
Uri: "gs://bucket/path/to/artifact",
440+
CustomPath: &customPath,
441+
},
442+
customPath,
443+
},
444+
}
445+
446+
for _, test := range tests {
447+
t.Run(test.name, func(t *testing.T) {
448+
path, err := retrieveArtifactPath(test.artifact)
449+
assert.Nil(t, err)
450+
assert.Equal(t, path, test.expectedPath)
451+
})
452+
}
453+
}

sdk/python/kfp/dsl/executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def write_executor_output(self,
287287
'name': artifact.name,
288288
'uri': artifact.uri,
289289
'metadata': artifact.metadata,
290+
'custom_path': artifact.custom_path
290291
}
291292
artifacts_list = {'artifacts': [runtime_artifact]}
292293

sdk/python/kfp/dsl/executor_test.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ def test_func(output_artifact_two: Output[Metrics]):
318318
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
319319
'uri':
320320
'new-uri',
321+
'custom_path':
322+
'',
321323
'metadata': {
322324
'key_1': 'value_1',
323325
'key_2': 2,
@@ -645,7 +647,9 @@ def test_func(first: str, second: str, output: Output[Artifact]) -> str:
645647
'name':
646648
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
647649
'uri':
648-
'gs://some-bucket/output'
650+
'gs://some-bucket/output',
651+
'custom_path':
652+
''
649653
}]
650654
}
651655
},
@@ -702,7 +706,9 @@ def test_func(first: str, second: str) -> Artifact:
702706
'name':
703707
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
704708
'uri':
705-
'gs://some-bucket/output'
709+
'gs://some-bucket/output',
710+
'custom_path':
711+
''
706712
}]
707713
}
708714
},
@@ -815,7 +821,9 @@ def func_returning_plain_tuple() -> NamedTuple('Outputs', [
815821
'name':
816822
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
817823
'uri':
818-
'gs://some-bucket/output_dataset'
824+
'gs://some-bucket/output_dataset',
825+
'custom_path':
826+
''
819827
}]
820828
}
821829
},
@@ -1054,6 +1062,8 @@ def test_func(
10541062
'',
10551063
'uri':
10561064
'gs://mlpipeline/v2/artifacts/my-test-pipeline-beta/b2b0cdee-b15c-48ff-b8bc-a394ae46c854/train/model',
1065+
'custom_path':
1066+
'',
10571067
'metadata': {
10581068
'accuracy': 0.9
10591069
}
@@ -1288,6 +1298,8 @@ def test_func() -> Artifact:
12881298
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
12891299
'uri':
12901300
'gs://manually_specified_bucket/foo',
1301+
'custom_path':
1302+
'',
12911303
'metadata': {
12921304
'data': 123
12931305
}
@@ -1339,6 +1351,8 @@ def test_func() -> Artifact:
13391351
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
13401352
'uri':
13411353
'gs://another_bucket/my_artifact',
1354+
'custom_path':
1355+
'',
13421356
'metadata': {
13431357
'data': 123
13441358
}
@@ -1408,6 +1422,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset):
14081422
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
14091423
'uri':
14101424
'gs://another_bucket/artifact',
1425+
'custom_path':
1426+
'',
14111427
'metadata': {
14121428
'data': 123
14131429
}
@@ -1419,6 +1435,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset):
14191435
'projects/123/locations/us-central1/metadataStores/default/artifacts/321',
14201436
'uri':
14211437
'gs://another_bucket/dataset',
1438+
'custom_path':
1439+
'',
14221440
'metadata': {}
14231441
}]
14241442
}

sdk/python/kfp/dsl/types/artifact_types.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,20 +78,24 @@ def __init__(self,
7878
name: Optional[str] = None,
7979
uri: Optional[str] = None,
8080
metadata: Optional[Dict] = None) -> None:
81-
"""Initializes the Artifact with the given name, URI and metadata."""
81+
"""Initializes the Artifact with the given name, URI, metadata, and
82+
blank custom path."""
8283
self.uri = uri or ''
8384
self.name = name or ''
8485
self.metadata = metadata or {}
86+
self._custom_path = ''
8587

8688
@property
8789
def path(self) -> str:
8890
return self._get_path()
8991

9092
@path.setter
9193
def path(self, path: str) -> None:
92-
self._set_path(path)
94+
self._set_custom_path(path)
9395

9496
def _get_path(self) -> Optional[str]:
97+
if self.custom_path is not '':
98+
return self._get_custom_path()
9599
if self.uri.startswith(RemotePrefix.GCS.value):
96100
return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
97101
):]
@@ -108,9 +112,23 @@ def _get_path(self) -> Optional[str]:
108112
# uri == path for local execution
109113
return self.uri
110114

115+
@property
116+
def custom_path(self) -> str:
117+
return self._custom_path
118+
119+
def _get_custom_path(self) -> str:
120+
return self._custom_path
121+
111122
def _set_path(self, path: str) -> None:
112123
self.uri = convert_local_path_to_remote_path(path)
113124

125+
def _set_custom_path(self, value: str) -> None:
126+
self._custom_path = value
127+
128+
@custom_path.setter
129+
def custom_path(self, value: str):
130+
self._custom_path = value
131+
114132

115133
def convert_local_path_to_remote_path(path: str) -> str:
116134
if path.startswith(_GCS_LOCAL_MOUNT_PREFIX):
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from kfp.dsl import Output
2+
from kfp.dsl.types.artifact_types import Artifact
3+
from kfp.v2 import dsl
4+
5+
6+
@dsl.component
7+
def generate_artifact() -> list:
8+
return [1, 2, 3, 4]
9+
10+
@dsl.component
11+
def validate_artifact_path(exp_path: str, input_list: Output[Artifact]) -> bool:
12+
if input_list.path is not exp_path:
13+
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")
14+
15+
@dsl.pipeline
16+
def pipeline_with_custom_path_artifact():
17+
# Generate artifact, and set its custom path.
18+
output_artifact_task = generate_artifact()
19+
output_artifact_task.output.set_path('/etc/test/file/path')
20+
21+
# Validate generated artifact's path.
22+
validate_artifact_task = validate_artifact_path(path='/etc/test/file/path', input_list=output_artifact_task.output)

0 commit comments

Comments
 (0)