Skip to content

Commit a174e85

Browse files
committed
Update launcher_v2 local path helper method.
Signed-off-by: agoins <[email protected]>
1 parent e7382fa commit a174e85

File tree

3 files changed

+84
-23
lines changed

3 files changed

+84
-23
lines changed

backend/src/v2/component/launcher_v2.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
140140
continue
141141
}
142142

143-
localPath, err := LocalPathForURI(inputArtifact.Uri)
143+
localPath, err := retrieveArtifactPath(inputArtifact)
144144
if err != nil {
145145
continue
146146
}
@@ -472,7 +472,7 @@ func getLogWriter(artifacts map[string]*pipelinespec.ArtifactList) (writer io.Wr
472472
}
473473

474474
logURI := logsArtifactList.Artifacts[0].Uri
475-
logFilePath, err := LocalPathForURI(logURI)
475+
logFilePath, err := retrieveArtifactPath(logsArtifactList.Artifacts[0])
476476
if err != nil {
477477
glog.Errorf("Error converting log artifact URI, %s, to file path.", logURI)
478478
return os.Stdout
@@ -628,7 +628,12 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
628628
}
629629

630630
// Upload artifacts from local path to remote storages.
631-
localDir, err := LocalPathForURI(outputArtifact.Uri)
631+
localDir, err := retrieveArtifactPath(outputArtifact)
632+
633+
if outputArtifact.CustomPath != nil {
634+
localDir = *outputArtifact.CustomPath
635+
}
636+
632637
if err != nil {
633638
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
634639
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
@@ -713,7 +718,7 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
713718
for _, artifact := range artifactList.Artifacts {
714719
// Iterating through the artifact list allows for collected artifacts to be properly consumed.
715720
inputArtifact := artifact
716-
localPath, err := LocalPathForURI(inputArtifact.Uri)
721+
localPath, err := retrieveArtifactPath(inputArtifact)
717722
if err != nil {
718723
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
719724

@@ -856,7 +861,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
856861
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
857862
placeholders[key] = inputArtifact.Uri
858863

859-
localPath, err := LocalPathForURI(inputArtifact.Uri)
864+
localPath, err := retrieveArtifactPath(inputArtifact)
860865
if err != nil {
861866
// Input Artifact does not have a recognized storage URI
862867
continue
@@ -875,7 +880,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
875880
outputArtifact := artifactList.Artifacts[0]
876881
placeholders[fmt.Sprintf(`{{$.outputs.artifacts['%s'].uri}}`, name)] = outputArtifact.Uri
877882

878-
localPath, err := LocalPathForURI(outputArtifact.Uri)
883+
localPath, err := retrieveArtifactPath(outputArtifact)
879884
if err != nil {
880885
return nil, fmt.Errorf("resolve output artifact %q's local path: %w", name, err)
881886
}
@@ -978,20 +983,26 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
978983
return executorOutput, nil
979984
}
980985

981-
func LocalPathForURI(uri string) (string, error) {
982-
if strings.HasPrefix(uri, "gs://") {
983-
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
984-
}
985-
if strings.HasPrefix(uri, "minio://") {
986-
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
987-
}
988-
if strings.HasPrefix(uri, "s3://") {
989-
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
990-
}
991-
if strings.HasPrefix(uri, "oci://") {
992-
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
986+
func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error) {
987+
customPath := artifact.CustomPath
988+
if customPath != nil {
989+
return *customPath, nil
990+
} else {
991+
uri := artifact.Uri
992+
if strings.HasPrefix(uri, "gs://") {
993+
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
994+
}
995+
if strings.HasPrefix(uri, "minio://") {
996+
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
997+
}
998+
if strings.HasPrefix(uri, "s3://") {
999+
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
1000+
}
1001+
if strings.HasPrefix(uri, "oci://") {
1002+
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
1003+
}
1004+
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
9931005
}
994-
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
9951006
}
9961007

9971008
func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
@@ -1009,7 +1020,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
10091020

10101021
for _, outputArtifact := range artifactList.Artifacts {
10111022

1012-
localPath, err := LocalPathForURI(outputArtifact.Uri)
1023+
localPath, err := retrieveArtifactPath(outputArtifact)
10131024
if err != nil {
10141025
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
10151026
}

backend/src/v2/component/launcher_v2_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,37 @@ func Test_NewLauncherV2(t *testing.T) {
418418
})
419419
}
420420
}
421+
422+
func Test_retrieve_artifact_path(t *testing.T) {
423+
customPath := "custom-path"
424+
tests := []struct {
425+
name string
426+
artifact *pipelinespec.RuntimeArtifact
427+
expectedPath string
428+
}{
429+
{
430+
"Artifact with no custom path",
431+
&pipelinespec.RuntimeArtifact{
432+
Uri: "gs://bucket/path/to/artifact",
433+
},
434+
"/gcs/bucket/path/to/artifact",
435+
},
436+
{
437+
"Artifact with custom path",
438+
&pipelinespec.RuntimeArtifact{
439+
Uri: "gs://bucket/path/to/artifact",
440+
//todo: use more specific custom path
441+
CustomPath: &customPath,
442+
},
443+
customPath,
444+
},
445+
}
446+
447+
for _, test := range tests {
448+
t.Run(test.name, func(t *testing.T) {
449+
path, err := retrieveArtifactPath(test.artifact)
450+
assert.Nil(t, err)
451+
assert.Equal(t, path, test.expectedPath)
452+
})
453+
}
454+
}

backend/src/v2/driver/driver.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// you may not use this file except in compliance with the License.
55
// You may obtain a copy of the License at
66
//
7-
// https://www.apache.org/licenses/LICENSE-2.0
7+
// https://www.apache.org/licenses/LICENSE-2.0
88
//
99
// Unless required by applicable law or agreed to in writing, software
1010
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -545,7 +545,7 @@ func addModelcarsToPodSpec(
545545
for i, name := range modelcarArtifactNames {
546546
inputArtifact := modelcarArtifacts[name]
547547

548-
localPath, err := component.LocalPathForURI(inputArtifact.Uri)
548+
localPath, err := localPathForURI(inputArtifact.Uri)
549549
if err != nil {
550550
continue
551551
}
@@ -716,7 +716,7 @@ func provisionOutputs(
716716

717717
// Place the executor output file under localTaskRoot to enable Pythonic artifacts. The SDK's pythonic artifact
718718
// runtime derives CONTAINER_TASK_ROOT from the directory of OutputFile to use it in dsl.get_uri.
719-
if localTaskRoot, err := component.LocalPathForURI(taskRootRemote); err == nil {
719+
if localTaskRoot, err := localPathForURI(taskRootRemote); err == nil {
720720
outputs.OutputFile = filepath.Join(localTaskRoot, "output_metadata.json")
721721
} else {
722722
// Fallback to legacy path if the pipeline root scheme is not recognized.
@@ -742,3 +742,19 @@ func validateVolumeMounts(podSpec *k8score.PodSpec) error {
742742

743743
return nil
744744
}
745+
746+
func localPathForURI(uri string) (string, error) {
747+
if strings.HasPrefix(uri, "gs://") {
748+
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
749+
}
750+
if strings.HasPrefix(uri, "minio://") {
751+
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
752+
}
753+
if strings.HasPrefix(uri, "s3://") {
754+
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
755+
}
756+
if strings.HasPrefix(uri, "oci://") {
757+
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
758+
}
759+
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
760+
}

0 commit comments

Comments
 (0)