diff --git a/backend/api/v1beta1/run.proto b/backend/api/v1beta1/run.proto
index bb0f5616cdd..7d0ea643a92 100644
--- a/backend/api/v1beta1/run.proto
+++ b/backend/api/v1beta1/run.proto
@@ -116,12 +116,6 @@ service RunService {
     };
   }
 
-  // Finds a run's artifact data.
-  rpc ReadArtifactV1(ReadArtifactRequest) returns (ReadArtifactResponse) {
-    option (google.api.http) = {
-      get: "/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read"
-    };
-  }
 
   // Terminates an active run.
   rpc TerminateRunV1(TerminateRunRequest) returns (google.protobuf.Empty) {
diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go
index 8cfb8a79c0e..ba4a412a109 100644
--- a/backend/src/apiserver/main.go
+++ b/backend/src/apiserver/main.go
@@ -324,6 +324,57 @@ func startHttpProxy(resourceManager *resource.ResourceManager, usePipelinesKuber
     runLogServer := server.NewRunLogServer(resourceManager)
     topMux.HandleFunc("/apis/v1alpha1/runs/{run_id}/nodes/{node_id}/log", runLogServer.ReadRunLogV1)
 
+    topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", func(w http.ResponseWriter, r *http.Request) {
+        vars := mux.Vars(r)
+
+        runId := vars["run_id"]
+        nodeId := vars["node_id"]
+        artifactName := vars["artifact_name"]
+
+        if runId == "" || nodeId == "" || artifactName == "" {
+            http.Error(w, "Missing required parameters", http.StatusBadRequest)
+            return
+        }
+
+        run, err := resourceManager.GetRun(runId)
+        if err != nil {
+            http.Error(w, "Run not found", http.StatusNotFound)
+            return
+        }
+
+        if run.WorkflowRuntimeManifest == "" {
+            http.Error(w, "V2 IR spec not supported", http.StatusBadRequest)
+            return
+        }
+
+        execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
+        if err != nil {
+            http.Error(w, "Failed to parse workflow", http.StatusInternalServerError)
+            return
+        }
+
+        artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(nodeId, artifactName)
+        if artifactPath == "" {
+            http.Error(w, "Artifact not found", http.StatusNotFound)
+            return
+        }
+
+        reader, err := resourceManager.GetObjectStore().GetFileReader(r.Context(), artifactPath)
+        if err != nil {
+            http.Error(w, "File not found", http.StatusNotFound)
+            return
+        }
+        defer reader.Close()
+
+        w.Header().Set("Content-Type", "application/octet-stream")
+        w.Header().Set("Cache-Control", "no-cache, private")
+
+        if _, err := io.Copy(w, reader); err != nil {
+            glog.Errorf("Failed to stream artifact: %v", err)
+            http.Error(w, "Internal server error", http.StatusInternalServerError)
+        }
+    })
+
     topMux.PathPrefix("/apis/").Handler(runtimeMux)
 
     // Register a handler for Prometheus to poll.
diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go
index b3305929e36..6a92783dead 100644
--- a/backend/src/apiserver/resource/resource_manager.go
+++ b/backend/src/apiserver/resource/resource_manager.go
@@ -15,8 +15,10 @@ package resource
 
 import (
+    "bufio"
     "context"
     "encoding/json"
+    "errors"
     "fmt"
     "io"
     "net"
@@ -38,7 +40,7 @@ import (
     exec "github.com/kubeflow/pipelines/backend/src/common"
     "github.com/kubeflow/pipelines/backend/src/common/util"
     scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
-    "github.com/pkg/errors"
+    pkgerrors "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "google.golang.org/grpc/codes"
@@ -129,6 +131,7 @@ type ResourceManager struct {
 }
 
 func NewResourceManager(clientManager ClientManagerInterface, options *ResourceManagerOptions) *ResourceManager {
+    k8sCoreClient := clientManager.KubernetesCoreClient()
     return &ResourceManager{
         experimentStore: clientManager.ExperimentStore(),
         pipelineStore:   clientManager.PipelineStore(),
@@ -141,7 +144,7 @@ func NewResourceManager(clientManager ClientManagerInterface, options *ResourceM
         objectStore: clientManager.ObjectStore(),
         execClient:  clientManager.ExecClient(),
         swfClient:   clientManager.SwfClient(),
-        k8sCoreClient: clientManager.KubernetesCoreClient(),
+        k8sCoreClient: k8sCoreClient,
         subjectAccessReviewClient: clientManager.SubjectAccessReviewClient(),
         tokenReviewClient:         clientManager.TokenReviewClient(),
         logArchive:                clientManager.LogArchive(),
@@ -666,7 +669,7 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
         if err != nil {
             if apierrors.IsConflict(errors.Unwrap(err)) {
                 continue
-            } else if util.IsNotFound(errors.Cause(err)) {
+            } else if util.IsNotFound(pkgerrors.Cause(err)) {
                 break
             }
             return failedToReconcileSwfCrsError(err)
@@ -1543,13 +1546,13 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
     } else {
         // Try reading object store from pipeline_spec_uri
         // nolint:staticcheck // [ST1003] Field name matches upstream legacy naming
-        template, errUri := r.objectStore.GetFile(context.TODO(), string(pipelineVersion.PipelineSpecURI))
+        template, errUri := r.readFileStreaming(context.TODO(), string(pipelineVersion.PipelineSpecURI))
         if errUri != nil {
             // Try reading object store from pipeline_version_id
-            template, errUUID := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
+            template, errUUID := r.readFileStreaming(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
             if errUUID != nil {
                 // Try reading object store from pipeline_id
-                template, errPipelineId := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
+                template, errPipelineId := r.readFileStreaming(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
                 if errPipelineId != nil {
                     return nil, "", util.Wrap(
                         util.Wrap(
@@ -1567,6 +1570,36 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
     }
 }
 
+func (r *ResourceManager) readFileStreaming(ctx context.Context, filePath string) ([]byte, error) {
+    reader, err := r.objectStore.GetFileReader(ctx, filePath)
+    if err != nil {
+        return nil, err
+    }
+    defer reader.Close()
+
+    return r.loadFileWithSizeLimit(reader, common.MaxFileLength)
+}
+
+func (r *ResourceManager) loadFileWithSizeLimit(fileReader io.Reader, maxFileLength int) ([]byte, error) {
+    reader := bufio.NewReaderSize(fileReader, maxFileLength)
+    var fileContent []byte
+    for {
+        currentRead := make([]byte, bufio.MaxScanTokenSize)
+        size, err := reader.Read(currentRead)
+        fileContent = append(fileContent, currentRead[:size]...)
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            return nil, util.NewInternalServerError(err, "Error reading file from object store")
+        }
+    }
+    if len(fileContent) > maxFileLength {
+        return nil, util.NewInternalServerError(nil, "File size too large. Maximum supported size: %v", maxFileLength)
+    }
+    return fileContent, nil
+}
+
 // Creates the default experiment entry.
 func (r *ResourceManager) CreateDefaultExperiment(namespace string) (string, error) {
     // First check that we don't already have a default experiment ID in the DB.
@@ -1617,28 +1650,8 @@ func (r *ResourceManager) ReportMetric(metric *model.RunMetric) error {
     return nil
 }
 
-// ReadArtifact parses run's workflow to find artifact file path and reads the content of the file
-// from object store.
-func (r *ResourceManager) ReadArtifact(runID string, nodeID string, artifactName string) ([]byte, error) {
-    run, err := r.runStore.GetRun(runID)
-    if err != nil {
-        return nil, err
-    }
-    if run.WorkflowRuntimeManifest == "" {
-        return nil, util.NewInvalidInputError("read artifact from run with v2 IR spec is not supported")
-    }
-    execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
-    if err != nil {
-        // This should never happen.
-        return nil, util.NewInternalServerError(
-            err, "failed to unmarshal workflow '%s'", run.WorkflowRuntimeManifest)
-    }
-    artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(nodeID, artifactName)
-    if artifactPath == "" {
-        return nil, util.NewResourceNotFoundError(
-            "artifact", common.CreateArtifactPath(runID, nodeID, artifactName))
-    }
-    return r.objectStore.GetFile(context.TODO(), artifactPath)
+func (r *ResourceManager) GetObjectStore() storage.ObjectStoreInterface {
+    return r.objectStore
 }
 
 // Fetches the default experiment id.
diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go
index 210b7c59579..7572cb8d0b8 100644
--- a/backend/src/apiserver/resource/resource_manager_test.go
+++ b/backend/src/apiserver/resource/resource_manager_test.go
@@ -18,6 +18,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "io"
     "strings"
     "testing"
     "time"
@@ -67,6 +68,10 @@ func (m *FakeBadObjectStore) GetFile(ctx context.Context, filePath string) ([]by
     return []byte(""), nil
 }
 
+func (m *FakeBadObjectStore) GetFileReader(ctx context.Context, filePath string) (io.ReadCloser, error) {
+    return nil, util.NewInternalServerError(errors.New("Error"), "bad object store")
+}
+
 func (m *FakeBadObjectStore) AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error {
     return util.NewInternalServerError(errors.New("Error"), "bad object store")
 }
@@ -3301,104 +3306,6 @@ func TestReportScheduledWorkflowResource_Error(t *testing.T) {
     assert.Contains(t, err.(*util.UserError).String(), "database is closed")
 }
 
-func TestReadArtifact_Succeed(t *testing.T) {
-    store, manager, job := initWithJob(t)
-    defer store.Close()
-
-    expectedContent := "test"
-    filePath := "test/file.txt"
-    store.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
-
-    // Create a scheduled run
-    // job, _ := manager.CreateJob(model.Job{
-    //     Name:       "pp1",
-    //     PipelineId: p.UUID,
-    //     Enabled:    true,
-    // })
-    workflow := util.NewWorkflow(&v1alpha1.Workflow{
-        TypeMeta: v1.TypeMeta{
-            APIVersion: "argoproj.io/v1alpha1",
-            Kind:       "Workflow",
-        },
-        ObjectMeta: v1.ObjectMeta{
-            Name:              "MY_NAME",
-            Namespace:         "MY_NAMESPACE",
-            UID:               "run-1",
-            Labels:            map[string]string{util.LabelKeyWorkflowRunId: "run-1"},
-            CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
-            OwnerReferences: []v1.OwnerReference{{
-                APIVersion: "kubeflow.org/v1beta1",
-                Kind:       "ScheduledWorkflow",
-                Name:       "SCHEDULE_NAME",
-                UID:        types.UID(job.UUID),
-            }},
-        },
-        Status: v1alpha1.WorkflowStatus{
-            Nodes: map[string]v1alpha1.NodeStatus{
-                "node-1": {
-                    Outputs: &v1alpha1.Outputs{
-                        Artifacts: []v1alpha1.Artifact{
-                            {
-                                Name: "artifact-1",
-                                ArtifactLocation: v1alpha1.ArtifactLocation{
-                                    S3: &v1alpha1.S3Artifact{
-                                        Key: filePath,
-                                    },
-                                },
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    })
-    _, err := manager.ReportWorkflowResource(context.Background(), workflow)
-    assert.Nil(t, err)
-
-    artifactContent, err := manager.ReadArtifact("run-1", "node-1", "artifact-1")
-    assert.Nil(t, err)
-    assert.Equal(t, expectedContent, string(artifactContent))
-}
-
-func TestReadArtifact_WorkflowNoStatus_NotFound(t *testing.T) {
-    store, manager, job := initWithJob(t)
-    defer store.Close()
-    // report workflow
-    workflow := util.NewWorkflow(&v1alpha1.Workflow{
-        TypeMeta: v1.TypeMeta{
-            APIVersion: "argoproj.io/v1alpha1",
-            Kind:       "Workflow",
-        },
-        ObjectMeta: v1.ObjectMeta{
-            Name:              "MY_NAME",
-            Namespace:         "MY_NAMESPACE",
-            UID:               "run-1",
-            Labels:            map[string]string{util.LabelKeyWorkflowRunId: "run-1"},
-            CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
-            OwnerReferences: []v1.OwnerReference{{
-                APIVersion: "kubeflow.org/v1beta1",
-                Kind:       "ScheduledWorkflow",
-                Name:       "SCHEDULE_NAME",
-                UID:        types.UID(job.UUID),
-            }},
-        },
-    })
-    _, err := manager.ReportWorkflowResource(context.Background(), workflow)
-    assert.Nil(t, err)
-
-    _, err = manager.ReadArtifact("run-1", "node-1", "artifact-1")
-    assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
-}
-
-func TestReadArtifact_NoRun_NotFound(t *testing.T) {
-    store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
-    defer store.Close()
-    manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})
-
-    _, err := manager.ReadArtifact("run-1", "node-1", "artifact-1")
-    assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
-}
-
 const (
     v2compatPipeline = `
 apiVersion: argoproj.io/v1alpha1
diff --git a/backend/src/apiserver/server/run_server.go b/backend/src/apiserver/server/run_server.go
index 3fa17d3de1a..8d61fa079fc 100644
--- a/backend/src/apiserver/server/run_server.go
+++ b/backend/src/apiserver/server/run_server.go
@@ -16,6 +16,8 @@ package server
 
 import (
     "context"
+    "fmt"
+    "io"
 
     "google.golang.org/protobuf/types/known/emptypb"
 
@@ -460,28 +462,6 @@ func (s *RunServerV1) ReportRunMetricsV1(ctx context.Context, request *apiv1beta
     return &apiv1beta1.ReportRunMetricsResponse{Results: apiResults}, nil
 }
 
-// Reads an artifact.
-// Supports v1beta1 behavior.
-func (s *RunServerV1) ReadArtifactV1(ctx context.Context, request *apiv1beta1.ReadArtifactRequest) (*apiv1beta1.ReadArtifactResponse, error) {
-    if s.options.CollectMetrics {
-        readArtifactRequests.Inc()
-    }
-
-    err := s.canAccessRun(ctx, request.RunId, &authorizationv1.ResourceAttributes{Verb: common.RbacResourceVerbReadArtifact})
-    if err != nil {
-        return nil, util.Wrap(err, "Failed to authorize the request")
-    }
-
-    content, err := s.resourceManager.ReadArtifact(
-        request.GetRunId(), request.GetNodeId(), request.GetArtifactName())
-    if err != nil {
-        return nil, util.Wrapf(err, "failed to read artifact '%+v'", request)
-    }
-    return &apiv1beta1.ReadArtifactResponse{
-        Data: content,
-    }, nil
-}
-
 // Terminates a run.
 // Applies common logic on v1beta1 and v2beta1 API.
 func (s *BaseRunServer) terminateRun(ctx context.Context, runId string) error {
@@ -626,28 +606,6 @@ func (s *RunServer) DeleteRun(ctx context.Context, request *apiv2beta1.DeleteRun
     return &emptypb.Empty{}, nil
 }
 
-// Reads an artifact.
-// Supports v2beta1 behavior.
-func (s *RunServer) ReadArtifact(ctx context.Context, request *apiv2beta1.ReadArtifactRequest) (*apiv2beta1.ReadArtifactResponse, error) {
-    if s.options.CollectMetrics {
-        readArtifactRequests.Inc()
-    }
-
-    err := s.canAccessRun(ctx, request.GetRunId(), &authorizationv1.ResourceAttributes{Verb: common.RbacResourceVerbReadArtifact})
-    if err != nil {
-        return nil, util.Wrap(err, "Failed to authorize the request")
-    }
-
-    content, err := s.resourceManager.ReadArtifact(
-        request.GetRunId(), request.GetNodeId(), request.GetArtifactName())
-    if err != nil {
-        return nil, util.Wrapf(err, "failed to read artifact '%+v'", request)
-    }
-    return &apiv2beta1.ReadArtifactResponse{
-        Data: content,
-    }, nil
-}
-
 // Terminates a run.
 // Supports v2beta1 behavior.
 func (s *RunServer) TerminateRun(ctx context.Context, request *apiv2beta1.TerminateRunRequest) (*emptypb.Empty, error) {
@@ -676,6 +634,51 @@ func (s *RunServer) RetryRun(ctx context.Context, request *apiv2beta1.RetryRunRe
     return &emptypb.Empty{}, nil
 }
 
+// Reads artifact data from a run.
+// Supports v2beta1 behavior.
+func (s *RunServer) ReadArtifact(ctx context.Context, request *apiv2beta1.ReadArtifactRequest) (*apiv2beta1.ReadArtifactResponse, error) {
+    if s.options.CollectMetrics {
+        readArtifactRequests.Inc()
+    }
+
+    err := s.canAccessRun(ctx, request.GetRunId(), &authorizationv1.ResourceAttributes{Verb: common.RbacResourceVerbGet})
+    if err != nil {
+        return nil, util.Wrap(err, "Failed to authorize the request")
+    }
+
+    run, err := s.resourceManager.GetRun(request.GetRunId())
+    if err != nil {
+        return nil, util.Wrap(err, "Failed to get run")
+    }
+
+    if run.WorkflowRuntimeManifest == "" {
+        return nil, util.NewInvalidInputError("V2 IR spec not supported")
+    }
+
+    execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
+    if err != nil {
+        return nil, util.Wrap(err, "Failed to parse workflow")
+    }
+
+    artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(request.GetNodeId(), request.GetArtifactName())
+    if artifactPath == "" {
+        return nil, util.NewResourceNotFoundError("artifact", fmt.Sprintf("run:%s/node:%s/artifact:%s", request.GetRunId(), request.GetNodeId(), request.GetArtifactName()))
+    }
+
+    reader, err := s.resourceManager.GetObjectStore().GetFileReader(ctx, artifactPath)
+    if err != nil {
+        return nil, util.Wrap(err, "Failed to read artifact")
+    }
+    defer reader.Close()
+
+    data, err := io.ReadAll(reader)
+    if err != nil {
+        return nil, util.Wrap(err, "Failed to read artifact data")
+    }
+
+    return &apiv2beta1.ReadArtifactResponse{Data: data}, nil
+}
+
 // Checks if a user can access a run.
 // Adds namespace of the parent experiment of a run id,
 // API group, version, and resource type.
diff --git a/backend/src/apiserver/server/run_server_test.go b/backend/src/apiserver/server/run_server_test.go
index 3301abaa2d4..ef251a80595 100644
--- a/backend/src/apiserver/server/run_server_test.go
+++ b/backend/src/apiserver/server/run_server_test.go
@@ -1481,153 +1481,6 @@ func TestCanAccessRun_Unauthenticated(t *testing.T) {
     )
 }
 
-func TestReadArtifactsV1_Succeed(t *testing.T) {
-    viper.Set(common.MultiUserMode, "true")
-    defer viper.Set(common.MultiUserMode, "false")
-
-    md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: common.GoogleIAPUserIdentityPrefix + "user@google.com"})
-    ctx := metadata.NewIncomingContext(context.Background(), md)
-
-    expectedContent := "test"
-    filePath := "test/file.txt"
-    resourceManager, manager, run := initWithOneTimeRun(t)
-    resourceManager.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
-    workflow := util.NewWorkflow(&v1alpha1.Workflow{
-        TypeMeta: v1.TypeMeta{
-            APIVersion: "argoproj.io/v1alpha1",
-            Kind:       "Workflow",
-        },
-        ObjectMeta: v1.ObjectMeta{
-            Name:              "workflow-name",
-            Namespace:         "ns1",
-            UID:               "workflow1",
-            Labels:            map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
-            CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
-            OwnerReferences: []v1.OwnerReference{{
-                APIVersion: "kubeflow.org/v1beta1",
-                Kind:       "Workflow",
-                Name:       "workflow-name",
-                UID:        types.UID(run.UUID),
-            }},
-        },
-        Status: v1alpha1.WorkflowStatus{
-            Nodes: map[string]v1alpha1.NodeStatus{
-                "node-1": {
-                    Outputs: &v1alpha1.Outputs{
-                        Artifacts: []v1alpha1.Artifact{
-                            {
-                                Name: "artifact-1",
-                                ArtifactLocation: v1alpha1.ArtifactLocation{
-                                    S3: &v1alpha1.S3Artifact{
-                                        Key: filePath,
-                                    },
-                                },
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    })
-    _, err := manager.ReportWorkflowResource(context.Background(), workflow)
-    assert.Nil(t, err)
-
-    runServer := createRunServerV1(manager)
-    artifact := &apiv1beta1.ReadArtifactRequest{
-        RunId:        run.UUID,
-        NodeId:       "node-1",
-        ArtifactName: "artifact-1",
-    }
-    response, err := runServer.ReadArtifactV1(ctx, artifact)
-    assert.Nil(t, err)
-
-    expectedResponse := &apiv1beta1.ReadArtifactResponse{
-        Data: []byte(expectedContent),
-    }
-    assert.Equal(t, expectedResponse, response)
-}
-
-func TestReadArtifactsV1_Unauthorized(t *testing.T) {
-    viper.Set(common.MultiUserMode, "true")
-    defer viper.Set(common.MultiUserMode, "false")
-    userIdentity := "user@google.com"
-    md := metadata.New(map[string]string{common.GoogleIAPUserIdentityHeader: common.GoogleIAPUserIdentityPrefix + userIdentity})
-    ctx := metadata.NewIncomingContext(context.Background(), md)
-
-    clientManager, _, run := initWithOneTimeRun(t)
-
-    // make the following request unauthorized
-    clientManager.SubjectAccessReviewClientFake = client.NewFakeSubjectAccessReviewClientUnauthorized()
-    resourceManager := resource.NewResourceManager(clientManager, &resource.ResourceManagerOptions{CollectMetrics: false})
-
-    runServer := createRunServerV1(resourceManager)
-    artifact := &apiv1beta1.ReadArtifactRequest{
-        RunId:        run.UUID,
-        NodeId:       "node-1",
-        ArtifactName: "artifact-1",
-    }
-    _, err := runServer.ReadArtifactV1(ctx, artifact)
-    assert.NotNil(t, err)
-    assert.Contains(
-        t,
-        err.Error(),
-        "User 'user@google.com' is not authorized with reason: this is not allowed",
-    )
-}
-
-func TestReadArtifactsV1_Run_NotFound(t *testing.T) {
-    clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
-    manager := resource.NewResourceManager(clientManager, &resource.ResourceManagerOptions{CollectMetrics: false})
-    runServer := createRunServerV1(manager)
-    artifact := &apiv1beta1.ReadArtifactRequest{
-        RunId:        "Wrong_RUN_UUID",
-        NodeId:       "node-1",
-        ArtifactName: "artifact-1",
-    }
-    _, err := runServer.ReadArtifactV1(context.Background(), artifact)
-    assert.NotNil(t, err)
-    err = err.(*util.UserError)
-
-    assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
-}
-
-func TestReadArtifactsV1_Resource_NotFound(t *testing.T) {
-    _, manager, run := initWithOneTimeRun(t)
-
-    workflow := util.NewWorkflow(&v1alpha1.Workflow{
-        TypeMeta: v1.TypeMeta{
-            APIVersion: "argoproj.io/v1alpha1",
-            Kind:       "Workflow",
-        },
-        ObjectMeta: v1.ObjectMeta{
-            Name:              "workflow-name",
-            Namespace:         "ns1",
-            UID:               "workflow1",
-            Labels:            map[string]string{util.LabelKeyWorkflowRunId: run.UUID},
-            CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
-            OwnerReferences: []v1.OwnerReference{{
-                APIVersion: "kubeflow.org/v1beta1",
-                Kind:       "Workflow",
-                Name:       "workflow-name",
-                UID:        types.UID(run.UUID),
-            }},
-        },
-    })
-    _, err := manager.ReportWorkflowResource(context.Background(), workflow)
-    assert.Nil(t, err)
-
-    runServer := createRunServerV1(manager)
-    //`artifactRequest` search for node that does not exist
-    artifactRequest := &apiv1beta1.ReadArtifactRequest{
-        RunId:        run.UUID,
-        NodeId:       "node-1",
-        ArtifactName: "artifact-1",
-    }
-    _, err = runServer.ReadArtifactV1(context.Background(), artifactRequest)
-    assert.NotNil(t, err)
-    assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
-}
-
 func TestReadArtifacts_Succeed(t *testing.T) {
     viper.Set(common.MultiUserMode, "true")
     defer viper.Set(common.MultiUserMode, "false")
diff --git a/backend/src/apiserver/storage/object_store.go b/backend/src/apiserver/storage/object_store.go
index 9e60b4439f6..debb829a50e 100644
--- a/backend/src/apiserver/storage/object_store.go
+++ b/backend/src/apiserver/storage/object_store.go
@@ -17,11 +17,14 @@ package storage
 
 import (
     "bytes"
     "context"
+    "io"
     "path"
     "regexp"
+    "strings"
 
     "github.com/kubeflow/pipelines/backend/src/common/util"
     minio "github.com/minio/minio-go/v7"
+    "gocloud.dev/blob"
     "sigs.k8s.io/yaml"
 )
@@ -34,6 +37,7 @@ type ObjectStoreInterface interface {
     AddFile(ctx context.Context, template []byte, filePath string) error
     DeleteFile(ctx context.Context, filePath string) error
     GetFile(ctx context.Context, filePath string) ([]byte, error)
+    GetFileReader(ctx context.Context, filePath string) (io.ReadCloser, error)
     AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error
     GetFromYamlFile(ctx context.Context, o interface{}, filePath string) error
     GetPipelineKey(pipelineId string) string
@@ -45,6 +49,7 @@ type MinioObjectStore struct {
     bucketName       string
     baseFolder       string
    disableMultipart bool
+    blobBucket       *blob.Bucket
 }
 
 // GetPipelineKey adds the configured base folder to pipeline id.
@@ -99,6 +104,22 @@ func (m *MinioObjectStore) GetFile(ctx context.Context, filePath string) ([]byte
     return bytes, nil
 }
 
+func (m *MinioObjectStore) GetFileReader(ctx context.Context, filePath string) (io.ReadCloser, error) {
+    if m.blobBucket != nil {
+        reader, err := m.blobBucket.NewReader(ctx, filePath, nil)
+        if err != nil {
+            return nil, util.NewInternalServerError(err, "Failed to get blob reader for %v", filePath)
+        }
+        return reader, nil
+    }
+
+    data, err := m.GetFile(ctx, filePath)
+    if err != nil {
+        return nil, err
+    }
+    return io.NopCloser(strings.NewReader(string(data))), nil
+}
+
 func (m *MinioObjectStore) AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error {
     bytes, err := yaml.Marshal(o)
     if err != nil {
@@ -126,3 +147,13 @@ func (m *MinioObjectStore) GetFromYamlFile(ctx context.Context, o interface{}, f
 func NewMinioObjectStore(minioClient MinioClientInterface, bucketName string, baseFolder string, disableMultipart bool) *MinioObjectStore {
     return &MinioObjectStore{minioClient: minioClient, bucketName: bucketName, baseFolder: baseFolder, disableMultipart: disableMultipart}
 }
+
+func NewMinioObjectStoreWithBlob(minioClient MinioClientInterface, bucketName string, baseFolder string, disableMultipart bool, blobBucket *blob.Bucket) *MinioObjectStore {
+    return &MinioObjectStore{
+        minioClient:      minioClient,
+        bucketName:       bucketName,
+        baseFolder:       baseFolder,
+        disableMultipart: disableMultipart,
+        blobBucket:       blobBucket,
+    }
+}