Skip to content

Commit 8e935aa

Browse files
Merge pull request #185 from HumairAK/minio-upgrade-odh
upgrade minio to v7
2 parents 8a08de2 + c44189a commit 8e935aa

File tree

13 files changed

+139
-125
lines changed

13 files changed

+139
-125
lines changed

backend/src/apiserver/client/minio.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121

2222
"github.com/cenkalti/backoff"
2323
"github.com/golang/glog"
24-
minio "github.com/minio/minio-go/v6"
25-
credentials "github.com/minio/minio-go/v6/pkg/credentials"
24+
minio "github.com/minio/minio-go/v7"
25+
credentials "github.com/minio/minio-go/v7/pkg/credentials"
2626
"github.com/pkg/errors"
2727
)
2828

@@ -50,7 +50,11 @@ func CreateMinioClient(minioServiceHost string, minioServicePort string,
5050
) (*minio.Client, error) {
5151
endpoint := joinHostPort(minioServiceHost, minioServicePort)
5252
cred := createCredentialProvidersChain(endpoint, accessKey, secretKey)
53-
minioClient, err := minio.NewWithCredentials(endpoint, cred, secure, region)
53+
minioClient, err := minio.New(endpoint, &minio.Options{
54+
Creds: cred,
55+
Secure: secure,
56+
Region: region,
57+
})
5458
if err != nil {
5559
return nil, errors.Wrapf(err, "Error while creating object store client: %+v", err)
5660
}

backend/src/apiserver/client_manager/client_manager.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"time"
2525

2626
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
27-
"github.com/minio/minio-go/v6"
2827

2928
"github.com/cenkalti/backoff"
3029
"github.com/go-sql-driver/mysql"
@@ -39,6 +38,7 @@ import (
3938
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
4039
"github.com/kubeflow/pipelines/backend/src/common/util"
4140
k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
41+
"github.com/minio/minio-go/v7"
4242
"k8s.io/apimachinery/pkg/runtime"
4343
"sigs.k8s.io/controller-runtime/pkg/cache"
4444
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -292,7 +292,7 @@ func (c *ClientManager) init(options *Options) error {
292292
c.dBStatusStore = storage.NewDBStatusStore(db)
293293
c.defaultExperimentStore = storage.NewDefaultExperimentStore(db)
294294
glog.Info("Initializing Minio client...")
295-
c.objectStore = initMinioClient(common.GetDurationConfig(initConnectionTimeout))
295+
c.objectStore = initMinioClient(options.Context, common.GetDurationConfig(initConnectionTimeout))
296296
glog.Info("Minio client initialized successfully")
297297
// Use default value of client QPS (5) & burst (10) defined in
298298
// k8s.io/client-go/rest/config.go#RESTClientFor
@@ -637,7 +637,7 @@ func initDBDriver(driverName string, initConnectionTimeout time.Duration) string
637637
return sqlConfig
638638
}
639639

640-
func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInterface {
640+
func initMinioClient(ctx context.Context, initConnectionTimeout time.Duration) storage.ObjectStoreInterface {
641641
// Create minio client.
642642
minioServiceHost := common.GetStringConfigWithDefault(
643643
"ObjectStoreConfig.Host", os.Getenv(minioServiceHost))
@@ -655,14 +655,14 @@ func initMinioClient(initConnectionTimeout time.Duration) storage.ObjectStoreInt
655655

656656
minioClient := client.CreateMinioClientOrFatal(minioServiceHost, minioServicePort, accessKey,
657657
secretKey, minioServiceSecure, minioServiceRegion, initConnectionTimeout)
658-
createMinioBucket(minioClient, bucketName, minioServiceRegion)
658+
createMinioBucket(ctx, minioClient, bucketName, minioServiceRegion)
659659

660660
return storage.NewMinioObjectStore(&storage.MinioClient{Client: minioClient}, bucketName, pipelinePath, disableMultipart)
661661
}
662662

663-
func createMinioBucket(minioClient *minio.Client, bucketName, region string) {
663+
func createMinioBucket(ctx context.Context, minioClient *minio.Client, bucketName, region string) {
664664
// Check to see if it exists, and we have permission to access it.
665-
exists, err := minioClient.BucketExists(bucketName)
665+
exists, err := minioClient.BucketExists(ctx, bucketName)
666666
if err != nil {
667667
glog.Fatalf("Failed to check if object store bucket exists. Error: %v", err)
668668
}
@@ -671,7 +671,7 @@ func createMinioBucket(minioClient *minio.Client, bucketName, region string) {
671671
return
672672
}
673673
// Create bucket if it does not exist
674-
err = minioClient.MakeBucket(bucketName, region)
674+
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: region})
675675
if err != nil {
676676
glog.Fatalf("Failed to create object store bucket. Error: %v", err)
677677
}

backend/src/apiserver/resource/resource_manager.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,7 +1006,7 @@ func (r *ResourceManager) readRunLogFromArchive(workflowManifest string, nodeId
10061006
return util.NewInternalServerError(err, "Failed to read logs from archive %v", nodeId)
10071007
}
10081008

1009-
logContent, err := r.objectStore.GetFile(logPath)
1009+
logContent, err := r.objectStore.GetFile(context.TODO(), logPath)
10101010
if err != nil {
10111011
return util.NewInternalServerError(err, "Failed to read logs from archive %v due to error fetching the log file", nodeId)
10121012
}
@@ -1544,13 +1544,13 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
15441544
return bytes, pipelineVersion.PipelineSpecURI, nil
15451545
} else {
15461546
// Try reading object store from pipeline_spec_uri
1547-
template, errUri := r.objectStore.GetFile(pipelineVersion.PipelineSpecURI)
1547+
template, errUri := r.objectStore.GetFile(context.TODO(), pipelineVersion.PipelineSpecURI)
15481548
if errUri != nil {
15491549
// Try reading object store from pipeline_version_id
1550-
template, errUUID := r.objectStore.GetFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
1550+
template, errUUID := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
15511551
if errUUID != nil {
15521552
// Try reading object store from pipeline_id
1553-
template, errPipelineId := r.objectStore.GetFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
1553+
template, errPipelineId := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
15541554
if errPipelineId != nil {
15551555
return nil, "", util.Wrap(
15561556
util.Wrap(
@@ -1639,7 +1639,7 @@ func (r *ResourceManager) ReadArtifact(runID string, nodeID string, artifactName
16391639
return nil, util.NewResourceNotFoundError(
16401640
"artifact", common.CreateArtifactPath(runID, nodeID, artifactName))
16411641
}
1642-
return r.objectStore.GetFile(artifactPath)
1642+
return r.objectStore.GetFile(context.TODO(), artifactPath)
16431643
}
16441644

16451645
// Fetches the default experiment id.
@@ -2159,17 +2159,17 @@ func (r *ResourceManager) GetSecret(ctx context.Context, namespace, name string)
21592159
}
21602160

21612161
// GetSignedUrl retrieves a signed url for the associated artifact.
2162-
func (r *ResourceManager) GetSignedUrl(bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
2163-
signedUrl, err := r.objectStore.GetSignedUrl(bucketConfig, secret, expirySeconds, artifactURI, queryParams)
2162+
func (r *ResourceManager) GetSignedUrl(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
2163+
signedUrl, err := r.objectStore.GetSignedUrl(ctx, bucketConfig, secret, expirySeconds, artifactURI, queryParams)
21642164
if err != nil {
21652165
return "", err
21662166
}
21672167
return signedUrl, nil
21682168
}
21692169

21702170
// GetObjectSize retrieves the size of the Artifact's object in bytes.
2171-
func (r *ResourceManager) GetObjectSize(bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
2172-
size, err := r.objectStore.GetObjectSize(bucketConfig, secret, artifactURI)
2171+
func (r *ResourceManager) GetObjectSize(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
2172+
size, err := r.objectStore.GetObjectSize(ctx, bucketConfig, secret, artifactURI)
21732173
if err != nil {
21742174
return 0, err
21752175
}

backend/src/apiserver/resource/resource_manager_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,31 +67,31 @@ func (m *FakeBadObjectStore) GetPipelineKey(pipelineID string) string {
6767
return pipelineID
6868
}
6969

70-
func (m *FakeBadObjectStore) AddFile(template []byte, filePath string) error {
70+
func (m *FakeBadObjectStore) AddFile(ctx context.Context, template []byte, filePath string) error {
7171
return util.NewInternalServerError(errors.New("Error"), "bad object store")
7272
}
7373

74-
func (m *FakeBadObjectStore) DeleteFile(filePath string) error {
74+
func (m *FakeBadObjectStore) DeleteFile(ctx context.Context, filePath string) error {
7575
return errors.New("Not implemented")
7676
}
7777

78-
func (m *FakeBadObjectStore) GetFile(filePath string) ([]byte, error) {
78+
func (m *FakeBadObjectStore) GetFile(ctx context.Context, filePath string) ([]byte, error) {
7979
return []byte(""), nil
8080
}
8181

82-
func (m *FakeBadObjectStore) AddAsYamlFile(o interface{}, filePath string) error {
82+
func (m *FakeBadObjectStore) AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error {
8383
return util.NewInternalServerError(errors.New("Error"), "bad object store")
8484
}
8585

86-
func (m *FakeBadObjectStore) GetFromYamlFile(o interface{}, filePath string) error {
86+
func (m *FakeBadObjectStore) GetFromYamlFile(ctx context.Context, o interface{}, filePath string) error {
8787
return util.NewInternalServerError(errors.New("Error"), "bad object store")
8888
}
8989

90-
func (m *FakeBadObjectStore) GetSignedUrl(bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
90+
func (m *FakeBadObjectStore) GetSignedUrl(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, expirySeconds time.Duration, artifactURI string, queryParams url.Values) (string, error) {
9191
return "", util.NewInternalServerError(errors.New("Error"), "bad object store")
9292
}
9393

94-
func (m *FakeBadObjectStore) GetObjectSize(bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
94+
func (m *FakeBadObjectStore) GetObjectSize(ctx context.Context, bucketConfig *objectstore.Config, secret *corev1.Secret, artifactURI string) (int64, error) {
9595
return 0, util.NewInternalServerError(errors.New("Error"), "bad object store")
9696
}
9797

@@ -1037,7 +1037,7 @@ func TestGetPipelineTemplate_FromPipelineURI(t *testing.T) {
10371037
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})
10381038

10391039
p, _ := manager.CreatePipeline(createPipelineV1("new_pipeline"))
1040-
manager.objectStore.AddFile([]byte(testWorkflow.ToStringForStore()), p.UUID)
1040+
manager.objectStore.AddFile(context.TODO(), []byte(testWorkflow.ToStringForStore()), p.UUID)
10411041
pv := &model.PipelineVersion{
10421042
PipelineId: p.UUID,
10431043
Name: "new_version",
@@ -1070,7 +1070,7 @@ func TestGetPipelineTemplate_FromPipelineVersionId(t *testing.T) {
10701070
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil))
10711071
assert.True(t, ok)
10721072

1073-
manager.objectStore.AddFile([]byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey("1000"))
1073+
manager.objectStore.AddFile(context.TODO(), []byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey("1000"))
10741074
pv2, _ := manager.CreatePipelineVersion(pv)
10751075
assert.NotEqual(t, p.UUID, pv2.UUID)
10761076

@@ -1093,7 +1093,7 @@ func TestGetPipelineTemplate_FromPipelineId(t *testing.T) {
10931093
PipelineSpecURI: p.UUID,
10941094
}
10951095

1096-
manager.objectStore.AddFile([]byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey(p.UUID))
1096+
manager.objectStore.AddFile(context.TODO(), []byte(testWorkflow.ToStringForStore()), manager.objectStore.GetPipelineKey(p.UUID))
10971097

10981098
pipelineStore, ok := manager.pipelineStore.(*storage.PipelineStore)
10991099
assert.True(t, ok)
@@ -1111,7 +1111,7 @@ func TestGetPipelineTemplate_PipelineMetadataNotFound(t *testing.T) {
11111111
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
11121112
defer store.Close()
11131113
template := []byte("workflow: foo")
1114-
store.objectStore.AddFile(template, store.objectStore.GetPipelineKey(fmt.Sprint(1)))
1114+
store.objectStore.AddFile(context.TODO(), template, store.objectStore.GetPipelineKey(fmt.Sprint(1)))
11151115
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})
11161116
_, err := manager.GetPipelineLatestTemplate("1")
11171117
assert.Equal(t, codes.NotFound, err.(*util.UserError).ExternalStatusCode())
@@ -3327,7 +3327,7 @@ func TestReadArtifact_Succeed(t *testing.T) {
33273327

33283328
expectedContent := "test"
33293329
filePath := "test/file.txt"
3330-
store.ObjectStore().AddFile([]byte(expectedContent), filePath)
3330+
store.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
33313331

33323332
// Create a scheduled run
33333333
// job, _ := manager.CreateJob(model.Job{

backend/src/apiserver/server/artifact_server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (s *ArtifactServer) generateResponseArtifact(
222222
if err != nil {
223223
return nil, err
224224
}
225-
size, err := s.resourceManager.GetObjectSize(bucketConfig, secret, *artifact.Uri)
225+
size, err := s.resourceManager.GetObjectSize(ctx, bucketConfig, secret, *artifact.Uri)
226226
if err != nil {
227227
return nil, err
228228
}
@@ -243,7 +243,7 @@ func (s *ArtifactServer) generateResponseArtifact(
243243
case apiv2beta1.GetArtifactRequest_DOWNLOAD:
244244
queryParams := make(url.Values)
245245
queryParams.Set("response-content-disposition", "attachment")
246-
shareUrl, err := s.resourceManager.GetSignedUrl(bucketConfig, secret, expiry, *artifact.Uri, queryParams)
246+
shareUrl, err := s.resourceManager.GetSignedUrl(ctx, bucketConfig, secret, expiry, *artifact.Uri, queryParams)
247247
if err != nil {
248248
return nil, err
249249
}
@@ -252,7 +252,7 @@ func (s *ArtifactServer) generateResponseArtifact(
252252
case apiv2beta1.GetArtifactRequest_RENDER:
253253
queryParams := make(url.Values)
254254
queryParams.Set("response-content-disposition", "inline")
255-
renderUrl, err := s.resourceManager.GetSignedUrl(bucketConfig, secret, expiry, *artifact.Uri, queryParams)
255+
renderUrl, err := s.resourceManager.GetSignedUrl(ctx, bucketConfig, secret, expiry, *artifact.Uri, queryParams)
256256
if err != nil {
257257
return nil, err
258258
}

backend/src/apiserver/server/run_server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,7 +1435,7 @@ func TestReadArtifactsV1_Succeed(t *testing.T) {
14351435
expectedContent := "test"
14361436
filePath := "test/file.txt"
14371437
resourceManager, manager, run := initWithOneTimeRun(t)
1438-
resourceManager.ObjectStore().AddFile([]byte(expectedContent), filePath)
1438+
resourceManager.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
14391439
workflow := util.NewWorkflow(&v1alpha1.Workflow{
14401440
TypeMeta: v1.TypeMeta{
14411441
APIVersion: "argoproj.io/v1alpha1",
@@ -1582,7 +1582,7 @@ func TestReadArtifacts_Succeed(t *testing.T) {
15821582
expectedContent := "test"
15831583
filePath := "test/file.txt"
15841584
resourceManager, manager, run := initWithOneTimeRun(t)
1585-
resourceManager.ObjectStore().AddFile([]byte(expectedContent), filePath)
1585+
resourceManager.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
15861586
workflow := util.NewWorkflow(&v1alpha1.Workflow{
15871587
TypeMeta: v1.TypeMeta{
15881588
APIVersion: "argoproj.io/v1alpha1",

backend/src/apiserver/storage/minio_client.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,35 @@
1515
package storage
1616

1717
import (
18+
"context"
1819
"io"
1920

20-
minio "github.com/minio/minio-go/v6"
21+
minio "github.com/minio/minio-go/v7"
2122
)
2223

2324
// Create interface for minio client struct, making it more unit testable.
2425
type MinioClientInterface interface {
25-
PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error)
26-
GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
27-
DeleteObject(bucketName, objectName string) error
26+
PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error)
27+
GetObject(ctx context.Context, bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
28+
DeleteObject(ctx context.Context, bucketName, objectName string) error
2829
}
2930

3031
type MinioClient struct {
3132
Client *minio.Client
3233
}
3334

34-
func (c *MinioClient) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error) {
35-
return c.Client.PutObject(bucketName, objectName, reader, objectSize, opts)
35+
func (c *MinioClient) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error) {
36+
info, err := c.Client.PutObject(ctx, bucketName, objectName, reader, objectSize, opts)
37+
if err != nil {
38+
return 0, err
39+
}
40+
return info.Size, nil
3641
}
3742

38-
func (c *MinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error) {
39-
return c.Client.GetObject(bucketName, objectName, opts)
43+
func (c *MinioClient) GetObject(ctx context.Context, bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error) {
44+
return c.Client.GetObject(ctx, bucketName, objectName, opts)
4045
}
4146

42-
func (c *MinioClient) DeleteObject(bucketName, objectName string) error {
43-
return c.Client.RemoveObject(bucketName, objectName)
47+
func (c *MinioClient) DeleteObject(ctx context.Context, bucketName, objectName string) error {
48+
return c.Client.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
4449
}

backend/src/apiserver/storage/minio_client_fake.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ package storage
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"io"
2021

21-
"github.com/minio/minio-go/v6"
22+
"github.com/minio/minio-go/v7"
2223
"github.com/pkg/errors"
2324
)
2425

@@ -32,7 +33,7 @@ func NewFakeMinioClient() *FakeMinioClient {
3233
}
3334
}
3435

35-
func (c *FakeMinioClient) PutObject(bucketName, objectName string, reader io.Reader,
36+
func (c *FakeMinioClient) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader,
3637
objectSize int64, opts minio.PutObjectOptions,
3738
) (int64, error) {
3839
buf := new(bytes.Buffer)
@@ -41,7 +42,7 @@ func (c *FakeMinioClient) PutObject(bucketName, objectName string, reader io.Rea
4142
return 1, nil
4243
}
4344

44-
func (c *FakeMinioClient) GetObject(bucketName, objectName string,
45+
func (c *FakeMinioClient) GetObject(ctx context.Context, bucketName, objectName string,
4546
opts minio.GetObjectOptions,
4647
) (io.Reader, error) {
4748
if _, ok := c.minioClient[objectName]; !ok {
@@ -50,7 +51,7 @@ func (c *FakeMinioClient) GetObject(bucketName, objectName string,
5051
return bytes.NewReader(c.minioClient[objectName]), nil
5152
}
5253

53-
func (c *FakeMinioClient) DeleteObject(bucketName, objectName string) error {
54+
func (c *FakeMinioClient) DeleteObject(ctx context.Context, bucketName, objectName string) error {
5455
if _, ok := c.minioClient[objectName]; !ok {
5556
return errors.New("object not found")
5657
}

0 commit comments

Comments
 (0)