@@ -18,6 +18,7 @@ import (
18
18
"time"
19
19
20
20
"github.com/aws/aws-sdk-go-v2/aws"
21
+ "github.com/aws/aws-sdk-go-v2/aws/ratelimit"
21
22
"github.com/aws/aws-sdk-go-v2/aws/retry"
22
23
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
23
24
"github.com/aws/aws-sdk-go-v2/config"
@@ -89,9 +90,6 @@ const (
89
90
scheme = "s3"
90
91
91
92
checksumAlgorithm = types .ChecksumAlgorithmSha256
92
-
93
- // TODO(yevgeniy): Revisit retry logic. Retrying 10 times seems arbitrary.
94
- defaultRetryMaxAttempts = 10
95
93
)
96
94
97
95
// NightlyEnvVarS3Params maps param keys that get added to an S3
@@ -125,16 +123,6 @@ type s3Storage struct {
125
123
cached * s3Client
126
124
}
127
125
128
- // retryMaxAttempts defines how many times we will retry a
129
- // S3 request on a retriable error.
130
- var retryMaxAttempts = defaultRetryMaxAttempts
131
-
132
- // InjectTestingRetryMaxAttempts is used to change the
133
- // default retries for tests that need quick fail.
134
- func InjectTestingRetryMaxAttempts (maxAttempts int ) {
135
- retryMaxAttempts = maxAttempts
136
- }
137
-
138
126
// customRetryer implements the `request.Retryer` interface and allows for
139
127
// customization of the retry behaviour of an AWS client.
140
128
type customRetryer struct {}
@@ -186,6 +174,39 @@ var usePutObject = settings.RegisterBoolSetting(
186
174
false ,
187
175
)
188
176
177
+ var maxRetries = settings .RegisterIntSetting (
178
+ settings .ApplicationLevel ,
179
+ "cloudstorage.s3.max_retries" ,
180
+ "the maximum number of retries per S3 operation" ,
181
+ 10 )
182
+
183
+ // The v2 S3 client includes a client side retry token bucket. The high level
184
+ // behavior of the token bucket is:
185
+ //
186
+ // 1. The token bucket starts full with 500 tokens.
187
+ // 2. Each request that completes on the first attempt adds 1 token to the bucket.
188
+ // 3. Each failed retry consumes 5 tokens from the bucket.
189
+ //
190
+ // When the token bucket runs out, the only way to make forward progress is to
191
+ // start a new request that succeeds on the first attempt. This is sensible for
192
+ // an RPC service that can bubble up a retryable error to the RPC client, but
193
+ // it doesn't make sense in the context of something like backup/restrore,
194
+ // where we have a fixed number of workers that are sending requests until they
195
+ // complete all of their work. Since there are no client side requests to
196
+ // refill the token bucket, a handful of errors can permanently exhaust the
197
+ // token bucket.
198
+ //
199
+ // TODO(jeffswenson): consider deleting this after we've had time to evaluate
200
+ // it in production. This setting mostly exists so we can keep it turned on by
201
+ // default in the backport.
202
+ var enableClientRetryTokenBucket = settings .RegisterBoolSetting (
203
+ settings .ApplicationLevel ,
204
+ "cloudstorage.s3.client_retry_token_bucket.enabled" ,
205
+ "enable the client side retry token bucket in the AWS S3 client" ,
206
+ // TODO(jeffswenson): change this to false in a seperate PR. This is false in
207
+ // the backports to stay true to the backport policy.
208
+ true )
209
+
189
210
// roleProvider contains fields about the role that needs to be assumed
190
211
// in order to access the external storage.
191
212
type roleProvider struct {
@@ -219,8 +240,20 @@ type s3ClientConfig struct {
219
240
220
241
skipChecksum bool
221
242
skipTLSVerify bool
222
- // log.V(2) decides session init params so include it in key.
223
- verbose bool
243
+ logMode aws.ClientLogMode
244
+ }
245
+
246
+ func getLogMode () aws.ClientLogMode {
247
+ switch {
248
+ case log .VDepth (3 , 1 ):
249
+ return awsVLevel3Logging
250
+ case log .VDepth (2 , 1 ):
251
+ return awsVLevel2Logging
252
+ case log .VDepth (1 , 1 ):
253
+ return awsVLevel1Logging
254
+ default :
255
+ return 0
256
+ }
224
257
}
225
258
226
259
func clientConfig (conf * cloudpb.ExternalStorage_S3 ) s3ClientConfig {
@@ -259,7 +292,7 @@ func clientConfig(conf *cloudpb.ExternalStorage_S3) s3ClientConfig {
259
292
secret : conf .Secret ,
260
293
tempToken : conf .TempToken ,
261
294
auth : conf .Auth ,
262
- verbose : log . V ( 2 ),
295
+ logMode : getLogMode ( ),
263
296
assumeRoleProvider : assumeRoleProvider ,
264
297
delegateRoleProviders : delegateRoleProviders ,
265
298
}
@@ -560,7 +593,9 @@ func newLogAdapter(ctx context.Context) *awsLogAdapter {
560
593
}
561
594
}
562
595
563
- var awsVerboseLogging = aws .LogRequestEventMessage | aws .LogResponseEventMessage | aws .LogRetries | aws .LogSigning
596
+ const awsVLevel1Logging = aws .LogRetries | aws .LogDeprecatedUsage
597
+ const awsVLevel2Logging = awsVLevel1Logging | aws .LogRequestEventMessage | aws .LogResponseEventMessage | aws .LogRequest | aws .LogResponse
598
+ const awsVLevel3Logging = awsVLevel2Logging | aws .LogSigning
564
599
565
600
func constructEndpointURI (endpoint string ) (string , error ) {
566
601
parsedURL , err := url .Parse (endpoint )
@@ -586,6 +621,8 @@ func constructEndpointURI(endpoint string) (string, error) {
586
621
// configures the client with it as well as returning it (so the caller can
587
622
// remember it for future calls).
588
623
func (s * s3Storage ) newClient (ctx context.Context ) (s3Client , string , error ) {
624
+ // TODO(jeffswenson): we should include the settings in the cache key so that
625
+ // changing the cluster settings invalidates the cached instance.
589
626
590
627
// Open a span if client creation will do IO/RPCs to find creds/bucket region.
591
628
if s .opts .region == "" || s .opts .auth == cloud .AuthParamImplicit {
@@ -611,16 +648,20 @@ func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) {
611
648
}
612
649
addLoadOption (config .WithHTTPClient (client ))
613
650
651
+ retryMaxAttempts := int (maxRetries .Get (& s .settings .SV ))
614
652
addLoadOption (config .WithRetryMaxAttempts (retryMaxAttempts ))
615
653
616
654
addLoadOption (config .WithLogger (newLogAdapter (ctx )))
617
- if s .opts .verbose {
618
- addLoadOption (config .WithClientLogMode (awsVerboseLogging ))
655
+ if s .opts .logMode != 0 {
656
+ addLoadOption (config .WithClientLogMode (s . opts . logMode ))
619
657
}
620
658
config .WithRetryer (func () aws.Retryer {
621
659
return retry .NewStandard (func (opts * retry.StandardOptions ) {
622
660
opts .MaxAttempts = retryMaxAttempts
623
661
opts .Retryables = append (opts .Retryables , & customRetryer {})
662
+ if ! enableClientRetryTokenBucket .Get (& s .settings .SV ) {
663
+ opts .RateLimiter = ratelimit .None
664
+ }
624
665
})
625
666
})
626
667
0 commit comments