Skip to content

Commit 370ddb7

Browse files
author
Hongyu Zhou
committed
refactor supervisor to use sdk v2 and add checksum
1 parent 47950dd commit 370ddb7

File tree

2 files changed

+53
-20
lines changed

2 files changed

+53
-20
lines changed

pkg/supervisor/archived_snapshot.go

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package supervisor
33
import (
44
"bufio"
55
"context"
6+
"crypto/sha256"
7+
"fmt"
68
"io"
79
"net/url"
810
"os"
911
"strings"
1012
"time"
1113

12-
"github.com/aws/aws-sdk-go/aws/session"
13-
"github.com/aws/aws-sdk-go/service/s3/s3manager"
14+
"github.com/aws/aws-sdk-go-v2/config"
15+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
16+
"github.com/aws/aws-sdk-go-v2/service/s3"
1417
"github.com/pkg/errors"
1518
"github.com/segmentio/events/v2"
1619
"github.com/segmentio/stats/v4"
@@ -52,7 +55,7 @@ type s3Snapshot struct {
5255
Bucket string
5356
Key string
5457
sendToS3Func sendToS3Func
55-
s3Uploader S3Uploader
58+
s3Client S3Client
5659
}
5760

5861
func (c *s3Snapshot) Upload(ctx context.Context, path string) error {
@@ -71,6 +74,13 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error {
7174
key = key[1:]
7275
}
7376
var reader io.Reader = bufio.NewReaderSize(f, 1024*32) // use a 32K buffer for reading
77+
78+
h := sha256.New()
79+
if _, err := io.Copy(h, f); err != nil {
80+
events.Log("filed to generate snapshop hash value", err)
81+
}
82+
cs := string(h.Sum(nil))
83+
7484
var gpr *gzipCompressionReader
7585
if strings.HasSuffix(key, ".gz") {
7686
events.Log("Compressing s3 payload with GZIP")
@@ -80,7 +90,7 @@ func (c *s3Snapshot) Upload(ctx context.Context, path string) error {
8090
events.Log("Uploading %{file}s (%d bytes) to %{bucket}s/%{key}s", path, size, c.Bucket, key)
8191

8292
start := time.Now()
83-
if err = c.sendToS3(ctx, key, c.Bucket, reader); err != nil {
93+
if err = c.sendToS3(ctx, key, c.Bucket, reader, cs); err != nil {
8494
return errors.Wrap(err, "send to s3")
8595
}
8696
stats.Observe("ldb-upload-time", time.Since(start), stats.T("compressed", isCompressed(gpr)))
@@ -104,35 +114,56 @@ func isCompressed(gpr *gzipCompressionReader) string {
104114
return "true"
105115
}
106116

107-
func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, body io.Reader) error {
117+
// BucketBasics bundles the S3 client used for bucket operations.
//
// NOTE(review): this wrapper adds no behavior over using S3Client
// directly; consider removing it if nothing else grows here.
type BucketBasics struct {
	S3Client S3Client
}
120+
121+
func (c *s3Snapshot) sendToS3(ctx context.Context, key string, bucket string, body io.Reader, cs string) error {
108122
if c.sendToS3Func != nil {
109123
return c.sendToS3Func(ctx, key, bucket, body)
110124
}
111-
ul, err := c.getS3Uploader()
125+
126+
client, err := c.getS3Client()
112127
if err != nil {
113128
return err
114129
}
115-
output, err := ul.UploadWithContext(ctx, &s3manager.UploadInput{
116-
Bucket: &bucket,
117-
Key: &key,
118-
Body: body,
130+
131+
var basics = BucketBasics{
132+
S3Client: client,
133+
}
134+
var partMiBs int64 = 16
135+
uploader := manager.NewUploader(basics.S3Client, func(u *manager.Uploader) {
136+
u.PartSize = partMiBs * 1024 * 1024
137+
})
138+
139+
output, err := uploader.Upload(ctx, &s3.PutObjectInput{
140+
Bucket: &bucket,
141+
Key: &key,
142+
Body: body,
143+
ChecksumAlgorithm: "sha256",
144+
ChecksumSHA256: &cs,
119145
})
120146
if err == nil {
121147
events.Log("Wrote to S3 location: %s", output.Location)
148+
} else {
149+
events.Log("Couldn't upload s3 snapshot to %v:%v. Here's why: %v\n",
150+
bucket, key, err)
122151
}
123152
return errors.Wrap(err, "upload with context")
124153
}
125154

126-
func (c *s3Snapshot) getS3Uploader() (S3Uploader, error) {
127-
if c.s3Uploader != nil {
128-
return c.s3Uploader, nil
155+
func (c *s3Snapshot) getS3Client() (S3Client, error) {
156+
if c.s3Client != nil {
157+
return c.s3Client, nil
129158
}
130-
sess, err := session.NewSession()
159+
cfg, err := config.LoadDefaultConfig(context.Background())
160+
131161
if err != nil {
132-
return nil, errors.Wrap(err, "creating aws session")
162+
panic(fmt.Sprintf("failed loading config, %v", err))
133163
}
134-
uploader := s3manager.NewUploader(sess)
135-
return uploader, nil
164+
165+
client := s3.NewFromConfig(cfg)
166+
return client, nil
136167
}
137168

138169
func archivedSnapshotFromURL(URL string) (archivedSnapshot, error) {

pkg/supervisor/s3_uploader.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package supervisor
22

3-
import "github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
3+
import (
4+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
5+
)
46

57
//counterfeiter:generate -o fakes/s3_uploader.go . S3Uploader
6-
type S3Uploader interface {
7-
s3manageriface.UploaderAPI
8+
type S3Client interface {
9+
manager.UploadAPIClient
810
}

0 commit comments

Comments
 (0)