Skip to content

Commit ed19d5e

Browse files
committed
Implement read retries
1 parent 7858e1d commit ed19d5e

File tree

4 files changed

+111
-4
lines changed

4 files changed

+111
-4
lines changed

internal/backend_s3.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,19 @@ func (s *S3Backend) getRequestId(r *request.Request) string {
512512
r.HTTPResponse.Header.Get("x-amz-id-2")
513513
}
514514

515-
func (s *S3Backend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
515+
// FIXME: Move retries to common code from S3
516+
func (s *S3Backend) HeadBlob(req *HeadBlobInput) (resp *HeadBlobOutput, err error) {
517+
s.readBackoff(func(attempt int) error {
518+
resp, err = s.tryHeadBlob(req)
519+
if err != nil && shouldRetry(err) {
520+
s3Log.Errorf("Error getting metadata of %v (attempt %v): %v\n", req.Key, attempt, err)
521+
}
522+
return err
523+
})
524+
return
525+
}
526+
527+
func (s *S3Backend) tryHeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
516528
head := s3.HeadObjectInput{Bucket: &s.bucket,
517529
Key: &param.Key,
518530
}
@@ -542,7 +554,20 @@ func (s *S3Backend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
542554
}, nil
543555
}
544556

545-
func (s *S3Backend) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
557+
// FIXME: Move retries to common code from S3
558+
func (s *S3Backend) ListBlobs(req *ListBlobsInput) (resp *ListBlobsOutput, err error) {
559+
s.readBackoff(func(attempt int) error {
560+
resp, err = s.tryListBlobs(req)
561+
if err != nil && shouldRetry(err) {
562+
s3Log.Errorf("Error listing objects with prefix=%v delimiter=%v start-after=%v max-keys=%v (attempt %v): %v\n",
563+
NilStr(req.Prefix), NilStr(req.Delimiter), NilStr(req.StartAfter), NilUInt32(req.MaxKeys), attempt, err)
564+
}
565+
return err
566+
})
567+
return
568+
}
569+
570+
func (s *S3Backend) tryListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
546571
var maxKeys *int64
547572

548573
if param.MaxKeys != nil {
@@ -868,7 +893,49 @@ func (s *S3Backend) CopyBlob(param *CopyBlobInput) (*CopyBlobOutput, error) {
868893
return &CopyBlobOutput{s.getRequestId(req)}, nil
869894
}
870895

871-
func (s *S3Backend) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) {
896+
func shouldRetry(err error) bool {
897+
err = mapAwsError(err)
898+
return err != syscall.ENOENT && err != syscall.EINVAL &&
899+
err != syscall.EACCES && err != syscall.ENOTSUP && err != syscall.ERANGE
900+
}
901+
902+
// FIXME: Add similar write backoff (now it's handled by file/dir code)
903+
func (s *S3Backend) readBackoff(try func(attempt int) error) (err error) {
904+
interval := s.flags.ReadRetryInterval
905+
attempt := 1
906+
for {
907+
err = try(attempt)
908+
if err != nil {
909+
if shouldRetry(err) && (s.flags.ReadRetryAttempts < 1 || attempt < s.flags.ReadRetryAttempts) {
910+
attempt++
911+
time.Sleep(interval)
912+
interval = time.Duration(s.flags.ReadRetryMultiplier * float64(interval))
913+
if interval > s.flags.ReadRetryMax {
914+
interval = s.flags.ReadRetryMax
915+
}
916+
} else {
917+
break
918+
}
919+
} else {
920+
break
921+
}
922+
}
923+
return
924+
}
925+
926+
// FIXME: Move retries to common code from S3
927+
func (s *S3Backend) GetBlob(req *GetBlobInput) (resp *GetBlobOutput, err error) {
928+
s.readBackoff(func(attempt int) error {
929+
resp, err = s.tryGetBlob(req)
930+
if err != nil && shouldRetry(err) {
931+
log.Errorf("Error reading %v +%v of %v (attempt %v): %v", req.Start, req.Count, req.Key, attempt, err)
932+
}
933+
return err
934+
})
935+
return
936+
}
937+
938+
func (s *S3Backend) tryGetBlob(param *GetBlobInput) (*GetBlobOutput, error) {
872939
get := s3.GetObjectInput{
873940
Bucket: &s.bucket,
874941
Key: &param.Key,

internal/cfg/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ type FlagStorage struct {
6767
MaxParallelCopy int
6868
StatCacheTTL time.Duration
6969
HTTPTimeout time.Duration
70+
ReadRetryInterval time.Duration
71+
ReadRetryMultiplier float64
72+
ReadRetryMax time.Duration
73+
ReadRetryAttempts int
7074
RetryInterval time.Duration
7175
ReadAheadKB uint64
7276
SmallReadCount uint64

internal/cfg/flags.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,31 @@ MISC OPTIONS:
500500
cli.DurationFlag{
501501
Name: "retry-interval",
502502
Value: 30 * time.Second,
503-
Usage: "Retry unsuccessful flushes after this amount of time",
503+
Usage: "Retry unsuccessful writes after this time",
504+
},
505+
506+
cli.DurationFlag{
507+
Name: "read-retry-interval",
508+
Value: 1 * time.Second,
509+
Usage: "Initial interval for retrying unsuccessful reads",
510+
},
511+
512+
cli.Float64Flag{
513+
Name: "read-retry-mul",
514+
Value: 2,
515+
Usage: "Increase read retry interval this number of times on each unsuccessful attempt",
516+
},
517+
518+
cli.DurationFlag{
519+
Name: "read-retry-max-interval",
520+
Value: 60 * time.Second,
521+
Usage: "Maximum interval for retrying unsuccessful reads",
522+
},
523+
524+
cli.DurationFlag{
525+
Name: "read-retry-attempts",
526+
Value: 0,
527+
Usage: "Maximum read retry attempts (0 means unlimited)",
504528
},
505529

506530
cli.IntFlag{
@@ -758,6 +782,10 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
758782
StatCacheTTL: c.Duration("stat-cache-ttl"),
759783
HTTPTimeout: c.Duration("http-timeout"),
760784
RetryInterval: c.Duration("retry-interval"),
785+
ReadRetryInterval: c.Duration("read-retry-interval"),
786+
ReadRetryMultiplier: c.Float64("read-retry-mul"),
787+
ReadRetryMax: c.Duration("read-retry-max-interval"),
788+
ReadRetryAttempts: c.Int("read-retry-attempts"),
761789
ReadAheadKB: uint64(c.Int("read-ahead")),
762790
SmallReadCount: uint64(c.Int("small-read-count")),
763791
SmallReadCutoffKB: uint64(c.Int("small-read-cutoff")),

internal/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ func PTime(v time.Time) *time.Time {
115115
return &v
116116
}
117117

118+
func NilUInt32(v *uint32) uint32 {
119+
if v == nil {
120+
return 0
121+
} else {
122+
return *v
123+
}
124+
}
125+
118126
func NilInt64(v *int64) int64 {
119127
if v == nil {
120128
return 0

0 commit comments

Comments
 (0)