diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index d809e54f8fe..301e3e2a30a 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -2065,7 +2065,9 @@ type RuntimeArtifact struct { // Deprecated: Marked as deprecated in pipeline_spec.proto. CustomProperties map[string]*Value `protobuf:"bytes,5,rep,name=custom_properties,json=customProperties,proto3" json:"custom_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // Properties of the Artifact. - Metadata *structpb.Struct `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"` + Metadata *structpb.Struct `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"` + // Custom path for output artifact. + CustomPath *string `protobuf:"bytes,7,opt,name=custom_path,json=customPath,proto3,oneof" json:"custom_path,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2144,6 +2146,13 @@ func (x *RuntimeArtifact) GetMetadata() *structpb.Struct { return nil } +func (x *RuntimeArtifact) GetCustomPath() string { + if x != nil && x.CustomPath != nil { + return *x.CustomPath + } + return "" +} + // Message that represents a list of artifacts. 
type ArtifactList struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -5988,7 +5997,7 @@ const file_pipeline_spec_proto_rawDesc = "" + "\tint_value\x18\x01 \x01(\x03H\x00R\bintValue\x12#\n" + "\fdouble_value\x18\x02 \x01(\x01H\x00R\vdoubleValue\x12#\n" + "\fstring_value\x18\x03 \x01(\tH\x00R\vstringValueB\a\n" + - "\x05value\"\x89\x04\n" + + "\x05value\"\xbf\x04\n" + "\x0fRuntimeArtifact\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x124\n" + "\x04type\x18\x02 \x01(\v2 .ml_pipelines.ArtifactTypeSchemaR\x04type\x12\x10\n" + @@ -5997,13 +6006,16 @@ const file_pipeline_spec_proto_rawDesc = "" + "properties\x18\x04 \x03(\v2-.ml_pipelines.RuntimeArtifact.PropertiesEntryB\x02\x18\x01R\n" + "properties\x12d\n" + "\x11custom_properties\x18\x05 \x03(\v23.ml_pipelines.RuntimeArtifact.CustomPropertiesEntryB\x02\x18\x01R\x10customProperties\x123\n" + - "\bmetadata\x18\x06 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x1aR\n" + + "\bmetadata\x18\x06 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x12$\n" + + "\vcustom_path\x18\a \x01(\tH\x00R\n" + + "customPath\x88\x01\x01\x1aR\n" + "\x0fPropertiesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + "\x05value\x18\x02 \x01(\v2\x13.ml_pipelines.ValueR\x05value:\x028\x01\x1aX\n" + "\x15CustomPropertiesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + - "\x05value\x18\x02 \x01(\v2\x13.ml_pipelines.ValueR\x05value:\x028\x01\"K\n" + + "\x05value\x18\x02 \x01(\v2\x13.ml_pipelines.ValueR\x05value:\x028\x01B\x0e\n" + + "\f_custom_path\"K\n" + "\fArtifactList\x12;\n" + "\tartifacts\x18\x01 \x03(\v2\x1d.ml_pipelines.RuntimeArtifactR\tartifacts\"\xfa\b\n" + "\rExecutorInput\x12:\n" + @@ -6430,6 +6442,7 @@ func file_pipeline_spec_proto_init() { (*Value_DoubleValue)(nil), (*Value_StringValue)(nil), } + file_pipeline_spec_proto_msgTypes[23].OneofWrappers = []any{} file_pipeline_spec_proto_msgTypes[32].OneofWrappers = []any{} file_pipeline_spec_proto_msgTypes[33].OneofWrappers = []any{} 
file_pipeline_spec_proto_msgTypes[34].OneofWrappers = []any{} diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index 11dece99820..5d796d6a812 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -967,6 +967,9 @@ message RuntimeArtifact { // Properties of the Artifact. google.protobuf.Struct metadata = 6; + + // Custom path for output artifact. + optional string custom_path = 7; } // Message that represents a list of artifacts. diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index c068b666e25..792268cd51e 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -628,7 +628,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec } // Upload artifacts from local path to remote storages. - localDir, err := LocalPathForURI(outputArtifact.Uri) + localDir, err := retrieveArtifactPath(outputArtifact) if err != nil { glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri) } else if !strings.HasPrefix(outputArtifact.Uri, "oci://") { @@ -994,6 +994,16 @@ func LocalPathForURI(uri string) (string, error) { return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri) } +func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error) { + // If artifact custom path is set, use custom path. Otherwise, use URI. 
+ customPath := artifact.CustomPath + if customPath != nil { + return *customPath, nil + } else { + return LocalPathForURI(artifact.Uri) + } +} + func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error { for name, parameter := range executorInput.GetOutputs().GetParameters() { dir := filepath.Dir(parameter.OutputFile) diff --git a/backend/src/v2/component/launcher_v2_test.go b/backend/src/v2/component/launcher_v2_test.go index 05c71d0e2de..0b44fbb69b2 100644 --- a/backend/src/v2/component/launcher_v2_test.go +++ b/backend/src/v2/component/launcher_v2_test.go @@ -418,3 +418,36 @@ func Test_NewLauncherV2(t *testing.T) { }) } } + +func Test_retrieve_artifact_path(t *testing.T) { + customPath := "/var/lib/kubelet/pods/pod-uid/volumes/kubernetes.io~csi/pvc-uuid/mount" + tests := []struct { + name string + artifact *pipelinespec.RuntimeArtifact + expectedPath string + }{ + { + "Artifact with no custom path", + &pipelinespec.RuntimeArtifact{ + Uri: "gs://bucket/path/to/artifact", + }, + "/gcs/bucket/path/to/artifact", + }, + { + "Artifact with custom path", + &pipelinespec.RuntimeArtifact{ + Uri: "gs://bucket/path/to/artifact", + CustomPath: &customPath, + }, + customPath, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + path, err := retrieveArtifactPath(test.artifact) + assert.Nil(t, err) + assert.Equal(t, path, test.expectedPath) + }) + } +} diff --git a/sdk/python/kfp/dsl/executor.py b/sdk/python/kfp/dsl/executor.py index ded418cc4ea..56ad353fa56 100644 --- a/sdk/python/kfp/dsl/executor.py +++ b/sdk/python/kfp/dsl/executor.py @@ -287,6 +287,7 @@ def write_executor_output(self, 'name': artifact.name, 'uri': artifact.uri, 'metadata': artifact.metadata, + 'custom_path': artifact.custom_path } artifacts_list = {'artifacts': [runtime_artifact]} diff --git a/sdk/python/kfp/dsl/executor_test.py b/sdk/python/kfp/dsl/executor_test.py index b5082dd9a36..3f1ddaa417b 100644 --- a/sdk/python/kfp/dsl/executor_test.py +++ 
b/sdk/python/kfp/dsl/executor_test.py @@ -318,6 +318,8 @@ def test_func(output_artifact_two: Output[Metrics]): 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'new-uri', + 'custom_path': + '', 'metadata': { 'key_1': 'value_1', 'key_2': 2, @@ -645,7 +647,9 @@ def test_func(first: str, second: str, output: Output[Artifact]) -> str: 'name': 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': - 'gs://some-bucket/output' + 'gs://some-bucket/output', + 'custom_path': + '' }] } }, @@ -702,7 +706,9 @@ def test_func(first: str, second: str) -> Artifact: 'name': 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': - 'gs://some-bucket/output' + 'gs://some-bucket/output', + 'custom_path': + '' }] } }, @@ -815,7 +821,9 @@ def func_returning_plain_tuple() -> NamedTuple('Outputs', [ 'name': 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': - 'gs://some-bucket/output_dataset' + 'gs://some-bucket/output_dataset', + 'custom_path': + '' }] } }, @@ -1054,6 +1062,8 @@ def test_func( '', 'uri': 'gs://mlpipeline/v2/artifacts/my-test-pipeline-beta/b2b0cdee-b15c-48ff-b8bc-a394ae46c854/train/model', + 'custom_path': + '', 'metadata': { 'accuracy': 0.9 } @@ -1288,6 +1298,8 @@ def test_func() -> Artifact: 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'gs://manually_specified_bucket/foo', + 'custom_path': + '', 'metadata': { 'data': 123 } @@ -1339,6 +1351,8 @@ def test_func() -> Artifact: 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'gs://another_bucket/my_artifact', + 'custom_path': + '', 'metadata': { 'data': 123 } @@ -1408,6 +1422,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset): 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', 'uri': 'gs://another_bucket/artifact', + 'custom_path': + '', 'metadata': { 'data': 123 } @@ -1419,6 +1435,8 @@ def 
@property
def custom_path(self) -> str:
    """Custom local path stored on this artifact ('' until one is assigned)."""
    return self._custom_path

@custom_path.setter
def custom_path(self, value: str) -> None:
    """Stores `value` as the artifact's custom local path."""
    self._custom_path = value

def _get_custom_path(self) -> str:
    """Returns the stored custom local path."""
    return self._custom_path

def _set_custom_path(self, value: str) -> None:
    """Stores `value` as the artifact's custom local path."""
    self._custom_path = value

def _set_path(self, path: str) -> None:
    """Sets `uri` to the remote form of the local `path`."""
    self.uri = convert_local_path_to_remote_path(path)
"""Pipeline that pins an output artifact to a custom local path and checks it."""
from kfp import dsl
from kfp.dsl import Artifact
from kfp.dsl import Input
from kfp.dsl import Output

# Local path the producer assigns to its output artifact and the consumer
# expects to read back. Only referenced at pipeline-definition time.
_CUSTOM_PATH = '/etc/test/file/path'


@dsl.component
def generate_artifact(custom_path: str, output_list: Output[Artifact]):
    """Produces an artifact whose local path is overridden.

    Args:
        custom_path: Local path to assign to the output artifact.
        output_list: Output artifact handle supplied by the executor.
    """
    # The custom path is set on the runtime Artifact object through its
    # `path` property.
    # NOTE(review): nothing is written at `custom_path` here; confirm whether
    # the backend requires the file to exist when uploading the artifact.
    output_list.path = custom_path


@dsl.component
def validate_artifact_path(exp_path: str, input_list: Input[Artifact]) -> bool:
    """Checks that the artifact's local path equals the expected path.

    Args:
        exp_path: Path the artifact is expected to report.
        input_list: Artifact produced by the upstream task.

    Returns:
        True when the paths match.

    Raises:
        ValueError: If the artifact's path differs from `exp_path`.
    """
    # Compare by value; `is not` tests object identity, which is not a
    # reliable way to compare two strings.
    # NOTE(review): presumes the custom path set upstream is visible on the
    # consumer's input artifact - confirm against the executor/backend.
    if input_list.path != exp_path:
        raise ValueError(
            f"File uri is {input_list.path} but should be {exp_path}.")
    return True


@dsl.pipeline
def pipeline_with_custom_path_artifact():
    """Generates an artifact at a custom path, then validates that path."""
    # Generate artifact, and set its custom path.
    output_artifact_task = generate_artifact(custom_path=_CUSTOM_PATH)

    # Validate generated artifact's path.
    validate_artifact_path(
        exp_path=_CUSTOM_PATH,
        input_list=output_artifact_task.outputs['output_list'])