Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,9 @@ message RuntimeArtifact {

// Properties of the Artifact.
google.protobuf.Struct metadata = 6;

// Custom path for output artifact.
optional string custom_path = 7;
}

// Message that represents a list of artifacts.
Expand Down
12 changes: 11 additions & 1 deletion backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
}

// Upload artifacts from local path to remote storages.
localDir, err := LocalPathForURI(outputArtifact.Uri)
localDir, err := retrieveArtifactPath(outputArtifact)
if err != nil {
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
Expand Down Expand Up @@ -994,6 +994,16 @@ func LocalPathForURI(uri string) (string, error) {
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
}

func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error) {
// If artifact custom path is set, use custom path. Otherwise, use URI.
customPath := artifact.CustomPath
if customPath != nil {
return *customPath, nil
} else {
return LocalPathForURI(artifact.Uri)
}
}

func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
for name, parameter := range executorInput.GetOutputs().GetParameters() {
dir := filepath.Dir(parameter.OutputFile)
Expand Down
33 changes: 33 additions & 0 deletions backend/src/v2/component/launcher_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,36 @@ func Test_NewLauncherV2(t *testing.T) {
})
}
}

func Test_retrieve_artifact_path(t *testing.T) {
customPath := "/var/lib/kubelet/pods/pod-uid/volumes/kubernetes.io~csi/pvc-uuid/mount"
tests := []struct {
name string
artifact *pipelinespec.RuntimeArtifact
expectedPath string
}{
{
"Artifact with no custom path",
&pipelinespec.RuntimeArtifact{
Uri: "gs://bucket/path/to/artifact",
},
"/gcs/bucket/path/to/artifact",
},
{
"Artifact with custom path",
&pipelinespec.RuntimeArtifact{
Uri: "gs://bucket/path/to/artifact",
CustomPath: &customPath,
},
customPath,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
path, err := retrieveArtifactPath(test.artifact)
assert.Nil(t, err)
assert.Equal(t, path, test.expectedPath)
})
}
}
1 change: 1 addition & 0 deletions sdk/python/kfp/dsl/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def write_executor_output(self,
'name': artifact.name,
'uri': artifact.uri,
'metadata': artifact.metadata,
'custom_path': artifact.custom_path
}
artifacts_list = {'artifacts': [runtime_artifact]}

Expand Down
24 changes: 21 additions & 3 deletions sdk/python/kfp/dsl/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ def test_func(output_artifact_two: Output[Metrics]):
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'new-uri',
'custom_path':
'',
'metadata': {
'key_1': 'value_1',
'key_2': 2,
Expand Down Expand Up @@ -645,7 +647,9 @@ def test_func(first: str, second: str, output: Output[Artifact]) -> str:
'name':
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://some-bucket/output'
'gs://some-bucket/output',
'custom_path':
''
}]
}
},
Expand Down Expand Up @@ -702,7 +706,9 @@ def test_func(first: str, second: str) -> Artifact:
'name':
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://some-bucket/output'
'gs://some-bucket/output',
'custom_path':
''
}]
}
},
Expand Down Expand Up @@ -815,7 +821,9 @@ def func_returning_plain_tuple() -> NamedTuple('Outputs', [
'name':
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://some-bucket/output_dataset'
'gs://some-bucket/output_dataset',
'custom_path':
''
}]
}
},
Expand Down Expand Up @@ -1054,6 +1062,8 @@ def test_func(
'',
'uri':
'gs://mlpipeline/v2/artifacts/my-test-pipeline-beta/b2b0cdee-b15c-48ff-b8bc-a394ae46c854/train/model',
'custom_path':
'',
'metadata': {
'accuracy': 0.9
}
Expand Down Expand Up @@ -1288,6 +1298,8 @@ def test_func() -> Artifact:
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://manually_specified_bucket/foo',
'custom_path':
'',
'metadata': {
'data': 123
}
Expand Down Expand Up @@ -1339,6 +1351,8 @@ def test_func() -> Artifact:
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://another_bucket/my_artifact',
'custom_path':
'',
'metadata': {
'data': 123
}
Expand Down Expand Up @@ -1408,6 +1422,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset):
'projects/123/locations/us-central1/metadataStores/default/artifacts/123',
'uri':
'gs://another_bucket/artifact',
'custom_path':
'',
'metadata': {
'data': 123
}
Expand All @@ -1419,6 +1435,8 @@ def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset):
'projects/123/locations/us-central1/metadataStores/default/artifacts/321',
'uri':
'gs://another_bucket/dataset',
'custom_path':
'',
'metadata': {}
}]
}
Expand Down
22 changes: 20 additions & 2 deletions sdk/python/kfp/dsl/types/artifact_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,24 @@ def __init__(self,
name: Optional[str] = None,
uri: Optional[str] = None,
metadata: Optional[Dict] = None) -> None:
"""Initializes the Artifact with the given name, URI and metadata."""
"""Initializes the Artifact with the given name, URI, metadata, and
blank custom path."""
self.uri = uri or ''
self.name = name or ''
self.metadata = metadata or {}
self._custom_path = ''

@property
def path(self) -> str:
return self._get_path()

@path.setter
def path(self, path: str) -> None:
self._set_path(path)
self._set_custom_path(path)

def _get_path(self) -> Optional[str]:
if self.custom_path is not '':
return self._get_custom_path()
if self.uri.startswith(RemotePrefix.GCS.value):
return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
):]
Expand All @@ -108,9 +112,23 @@ def _get_path(self) -> Optional[str]:
# uri == path for local execution
return self.uri

@property
def custom_path(self) -> str:
return self._custom_path

def _get_custom_path(self) -> str:
return self._custom_path

def _set_path(self, path: str) -> None:
self.uri = convert_local_path_to_remote_path(path)

def _set_custom_path(self, value: str) -> None:
self._custom_path = value

@custom_path.setter
def custom_path(self, value: str):
self._custom_path = value


def convert_local_path_to_remote_path(path: str) -> str:
if path.startswith(_GCS_LOCAL_MOUNT_PREFIX):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from kfp.dsl import Output
from kfp.dsl.types.artifact_types import Artifact
from kfp.v2 import dsl


@dsl.component
def generate_artifact() -> list:
return [1, 2, 3, 4]

@dsl.component
def validate_artifact_path(exp_path: str, input_list: Output[Artifact]) -> bool:
if input_list.path is not exp_path:
raise ValueError(f"File uri is {input_list.path} but should be {exp_path}.")

@dsl.pipeline
def pipeline_with_custom_path_artifact():
# Generate artifact, and set its custom path.
output_artifact_task = generate_artifact()
output_artifact_task.output.set_path('/etc/test/file/path')

# Validate generated artifact's path.
validate_artifact_task = validate_artifact_path(path='/etc/test/file/path', input_list=output_artifact_task.output)
Loading