Skip to content

Commit c3c4364

Browse files
committed
cloud/amazon: add option to skip checksums
Our files (SSTs) often have internal checksums anyway, and for those that don't we have explicit, cloud-agnostic checksum files, so we do not generally need the extra checksums in S3-specific metadata. Computing them has non-zero cost as well, so users may wish to disable them. Indeed, we almost did so by default a couple of years ago before realizing that they are required by the S3 API if certain other features, like object locking, are enabled. So while some users may not be able to or want to disable them, e.g. if they're using object locking, we can provide the option to do so to those who would want it. This may also be useful to users of certain S3-like storage services that have different compatibility with different checksums. Release note: none. Epic: none.
1 parent b1fab87 commit c3c4364

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

pkg/cloud/amazon/s3_storage.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ const (
5757
AWSEndpointParam = "AWS_ENDPOINT"
5858
// AWSUsePathStyle is the query parameter for UsePathStyle in S3 options.
5959
AWSUsePathStyle = "AWS_USE_PATH_STYLE"
60+
// AWSSkipChecksumParam is the query parameter for SkipChecksum in S3 options.
61+
AWSSkipChecksumParam = "AWS_SKIP_CHECKSUM"
6062

6163
// AWSServerSideEncryptionMode is the query parameter in an AWS URI, for the
6264
// mode to be used for server side encryption. It can either be AES256 or
@@ -200,6 +202,7 @@ type s3ClientConfig struct {
200202
assumeRoleProvider roleProvider
201203
delegateRoleProviders []roleProvider
202204

205+
skipChecksum bool
203206
// log.V(2) decides session init params so include it in key.
204207
verbose bool
205208
}
@@ -232,6 +235,7 @@ func clientConfig(conf *cloudpb.ExternalStorage_S3) s3ClientConfig {
232235
return s3ClientConfig{
233236
endpoint: conf.Endpoint,
234237
usePathStyle: conf.UsePathStyle,
238+
skipChecksum: conf.SkipChecksum,
235239
region: conf.Region,
236240
bucket: conf.Bucket,
237241
accessKey: conf.AccessKey,
@@ -280,6 +284,9 @@ func S3URI(bucket, path string, conf *cloudpb.ExternalStorage_S3) string {
280284
if conf.UsePathStyle {
281285
q.Set(AWSUsePathStyle, "true")
282286
}
287+
if conf.SkipChecksum {
288+
q.Set(AWSSkipChecksumParam, "true")
289+
}
283290
if conf.AssumeRoleProvider.Role != "" {
284291
roleProviderStrings := make([]string, 0, len(conf.DelegateRoleProviders)+1)
285292
for _, p := range conf.DelegateRoleProviders {
@@ -325,6 +332,15 @@ func parseS3URL(uri *url.URL) (cloudpb.ExternalStorage, error) {
325332
return cloudpb.ExternalStorage{}, errors.Wrapf(err, "cannot parse %s as bool", AWSUsePathStyle)
326333
}
327334
}
335+
skipChecksumStr := s3URL.ConsumeParam(AWSSkipChecksumParam)
336+
skipChecksumBool := false
337+
if skipChecksumStr != "" {
338+
var err error
339+
skipChecksumBool, err = strconv.ParseBool(skipChecksumStr)
340+
if err != nil {
341+
return cloudpb.ExternalStorage{}, errors.Wrapf(err, "cannot parse %s as bool", AWSSkipChecksumParam)
342+
}
343+
}
328344

329345
conf.S3Config = &cloudpb.ExternalStorage_S3{
330346
Bucket: s3URL.Host,
@@ -334,6 +350,7 @@ func parseS3URL(uri *url.URL) (cloudpb.ExternalStorage, error) {
334350
TempToken: s3URL.ConsumeParam(AWSTempTokenParam),
335351
Endpoint: s3URL.ConsumeParam(AWSEndpointParam),
336352
UsePathStyle: pathStyleBool,
353+
SkipChecksum: skipChecksumBool,
337354
Region: s3URL.ConsumeParam(S3RegionParam),
338355
Auth: s3URL.ConsumeParam(cloud.AuthParam),
339356
ServerEncMode: s3URL.ConsumeParam(AWSServerSideEncryptionMode),
@@ -590,6 +607,11 @@ func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) {
590607
return s3Client{}, "", errors.Wrap(err, "could not initialize an aws config")
591608
}
592609

610+
if s.opts.skipChecksum {
611+
cfg.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
612+
cfg.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
613+
}
614+
593615
var endpointURI string
594616
if s.opts.endpoint != "" {
595617
var err error
@@ -728,7 +750,7 @@ func (s *s3Storage) putUploader(ctx context.Context, basename string) (io.WriteC
728750

729751
buf := bytes.NewBuffer(make([]byte, 0, 4<<20))
730752

731-
return &putUploader{
753+
uploader := &putUploader{
732754
b: buf,
733755
input: &s3.PutObjectInput{
734756
Bucket: s.bucket,
@@ -739,7 +761,11 @@ func (s *s3Storage) putUploader(ctx context.Context, basename string) (io.WriteC
739761
ChecksumAlgorithm: checksumAlgorithm,
740762
},
741763
client: client,
742-
}, nil
764+
}
765+
if s.conf.SkipChecksum {
766+
uploader.input.ChecksumAlgorithm = ""
767+
}
768+
return uploader, nil
743769
}
744770

745771
func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
@@ -757,16 +783,21 @@ func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser
757783
return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
758784
defer sp.Finish()
759785
// Upload the file to S3.
760-
// TODO(dt): test and tune the uploader parameters.
761-
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
786+
input := &s3.PutObjectInput{
762787
Bucket: s.bucket,
763788
Key: aws.String(path.Join(s.prefix, basename)),
764789
Body: r,
765790
ServerSideEncryption: types.ServerSideEncryption(s.conf.ServerEncMode),
766791
SSEKMSKeyId: nilIfEmpty(s.conf.ServerKMSID),
767792
StorageClass: types.StorageClass(s.conf.StorageClass),
768793
ChecksumAlgorithm: checksumAlgorithm,
769-
})
794+
}
795+
796+
if s.conf.SkipChecksum {
797+
input.ChecksumAlgorithm = ""
798+
}
799+
800+
_, err := uploader.Upload(ctx, input)
770801
err = interpretAWSError(err)
771802
err = errors.Wrap(err, "upload failed")
772803
// Mark with ctx's error for upstream code to not interpret this as
@@ -787,7 +818,9 @@ func (s *s3Storage) openStreamAt(
787818
if err != nil {
788819
return nil, err
789820
}
790-
req := &s3.GetObjectInput{Bucket: s.bucket, Key: aws.String(path.Join(s.prefix, basename))}
821+
req := &s3.GetObjectInput{
822+
Bucket: s.bucket, Key: aws.String(path.Join(s.prefix, basename)),
823+
}
791824
if endPos != 0 {
792825
if pos >= endPos {
793826
return nil, io.EOF

pkg/cloud/amazon/s3_storage_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,22 @@ func TestPutS3(t *testing.T) {
167167
cloudtestutils.CheckExportStore(t, info)
168168
})
169169

170+
t.Run("skip-checksums", func(t *testing.T) {
171+
if locked {
172+
skip.IgnoreLint(t, "object-locked buckets do not support skipping checksums")
173+
}
174+
skipIfNoDefaultConfig(t, ctx)
175+
info := cloudtestutils.StoreInfo{
176+
URI: fmt.Sprintf(
177+
"s3://%s/%s-%d?%s=%s&%s=true",
178+
bucket, "backup-test-skip-checksums", testID,
179+
cloud.AuthParam, cloud.AuthParamImplicit, AWSSkipChecksumParam,
180+
),
181+
User: user,
182+
}
183+
cloudtestutils.CheckExportStore(t, info)
184+
})
185+
170186
t.Run("server-side-encryption-invalid-params", func(t *testing.T) {
171187
skipIfNoDefaultConfig(t, ctx)
172188
// Unsupported server side encryption option.
@@ -357,6 +373,7 @@ func TestPutS3Endpoint(t *testing.T) {
357373
}
358374
cloudtestutils.CheckExportStore(t, info)
359375
})
376+
360377
t.Run("use-path-style", func(t *testing.T) {
361378
// EngFlow machines have no internet access, and queries even to localhost will time out.
362379
// So this test is skipped above.

pkg/cloud/cloudpb/external_storage.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ message ExternalStorage {
6363
string temp_token = 5;
6464
string endpoint = 6;
6565
bool use_path_style=16;
66+
bool skip_checksum=17;
6667
string region = 7;
6768
string auth = 8;
6869
string server_enc_mode = 9;
@@ -91,7 +92,10 @@ message ExternalStorage {
9192
// role chain. These roles will be assumed in the order they appear in the
9293
// list so that the role specified in AssumeRoleProvider can be assumed.
9394
repeated AssumeRoleProvider delegate_role_providers = 15 [(gogoproto.nullable) = false];
95+
96+
// Next ID: 18;
9497
}
98+
9599
message GCS {
96100
string bucket = 1;
97101
string prefix = 2;

0 commit comments

Comments
 (0)