Skip to content

Commit dbb6edd

Browse files
committed
Revert LocalPathForURI() removal and update retrieveArtifactPath()
Signed-off-by: agoins <[email protected]>
1 parent 0571b8c commit dbb6edd

File tree

2 files changed

+25
-38
lines changed

2 files changed

+25
-38
lines changed

backend/src/v2/component/launcher_v2.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
140140
continue
141141
}
142142

143-
localPath, err := retrieveArtifactPath(inputArtifact)
143+
localPath, err := LocalPathForURI(inputArtifact.Uri)
144144
if err != nil {
145145
continue
146146
}
@@ -472,7 +472,7 @@ func getLogWriter(artifacts map[string]*pipelinespec.ArtifactList) (writer io.Wr
472472
}
473473

474474
logURI := logsArtifactList.Artifacts[0].Uri
475-
logFilePath, err := retrieveArtifactPath(logsArtifactList.Artifacts[0])
475+
logFilePath, err := LocalPathForURI(logURI)
476476
if err != nil {
477477
glog.Errorf("Error converting log artifact URI, %s, to file path.", logURI)
478478
return os.Stdout
@@ -718,7 +718,7 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
718718
for _, artifact := range artifactList.Artifacts {
719719
// Iterating through the artifact list allows for collected artifacts to be properly consumed.
720720
inputArtifact := artifact
721-
localPath, err := retrieveArtifactPath(inputArtifact)
721+
localPath, err := LocalPathForURI(inputArtifact.Uri)
722722
if err != nil {
723723
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
724724

@@ -861,7 +861,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
861861
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
862862
placeholders[key] = inputArtifact.Uri
863863

864-
localPath, err := retrieveArtifactPath(inputArtifact)
864+
localPath, err := LocalPathForURI(inputArtifact.Uri)
865865
if err != nil {
866866
// Input Artifact does not have a recognized storage URI
867867
continue
@@ -880,7 +880,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
880880
outputArtifact := artifactList.Artifacts[0]
881881
placeholders[fmt.Sprintf(`{{$.outputs.artifacts['%s'].uri}}`, name)] = outputArtifact.Uri
882882

883-
localPath, err := retrieveArtifactPath(outputArtifact)
883+
localPath, err := LocalPathForURI(outputArtifact.Uri)
884884
if err != nil {
885885
return nil, fmt.Errorf("resolve output artifact %q's local path: %w", name, err)
886886
}
@@ -983,26 +983,29 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
983983
return executorOutput, nil
984984
}
985985

986+
func LocalPathForURI(uri string) (string, error) {
987+
if strings.HasPrefix(uri, "gs://") {
988+
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
989+
}
990+
if strings.HasPrefix(uri, "minio://") {
991+
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
992+
}
993+
if strings.HasPrefix(uri, "s3://") {
994+
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
995+
}
996+
if strings.HasPrefix(uri, "oci://") {
997+
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
998+
}
999+
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
1000+
}
1001+
9861002
func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error) {
9871003
// If artifact custom path is set, use custom path. Otherwise, use URI.
9881004
customPath := artifact.CustomPath
9891005
if customPath != nil {
9901006
return *customPath, nil
9911007
} else {
992-
uri := artifact.Uri
993-
if strings.HasPrefix(uri, "gs://") {
994-
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
995-
}
996-
if strings.HasPrefix(uri, "minio://") {
997-
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
998-
}
999-
if strings.HasPrefix(uri, "s3://") {
1000-
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
1001-
}
1002-
if strings.HasPrefix(uri, "oci://") {
1003-
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
1004-
}
1005-
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
1008+
return LocalPathForURI(artifact.Uri)
10061009
}
10071010
}
10081011

@@ -1021,7 +1024,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
10211024

10221025
for _, outputArtifact := range artifactList.Artifacts {
10231026

1024-
localPath, err := retrieveArtifactPath(outputArtifact)
1027+
localPath, err := LocalPathForURI(outputArtifact.Uri)
10251028
if err != nil {
10261029
return fmt.Errorf("failed to generate local storage path for output artifact %q: %w", name, err)
10271030
}

backend/src/v2/driver/driver.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ func addModelcarsToPodSpec(
545545
for i, name := range modelcarArtifactNames {
546546
inputArtifact := modelcarArtifacts[name]
547547

548-
localPath, err := localPathForURI(inputArtifact.Uri)
548+
localPath, err := component.LocalPathForURI(inputArtifact.Uri)
549549
if err != nil {
550550
continue
551551
}
@@ -716,7 +716,7 @@ func provisionOutputs(
716716

717717
// Place the executor output file under localTaskRoot to enable Pythonic artifacts. The SDK's pythonic artifact
718718
// runtime derives CONTAINER_TASK_ROOT from the directory of OutputFile to use it in dsl.get_uri.
719-
if localTaskRoot, err := localPathForURI(taskRootRemote); err == nil {
719+
if localTaskRoot, err := component.LocalPathForURI(taskRootRemote); err == nil {
720720
outputs.OutputFile = filepath.Join(localTaskRoot, "output_metadata.json")
721721
} else {
722722
// Fallback to legacy path if the pipeline root scheme is not recognized.
@@ -742,19 +742,3 @@ func validateVolumeMounts(podSpec *k8score.PodSpec) error {
742742

743743
return nil
744744
}
745-
746-
func localPathForURI(uri string) (string, error) {
747-
if strings.HasPrefix(uri, "gs://") {
748-
return "/gcs/" + strings.TrimPrefix(uri, "gs://"), nil
749-
}
750-
if strings.HasPrefix(uri, "minio://") {
751-
return "/minio/" + strings.TrimPrefix(uri, "minio://"), nil
752-
}
753-
if strings.HasPrefix(uri, "s3://") {
754-
return "/s3/" + strings.TrimPrefix(uri, "s3://"), nil
755-
}
756-
if strings.HasPrefix(uri, "oci://") {
757-
return "/oci/" + strings.ReplaceAll(strings.TrimPrefix(uri, "oci://"), "/", "_") + "/models", nil
758-
}
759-
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
760-
}

0 commit comments

Comments
 (0)