Skip to content

upgrade minio to v7 #187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: stable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions backend/src/apiserver/client/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

"github.com/cenkalti/backoff"
"github.com/golang/glog"
minio "github.com/minio/minio-go/v6"
credentials "github.com/minio/minio-go/v6/pkg/credentials"
minio "github.com/minio/minio-go/v7"
credentials "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -50,7 +50,11 @@ func CreateMinioClient(minioServiceHost string, minioServicePort string,
) (*minio.Client, error) {
endpoint := joinHostPort(minioServiceHost, minioServicePort)
cred := createCredentialProvidersChain(endpoint, accessKey, secretKey)
minioClient, err := minio.NewWithCredentials(endpoint, cred, secure, region)
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: cred,
Secure: secure,
Region: region,
})
if err != nil {
return nil, errors.Wrapf(err, "Error while creating object store client: %+v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions backend/src/apiserver/client_manager/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/minio/minio-go/v6"

"github.com/cenkalti/backoff"
"github.com/go-sql-driver/mysql"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
"github.com/minio/minio-go/v7"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -292,7 +292,7 @@ func (c *ClientManager) init(options *Options) error {
c.dBStatusStore = storage.NewDBStatusStore(db)
c.defaultExperimentStore = storage.NewDefaultExperimentStore(db)
glog.Info("Initializing Minio client...")
c.objectStore = initMinioClient(common.GetDurationConfig(initConnectionTimeout))
c.objectStore = initMinioClient(options.Context, common.GetDurationConfig(initConnectionTimeout))
glog.Info("Minio client initialized successfully")
// Use default value of client QPS (5) & burst (10) defined in
// k8s.io/client-go/rest/config.go#RESTClientFor
Expand Down Expand Up @@ -637,7 +637,7 @@ func initDBDriver(driverName string, initConnectionTimeout time.Duration) string
return sqlConfig
}

func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInterface {
func initMinioClient(ctx context.Context, initConnectionTimeout time.Duration) storage.ObjectStoreInterface {
// Create minio client.
minioServiceHost := common.GetStringConfigWithDefault(
"ObjectStoreConfig.Host", os.Getenv(minioServiceHost))
Expand All @@ -655,14 +655,14 @@ func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInt

minioClient := client.CreateMinioClientOrFatal(minioServiceHost, minioServicePort, accessKey,
secretKey, minioServiceSecure, minioServiceRegion, initConnectionTimeout)
createMinioBucket(minioClient, bucketName, minioServiceRegion)
createMinioBucket(ctx, minioClient, bucketName, minioServiceRegion)

return storage.NewMinioObjectStore(&storage.MinioClient{Client: minioClient}, bucketName, pipelinePath, disableMultipart)
}

func createMinioBucket(minioClient *minio.Client, bucketName, region string) {
func createMinioBucket(ctx context.Context, minioClient *minio.Client, bucketName, region string) {
// Check to see if it exists, and we have permission to access it.
exists, err := minioClient.BucketExists(bucketName)
exists, err := minioClient.BucketExists(ctx, bucketName)
if err != nil {
glog.Fatalf("Failed to check if object store bucket exists. Error: %v", err)
}
Expand All @@ -671,7 +671,7 @@ func createMinioBucket(minioClient *minio.Client, bucketName, region string) {
return
}
// Create bucket if it does not exist
err = minioClient.MakeBucket(bucketName, region)
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: region})
if err != nil {
glog.Fatalf("Failed to create object store bucket. Error: %v", err)
}
Expand Down
18 changes: 9 additions & 9 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (r *ResourceManager) readRunLogFromArchive(workflowManifest string, nodeId
return util.NewInternalServerError(err, "Failed to read logs from archive %v", nodeId)
}

logContent, err := r.objectStore.GetFile(logPath)
logContent, err := r.objectStore.GetFile(context.TODO(), logPath)
if err != nil {
return util.NewInternalServerError(err, "Failed to read logs from archive %v due to error fetching the log file", nodeId)
}
Expand Down Expand Up @@ -1544,13 +1544,13 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
return bytes, pipelineVersion.PipelineSpecURI, nil
} else {
// Try reading object store from pipeline_spec_uri
template, errUri := r.objectStore.GetFile(pipelineVersion.PipelineSpecURI)
template, errUri := r.objectStore.GetFile(context.TODO(), pipelineVersion.PipelineSpecURI)
if errUri != nil {
// Try reading object store from pipeline_version_id
template, errUUID := r.objectStore.GetFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
template, errUUID := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
if errUUID != nil {
// Try reading object store from pipeline_id
template, errPipelineId := r.objectStore.GetFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
template, errPipelineId := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
if errPipelineId != nil {
return nil, "", util.Wrap(
util.Wrap(
Expand Down Expand Up @@ -1639,7 +1639,7 @@ func (r *ResourceManager) ReadArtifact(runID string, nodeID string, artifactName
return nil, util.NewResourceNotFoundError(
"artifact", common.CreateArtifactPath(runID, nodeID, artifactName))
}
return r.objectStore.GetFile(artifactPath)
return r.objectStore.GetFile(context.TODO(), artifactPath)
}

// Fetches the default experiment id.
Expand Down Expand Up @@ -2159,17 +2159,17 @@ func (r *ResourceManager) GetSecret(ctx context.Context, namespace, name string)
}

// GetSignedUrl retrieves a signed url for the associated artifact.
func (r *ResourceManager) GetSignedUrl(bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
signedUrl, err := r.objectStore.GetSignedUrl(bucketConfig, secret, expirySeconds, artifactURI, queryParams)
func (r *ResourceManager) GetSignedUrl(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
signedUrl, err := r.objectStore.GetSignedUrl(ctx, bucketConfig, secret, expirySeconds, artifactURI, queryParams)
if err != nil {
return "", err
}
return signedUrl, nil
}

// GetObjectSize retrieves the size of the Artifact's object in bytes.
func (r *ResourceManager) GetObjectSize(bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
size, err := r.objectStore.GetObjectSize(bucketConfig, secret, artifactURI)
func (r *ResourceManager) GetObjectSize(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
size, err := r.objectStore.GetObjectSize(ctx, bucketConfig, secret, artifactURI)
if err != nil {
return 0, err
}
Expand Down
24 changes: 12 additions & 12 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,31 +67,31 @@ func (m *FakeBadObjectStore) GetPipelineKey(pipelineID string) string {
return pipelineID
}

func (m *FakeBadObjectStore) AddFile(template []byte, filePath string) error {
func (m *FakeBadObjectStore) AddFile(ctx context.Context, template []byte, filePath string) error {
return util.NewInternalServerError(errors.New("Error"), "bad object store")
}

func (m *FakeBadObjectStore) DeleteFile(filePath string) error {
func (m *FakeBadObjectStore) DeleteFile(ctx context.Context, filePath string) error {
return errors.New("Not implemented")
}

func (m *FakeBadObjectStore) GetFile(filePath string) ([]byte, error) {
func (m *FakeBadObjectStore) GetFile(ctx context.Context, filePath string) ([]byte, error) {
return []byte(""), nil
}

func (m *FakeBadObjectStore) AddAsYamlFile(o interface{}, filePath string) error {
func (m *FakeBadObjectStore) AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error {
return util.NewInternalServerError(errors.New("Error"), "bad object store")
}

func (m *FakeBadObjectStore) GetFromYamlFile(o interface{}, filePath string) error {
func (m *FakeBadObjectStore) GetFromYamlFile(ctx context.Context, o interface{}, filePath string) error {
return util.NewInternalServerError(errors.New("Error"), "bad object store")
}

func (m *FakeBadObjectStore) GetSignedUrl(bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
func (m *FakeBadObjectStore) GetSignedUrl(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
return "", util.NewInternalServerError(errors.New("Error"), "bad object store")
}

func (m *FakeBadObjectStore) GetObjectSize(bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
func (m *FakeBadObjectStore) GetObjectSize(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
return 0, util.NewInternalServerError(errors.New("Error"), "bad object store")
}

Expand Down Expand Up @@ -1037,7 +1037,7 @@ func TestGetPipelineTemplate_FromPipelineURI(t *testing.T) {
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})

p, _ := manager.CreatePipeline(createPipelineV1("new_pipeline"))
manager.objectStore.AddFile([]byte(testWorkflow.ToStringForStore()), p.UUID)
manager.objectStore.AddFile(context.TODO(), []byte(testWorkflow.ToStringForStore()), p.UUID)
pv := &model.PipelineVersion{
PipelineId: p.UUID,
Name: "new_version",
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func TestGetPipelineTemplate_FromPipelineVersionId(t *testing.T) {
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil))
assert.True(t, ok)

manager.objectStore.AddFile([]byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey("1000"))
manager.objectStore.AddFile(context.TODO(), []byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey("1000"))
pv2, _ := manager.CreatePipelineVersion(pv)
assert.NotEqual(t, p.UUID, pv2.UUID)

Expand All @@ -1093,7 +1093,7 @@ func TestGetPipelineTemplate_FromPipelineId(t *testing.T) {
PipelineSpecURI: p.UUID,
}

manager.objectStore.AddFile([]byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey(p.UUID))
manager.objectStore.AddFile(context.TODO(), []byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey(p.UUID))

pipelineStore, ok := manager.pipelineStore.(*storage.PipelineStore)
assert.True(t, ok)
Expand All @@ -1111,7 +1111,7 @@ func TestGetPipelineTemplate_PipelineMetadataNotFound(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
template := []byte("workflow: foo")
store.objectStore.AddFile(template, store.objectStore.GetPipelineKey(fmt.Sprint(1)))
store.objectStore.AddFile(context.TODO(), template, store.objectStore.GetPipelineKey(fmt.Sprint(1)))
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})
_, err := manager.GetPipelineLatestTemplate("1")
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
Expand Down Expand Up @@ -3327,7 +3327,7 @@ func TestReadArtifact_Succeed(t *testing.T) {

expectedContent := "test"
filePath := "test/file.txt"
store.ObjectStore().AddFile([]byte(expectedContent), filePath)
store.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)

// Create a scheduled run
// job, _ := manager.CreateJob(model.Job{
Expand Down
6 changes: 3 additions & 3 deletions backend/src/apiserver/server/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *ArtifactServer) generateResponseArtifact(
if err != nil {
return nil, err
}
size, err := s.resourceManager.GetObjectSize(bucketConfig, secret, *artifact.Uri)
size, err := s.resourceManager.GetObjectSize(ctx, bucketConfig, secret, *artifact.Uri)
if err != nil {
return nil, err
}
Expand All @@ -243,7 +243,7 @@ func (s *ArtifactServer) generateResponseArtifact(
case apiv2beta1.GetArtifactRequest_DOWNLOAD:
queryParams := make(url.Values)
queryParams.Set("response-content-disposition", "attachment")
shareUrl, err := s.resourceManager.GetSignedUrl(bucketConfig, secret, expiry, *artifact.Uri, queryParams)
shareUrl, err := s.resourceManager.GetSignedUrl(ctx, bucketConfig, secret, expiry, *artifact.Uri, queryParams)
if err != nil {
return nil, err
}
Expand All @@ -252,7 +252,7 @@ func (s *ArtifactServer) generateResponseArtifact(
case apiv2beta1.GetArtifactRequest_RENDER:
queryParams := make(url.Values)
queryParams.Set("response-content-disposition", "inline")
renderUrl, err := s.resourceManager.GetSignedUrl(bucketConfig, secret, expiry, *artifact.Uri, queryParams)
renderUrl, err := s.resourceManager.GetSignedUrl(ctx, bucketConfig, secret, expiry, *artifact.Uri, queryParams)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/server/run_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ func TestReadArtifactsV1_Succeed(t *testing.T) {
expectedContent := "test"
filePath := "test/file.txt"
resourceManager, manager, run := initWithOneTimeRun(t)
resourceManager.ObjectStore().AddFile([]byte(expectedContent), filePath)
resourceManager.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
workflow := util.NewWorkflow(&v1alpha1.Workflow{
TypeMeta: v1.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Expand Down Expand Up @@ -1582,7 +1582,7 @@ func TestReadArtifacts_Succeed(t *testing.T) {
expectedContent := "test"
filePath := "test/file.txt"
resourceManager, manager, run := initWithOneTimeRun(t)
resourceManager.ObjectStore().AddFile([]byte(expectedContent), filePath)
resourceManager.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
workflow := util.NewWorkflow(&v1alpha1.Workflow{
TypeMeta: v1.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Expand Down
25 changes: 15 additions & 10 deletions backend/src/apiserver/storage/minio_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,35 @@
package storage

import (
"context"
"io"

minio "github.com/minio/minio-go/v6"
minio "github.com/minio/minio-go/v7"
)

// Create interface for minio client struct, making it more unit testable.
type MinioClientInterface interface {
PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error)
GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
DeleteObject(bucketName, objectName string) error
PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error)
GetObject(ctx context.Context, bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
DeleteObject(ctx context.Context, bucketName, objectName string) error
}

type MinioClient struct {
Client *minio.Client
}

func (c *MinioClient) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error) {
return c.Client.PutObject(bucketName, objectName, reader, objectSize, opts)
func (c *MinioClient) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error) {
info, err := c.Client.PutObject(ctx, bucketName, objectName, reader, objectSize, opts)
if err != nil {
return 0, err
}
return info.Size, nil
}

func (c *MinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error) {
return c.Client.GetObject(bucketName, objectName, opts)
func (c *MinioClient) GetObject(ctx context.Context, bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error) {
return c.Client.GetObject(ctx, bucketName, objectName, opts)
}

func (c *MinioClient) DeleteObject(bucketName, objectName string) error {
return c.Client.RemoveObject(bucketName, objectName)
func (c *MinioClient) DeleteObject(ctx context.Context, bucketName, objectName string) error {
return c.Client.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
}
9 changes: 5 additions & 4 deletions backend/src/apiserver/storage/minio_client_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package storage

import (
"bytes"
"context"
"io"

"github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v7"
"github.com/pkg/errors"
)

Expand All @@ -32,7 +33,7 @@ func NewFakeMinioClient() *FakeMinioClient {
}
}

func (c *FakeMinioClient) PutObject(bucketName, objectName string, reader io.Reader,
func (c *FakeMinioClient) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader,
objectSize int64, opts minio.PutObjectOptions,
) (int64, error) {
buf := new(bytes.Buffer)
Expand All @@ -41,7 +42,7 @@ func (c *FakeMinioClient) PutObject(bucketName, objectName string, reader io.Rea
return 1, nil
}

func (c *FakeMinioClient) GetObject(bucketName, objectName string,
func (c *FakeMinioClient) GetObject(ctx context.Context, bucketName, objectName string,
opts minio.GetObjectOptions,
) (io.Reader, error) {
if _, ok := c.minioClient[objectName]; !ok {
Expand All @@ -50,7 +51,7 @@ func (c *FakeMinioClient) GetObject(bucketName, objectName string,
return bytes.NewReader(c.minioClient[objectName]), nil
}

func (c *FakeMinioClient) DeleteObject(bucketName, objectName string) error {
func (c *FakeMinioClient) DeleteObject(ctx context.Context, bucketName, objectName string) error {
if _, ok := c.minioClient[objectName]; !ok {
return errors.New("object not found")
}
Expand Down
Loading
Loading