@@ -140,7 +140,7 @@ func stopWaitingArtifacts(artifacts map[string]*pipelinespec.ArtifactList) {
140140				continue 
141141			}
142142
143- 			localPath , err  :=  LocalPathForURI (inputArtifact . Uri )
143+ 			localPath , err  :=  retrieveArtifactPath (inputArtifact )
144144			if  err  !=  nil  {
145145				continue 
146146			}
@@ -472,7 +472,7 @@ func getLogWriter(artifacts map[string]*pipelinespec.ArtifactList) (writer io.Wr
472472	}
473473
474474	logURI  :=  logsArtifactList .Artifacts [0 ].Uri 
475- 	logFilePath , err  :=  LocalPathForURI ( logURI )
475+ 	logFilePath , err  :=  retrieveArtifactPath ( logsArtifactList . Artifacts [ 0 ] )
476476	if  err  !=  nil  {
477477		glog .Errorf ("Error converting log artifact URI, %s, to file path." , logURI )
478478		return  os .Stdout 
@@ -628,7 +628,12 @@ func uploadOutputArtifacts(ctx context.Context, executorInput *pipelinespec.Exec
628628			}
629629
630630			// Upload artifacts from local path to remote storages. 
631- 			localDir , err  :=  LocalPathForURI (outputArtifact .Uri )
631+ 			localDir , err  :=  retrieveArtifactPath (outputArtifact )
632+ 
633+ 			if  outputArtifact .CustomPath  !=  nil  {
634+ 				localDir  =  * outputArtifact .CustomPath 
635+ 			}
636+ 
632637			if  err  !=  nil  {
633638				glog .Warningf ("Output Artifact %q does not have a recognized storage URI %q. Skipping uploading to remote storage." , name , outputArtifact .Uri )
634639			} else  if  ! strings .HasPrefix (outputArtifact .Uri , "oci://" ) {
@@ -713,7 +718,7 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
713718		for  _ , artifact  :=  range  artifactList .Artifacts  {
714719			// Iterating through the artifact list allows for collected artifacts to be properly consumed. 
715720			inputArtifact  :=  artifact 
716- 			localPath , err  :=  LocalPathForURI (inputArtifact . Uri )
721+ 			localPath , err  :=  retrieveArtifactPath (inputArtifact )
717722			if  err  !=  nil  {
718723				glog .Warningf ("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path." , name , inputArtifact .Uri )
719724
@@ -856,7 +861,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
856861		key  :=  fmt .Sprintf (`{{$.inputs.artifacts['%s'].uri}}` , name )
857862		placeholders [key ] =  inputArtifact .Uri 
858863
859- 		localPath , err  :=  LocalPathForURI (inputArtifact . Uri )
864+ 		localPath , err  :=  retrieveArtifactPath (inputArtifact )
860865		if  err  !=  nil  {
861866			// Input Artifact does not have a recognized storage URI 
862867			continue 
@@ -875,7 +880,7 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
875880		outputArtifact  :=  artifactList .Artifacts [0 ]
876881		placeholders [fmt .Sprintf (`{{$.outputs.artifacts['%s'].uri}}` , name )] =  outputArtifact .Uri 
877882
878- 		localPath , err  :=  LocalPathForURI (outputArtifact . Uri )
883+ 		localPath , err  :=  retrieveArtifactPath (outputArtifact )
879884		if  err  !=  nil  {
880885			return  nil , fmt .Errorf ("resolve output artifact %q's local path: %w" , name , err )
881886		}
@@ -978,20 +983,26 @@ func getExecutorOutputFile(path string) (*pipelinespec.ExecutorOutput, error) {
978983	return  executorOutput , nil 
979984}
980985
981- func  LocalPathForURI (uri  string ) (string , error ) {
982- 	if  strings .HasPrefix (uri , "gs://" ) {
983- 		return  "/gcs/"  +  strings .TrimPrefix (uri , "gs://" ), nil 
984- 	}
985- 	if  strings .HasPrefix (uri , "minio://" ) {
986- 		return  "/minio/"  +  strings .TrimPrefix (uri , "minio://" ), nil 
987- 	}
988- 	if  strings .HasPrefix (uri , "s3://" ) {
989- 		return  "/s3/"  +  strings .TrimPrefix (uri , "s3://" ), nil 
990- 	}
991- 	if  strings .HasPrefix (uri , "oci://" ) {
992- 		return  "/oci/"  +  strings .ReplaceAll (strings .TrimPrefix (uri , "oci://" ), "/" , "_" ) +  "/models" , nil 
986+ func  retrieveArtifactPath (artifact  * pipelinespec.RuntimeArtifact ) (string , error ) {
987+ 	customPath  :=  artifact .CustomPath 
988+ 	if  customPath  !=  nil  {
989+ 		return  * customPath , nil 
990+ 	} else  {
991+ 		uri  :=  artifact .Uri 
992+ 		if  strings .HasPrefix (uri , "gs://" ) {
993+ 			return  "/gcs/"  +  strings .TrimPrefix (uri , "gs://" ), nil 
994+ 		}
995+ 		if  strings .HasPrefix (uri , "minio://" ) {
996+ 			return  "/minio/"  +  strings .TrimPrefix (uri , "minio://" ), nil 
997+ 		}
998+ 		if  strings .HasPrefix (uri , "s3://" ) {
999+ 			return  "/s3/"  +  strings .TrimPrefix (uri , "s3://" ), nil 
1000+ 		}
1001+ 		if  strings .HasPrefix (uri , "oci://" ) {
1002+ 			return  "/oci/"  +  strings .ReplaceAll (strings .TrimPrefix (uri , "oci://" ), "/" , "_" ) +  "/models" , nil 
1003+ 		}
1004+ 		return  "" , fmt .Errorf ("failed to generate local path for URI %s: unsupported storage scheme" , uri )
9931005	}
994- 	return  "" , fmt .Errorf ("failed to generate local path for URI %s: unsupported storage scheme" , uri )
9951006}
9961007
9971008func  prepareOutputFolders (executorInput  * pipelinespec.ExecutorInput ) error  {
@@ -1009,7 +1020,7 @@ func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
10091020
10101021		for  _ , outputArtifact  :=  range  artifactList .Artifacts  {
10111022
1012- 			localPath , err  :=  LocalPathForURI (outputArtifact . Uri )
1023+ 			localPath , err  :=  retrieveArtifactPath (outputArtifact )
10131024			if  err  !=  nil  {
10141025				return  fmt .Errorf ("failed to generate local storage path for output artifact %q: %w" , name , err )
10151026			}
0 commit comments