Skip to content

Commit 0571b8c

Browse files
committed
Update launcher_v2 local path helper method.
Signed-off-by: agoins <[email protected]>
1 parent 1533981 commit 0571b8c

File tree

2 files changed

+50
-22
lines changed

2 files changed

+50
-22
lines changed

backend/src/v2/component/launcher_v2.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
140140
continue
141141
}
142142

143-
localPath, err := LocalPathForURI(inputArtifact.Uri)
143+
localPath, err := retrieveArtifactPath(inputArtifact)
144144
if err != nil {
145145
continue
146146
}
@@ -472,7 +472,7 @@ func getLogWriter(artifacts map[string]*pipelinespec.ArtifactList) (writer io.Wr
472472
}
473473

474474
logURI := logsArtifactList.Artifacts[0].Uri
475-
logFilePath, err := LocalPathForURI(logURI)
475+
logFilePath, err := retrieveArtifactPath(logsArtifactList.Artifacts[0])
476476
if err != nil {
477477
glog.Errorf("Error converting log artifact URI, %s, to file path.", logURI)
478478
return os.Stdout
@@ -628,7 +628,12 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
628628
}
629629

630630
// Upload artifacts from local path to remote storages.
631-
localDir, err := LocalPathForURI(outputArtifact.Uri)
631+
localDir, err := retrieveArtifactPath(outputArtifact)
632+
633+
if outputArtifact.CustomPath != nil {
634+
localDir = *outputArtifact.CustomPath
635+
}
636+
632637
if err != nil {
633638
glog.Warningf("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage.", name, outputArtifact.Uri)
634639
} else if !strings.HasPrefix(outputArtifact.Uri, "oci://") {
@@ -713,7 +718,7 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
713718
for _, artifact := range artifactList.Artifacts {
714719
// Iterating through the artifact list allows for collected artifacts to be properly consumed.
715720
inputArtifact := artifact
716-
localPath, err := LocalPathForURI(inputArtifact.Uri)
721+
localPath, err := retrieveArtifactPath(inputArtifact)
717722
if err != nil {
718723
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
719724

@@ -856,7 +861,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
856861
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
857862
placeholders[key] = inputArtifact.Uri
858863

859-
localPath, err := LocalPathForURI(inputArtifact.Uri)
864+
localPath, err := retrieveArtifactPath(inputArtifact)
860865
if err != nil {
861866
// Input Artifact does not have a recognized storage URI
862867
continue
@@ -875,7 +880,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
875880
outputArtifact := artifactList.Artifacts[0]
876881
placeholders[fmt.Sprintf(`{{$.outputs.artifacts['%s'].uri}}`, name)] = outputArtifact.Uri
877882

878-
localPath, err := LocalPathForURI(outputArtifact.Uri)
883+
localPath, err := retrieveArtifactPath(outputArtifact)
879884
if err != nil {
880885
return nil, fmt.Errorf("resolve output artifact %q's local path: %w", name, err)
881886
}
@@ -978,20 +983,27 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
978983
return executorOutput, nil
979984
}
980985

981-
func LocalPathForURI(uri string) (string, error) {
982-
if strings.HasPrefix(uri, "gs://") {
983-
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
984-
}
985-
if strings.HasPrefix(uri, "minio://") {
986-
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
987-
}
988-
if strings.HasPrefix(uri, "s3://") {
989-
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
990-
}
991-
if strings.HasPrefix(uri, "oci://") {
992-
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
986+
func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error) {
987+
// If artifact custom path is set, use custom path. Otherwise, use URI.
988+
customPath := artifact.CustomPath
989+
if customPath != nil {
990+
return *customPath, nil
991+
} else {
992+
uri := artifact.Uri
993+
if strings.HasPrefix(uri, "gs://") {
994+
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
995+
}
996+
if strings.HasPrefix(uri, "minio://") {
997+
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
998+
}
999+
if strings.HasPrefix(uri, "s3://") {
1000+
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
1001+
}
1002+
if strings.HasPrefix(uri, "oci://") {
1003+
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
1004+
}
1005+
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
9931006
}
994-
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
9951007
}
9961008

9971009
func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
@@ -1009,7 +1021,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
10091021

10101022
for _, outputArtifact := range artifactList.Artifacts {
10111023

1012-
localPath, err := LocalPathForURI(outputArtifact.Uri)
1024+
localPath, err := retrieveArtifactPath(outputArtifact)
10131025
if err != nil {
10141026
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
10151027
}

backend/src/v2/driver/driver.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ func addModelcarsToPodSpec(
545545
for i, name := range modelcarArtifactNames {
546546
inputArtifact := modelcarArtifacts[name]
547547

548-
localPath, err := component.LocalPathForURI(inputArtifact.Uri)
548+
localPath, err := localPathForURI(inputArtifact.Uri)
549549
if err != nil {
550550
continue
551551
}
@@ -716,7 +716,7 @@ func provisionOutputs(
716716

717717
// Place the executor output file under localTaskRoot to enable Pythonic artifacts. The SDK's pythonic artifact
718718
// runtime derives CONTAINER_TASK_ROOT from the directory of OutputFile to use it in dsl.get_uri.
719-
if localTaskRoot, err := component.LocalPathForURI(taskRootRemote); err == nil {
719+
if localTaskRoot, err := localPathForURI(taskRootRemote); err == nil {
720720
outputs.OutputFile = filepath.Join(localTaskRoot, "output_metadata.json")
721721
} else {
722722
// Fallback to legacy path if the pipeline root scheme is not recognized.
@@ -742,3 +742,19 @@ func validateVolumeMounts(podSpec *k8score.PodSpec) error {
742742

743743
return nil
744744
}
745+
746+
func localPathForURI(uri string) (string, error) {
747+
if strings.HasPrefix(uri, "gs://") {
748+
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
749+
}
750+
if strings.HasPrefix(uri, "minio://") {
751+
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
752+
}
753+
if strings.HasPrefix(uri, "s3://") {
754+
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
755+
}
756+
if strings.HasPrefix(uri, "oci://") {
757+
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
758+
}
759+
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
760+
}

0 commit comments

Comments
 (0)