
Commit 31cc703

REP-5329: Implement multi-retry.
This rewrites much of the retryer so that it can run multiple callbacks in parallel. The parallel retry logic is plugged into the verifier’s document comparison logic. This also synchronizes migration-verifier’s internal list of retryable errors.
1 parent 9230ddf commit 31cc703
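
For orientation, here is a minimal caller-side sketch of the new variadic Run, in the style of the verifier's document comparison. The collection handles, dbName/collName, and the readAndHashDocs helper are hypothetical stand-ins, not code from this commit:

// Hypothetical sketch: both callbacks run in parallel. If either returns a
// transient error, both are rerun; a non-transient error fails the whole call.
err := retryer.Run(ctx, logger,
    func(ctx context.Context, fi *retry.FuncInfo) error {
        fi.Log(logger.Logger, "find", "source", dbName, collName, "Reading source documents.")
        return readAndHashDocs(ctx, srcColl) // hypothetical helper
    },
    func(ctx context.Context, fi *retry.FuncInfo) error {
        fi.Log(logger.Logger, "find", "destination", dbName, collName, "Reading destination documents.")
        return readAndHashDocs(ctx, dstColl) // hypothetical helper
    },
)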

12 files changed: +394 additions, -186 deletions
internal/partitions/partitions.go

Lines changed: 5 additions & 5 deletions
@@ -324,7 +324,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
 		Capped bool `bson:"capped"`
 	}{}
 
-	err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
+	err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
 		ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
 		request := bson.D{
 			{"aggregate", collName},
@@ -395,7 +395,7 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
 	}
 	pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})
 
-	err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
+	err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
 		ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
 		request := bson.D{
 			{"aggregate", collName},
@@ -488,7 +488,7 @@ func getOuterIDBound(
 	}...)
 
 	// Get one document containing only the smallest or largest _id value in the collection.
-	err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.Info) error {
+	err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
 		ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
 		cursor, cmdErr :=
 			srcDB.RunCommandCursor(ctx, bson.D{
@@ -577,7 +577,7 @@ func getMidIDBounds(
 	// Get a cursor for the $sample and $bucketAuto aggregation.
 	var midIDBounds []interface{}
 	agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
-	err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
+	err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
 		ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
 		cursor, cmdErr :=
 			srcDB.RunCommandCursor(ctx, bson.D{
@@ -613,7 +613,7 @@ func getMidIDBounds(
 
 			// Append the copied bound to the other mid _id bounds.
 			midIDBounds = append(midIDBounds, bound)
-			ri.IterationSuccess()
+			ri.NoteSuccess()
 		}
 
 		return cursor.Err()
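
Aside from the IterationSuccess-to-NoteSuccess rename in the last hunk, every change in this file is the same mechanical signature update. Condensed from the hunks above, call sites move from the old callback type to the new one:

// Before: callbacks received the old *retry.Info.
err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
    // ... issue the retryable command ...
    return nil
})

// After: callbacks receive a per-callback *retry.FuncInfo, and Run is now variadic.
err = retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
    // ... issue the retryable command ...
    return nil
})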

internal/retry/error.go

Lines changed: 12 additions & 0 deletions
@@ -25,3 +25,15 @@ func (rde RetryDurationLimitExceededErr) Error() string {
 func (rde RetryDurationLimitExceededErr) Unwrap() error {
 	return rde.lastErr
 }
+
+// errgroupErr is an internal error type that we return from errgroup
+// callbacks. It allows us to know (reliably) which error is the one
+// that triggers the errgroup's failure.
+type errgroupErr struct {
+	funcNum         int
+	errFromCallback error
+}
+
+func (ege errgroupErr) Error() string {
+	return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
+}
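
A standalone sketch (not part of the commit) of why this wrapper matters: errors.As lets the retry loop recover both the failing callback's index and its underlying error from whatever errgroup.Wait returns.

package main

import (
    "errors"
    "fmt"
)

// Illustrative copy of the wrapper pattern introduced above.
type errgroupErr struct {
    funcNum         int
    errFromCallback error
}

func (ege errgroupErr) Error() string {
    return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
}

func main() {
    // Pretend callback #2 failed with a (possibly transient) driver error.
    var err error = errgroupErr{funcNum: 2, errFromCallback: errors.New("connection reset by peer")}

    // The retry loop unwraps the wrapper to learn which callback failed
    // and what the underlying error was.
    groupErr := errgroupErr{}
    if errors.As(err, &groupErr) {
        fmt.Println("failed callback:", groupErr.funcNum)          // 2
        fmt.Println("underlying error:", groupErr.errFromCallback) // connection reset by peer
    }
}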

internal/retry/retry.go

Lines changed: 82 additions & 23 deletions
@@ -2,17 +2,35 @@ package retry
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"math/rand"
 	"time"
 
 	"github.com/10gen/migration-verifier/internal/logger"
 	"github.com/10gen/migration-verifier/internal/util"
+	"github.com/samber/lo"
+	"golang.org/x/sync/errgroup"
 )
 
-type RetryCallback = func(context.Context, *Info) error
+type RetryCallback = func(context.Context, *FuncInfo) error
 
-// Run retries f() whenever a transient error happens, up to the retryer's
-// configured duration limit.
+// Run() runs each given callback in parallel. If none of them fail,
+// then no error is returned.
+//
+// If one of them fails, the other callbacks' contexts are canceled.
+// If the error is non-transient, it's returned. If the error is transient,
+// though, then the retryer reruns each callback.
+//
+// The retryer tracks the last time each callback either a) succeeded or b)
+// was canceled. Whenever a callback fails, the retryer checks how long it
+// has gone since a success/cancellation. If that time period exceeds the
+// retryer's duration limit, then the retry loop ends, and a
+// RetryDurationLimitExceededErr is returned.
+//
+// Note that, if a given callback runs multiple potentially-retryable requests,
+// each successful request should be noted in the callback's FuncInfo.
+// See that struct's documentation for more details.
 //
 // IMPORTANT: This function should generally NOT be used within a transaction
 // callback. It may be used within a transaction callback if and only if:
@@ -26,7 +44,7 @@ type RetryCallback = func(context.Context, *Info) error
 // This returns an error if the duration limit is reached, or if f() returns a
 // non-transient error.
 func (r *Retryer) Run(
-	ctx context.Context, logger *logger.Logger, f RetryCallback,
+	ctx context.Context, logger *logger.Logger, f ...RetryCallback,
 ) error {
 	return r.runRetryLoop(ctx, logger, f)
 }
@@ -35,29 +53,76 @@ func (r *Retryer) Run(
 func (r *Retryer) runRetryLoop(
 	ctx context.Context,
 	logger *logger.Logger,
-	f RetryCallback,
+	funcs []RetryCallback,
 ) error {
 	var err error
 
-	ri := &Info{
+	startTime := time.Now()
+
+	li := &LoopInfo{
 		durationLimit: r.retryLimit,
-		lastResetTime: time.Now(),
 	}
+	funcinfos := lo.RepeatBy(
+		len(funcs),
+		func(_ int) *FuncInfo {
+			return &FuncInfo{
+				lastResetTime: startTime,
+				loopInfo:      li,
+			}
+		},
+	)
 	sleepTime := minSleepTime
 
 	for {
-		err = f(ctx, ri)
+		eg, egCtx := errgroup.WithContext(ctx)
+		for i, curFunc := range funcs {
 
-		// If f() returned a transient error, sleep and increase the sleep
-		// time for the next retry, maxing out at the maxSleepTime.
+			eg.Go(func() error {
+				err := curFunc(egCtx, funcinfos[i])
+
+				if err != nil {
+					return errgroupErr{
+						funcNum:         i,
+						errFromCallback: err,
+					}
+				}
+
+				return nil
+			})
+		}
+		err = eg.Wait()
+
+		// No error? Success!
 		if err == nil {
 			return nil
 		}
 
-		if !r.shouldRetryWithSleep(logger, sleepTime, err) {
-			return err
+		// Let's get the actual error from the function.
+		groupErr := errgroupErr{}
+		if !errors.As(err, &groupErr) {
+			panic(fmt.Sprintf("Error should be a %T, not %T: %v", groupErr, err, err))
 		}
 
+		// Not a transient error? Fail immediately.
+		if !r.shouldRetryWithSleep(logger, sleepTime, groupErr.errFromCallback) {
+			return groupErr.errFromCallback
+		}
+
+		li.attemptNumber++
+
+		// Our error is transient. If we've exhausted the allowed time
+		// then fail.
+		failedFuncInfo := funcinfos[groupErr.funcNum]
+		if failedFuncInfo.GetDurationSoFar() > li.durationLimit {
+			return RetryDurationLimitExceededErr{
+				attempts: li.attemptNumber,
+				duration: failedFuncInfo.GetDurationSoFar(),
				lastErr:  groupErr.errFromCallback,
+			}
+		}
+
+		// Sleep and increase the sleep time for the next retry,
+		// up to maxSleepTime.
 		select {
 		case <-ctx.Done():
 			logger.Error().Err(ctx.Err()).Msg("Context was canceled. Aborting retry loop.")
@@ -69,18 +134,12 @@ func (r *Retryer) runRetryLoop(
 			}
 		}
 
-		ri.attemptNumber++
-
-		if ri.shouldResetDuration {
-			ri.lastResetTime = time.Now()
-			ri.shouldResetDuration = false
-		}
+		now := time.Now()
 
-		if ri.GetDurationSoFar() > ri.durationLimit {
-			return RetryDurationLimitExceededErr{
-				attempts: ri.attemptNumber,
-				duration: ri.GetDurationSoFar(),
-				lastErr:  err,
+		// Set all of the funcs that did *not* fail as having just succeeded.
+		for i, curInfo := range funcinfos {
+			if i != groupErr.funcNum {
+				curInfo.lastResetTime = now
 			}
 		}
 	}
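
The heart of the new loop is the combination of errgroup.WithContext (run callbacks in parallel and cancel the rest on first failure) and lo.RepeatBy (one piece of per-callback state). A standalone sketch of just that mechanism, with trivial callbacks rather than the verifier's:

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/samber/lo"
    "golang.org/x/sync/errgroup"
)

// Minimal stand-in for the per-callback state the retryer keeps.
type funcState struct{ lastOK string }

func main() {
    callbacks := []func(context.Context, *funcState) error{
        func(ctx context.Context, st *funcState) error { st.lastOK = "first"; return nil },
        func(ctx context.Context, st *funcState) error { return errors.New("transient failure") },
    }

    // One state value per callback, as the retryer builds with lo.RepeatBy.
    states := lo.RepeatBy(len(callbacks), func(_ int) *funcState { return &funcState{} })

    // Run all callbacks in parallel; the first failure cancels egCtx for the others.
    eg, egCtx := errgroup.WithContext(context.Background())
    for i, cb := range callbacks {
        i, cb := i, cb // loop-variable capture; redundant on Go 1.22+
        eg.Go(func() error {
            if err := cb(egCtx, states[i]); err != nil {
                return fmt.Errorf("func %d failed: %w", i, err)
            }
            return nil
        })
    }

    // Wait returns the first non-nil error from any callback.
    fmt.Println(eg.Wait()) // func 1 failed: transient failure
}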

internal/retry/retry_info.go

Lines changed: 21 additions & 22 deletions
@@ -7,19 +7,20 @@ import (
 	"github.com/rs/zerolog"
 )
 
-// Info stores information relevant to the retrying done. It should
+// LoopInfo stores information relevant to the retrying done. It should
 // primarily be used within the closure passed to the retry helpers.
 //
 // The attempt number is 0-indexed (0 means this is the first attempt).
 // The duration tracks the duration of retrying for transient errors only.
-type Info struct {
+type LoopInfo struct {
 	attemptNumber int
-
-	lastResetTime time.Time
 	durationLimit time.Duration
+}
 
-	// Used to reset the time elapsed for long running operations.
-	shouldResetDuration bool
+type FuncInfo struct {
+	loopInfo *LoopInfo
+
+	lastResetTime time.Time
 }
 
 // Log will log a debug-level message for the current Info values and the provided strings.
@@ -30,7 +31,7 @@ type Info struct {
 //
 // Useful for keeping track of DDL commands that access/change the cluster in some way.
 // Generally not recommended for CRUD commands, which may result in too many log lines.
-func (ri *Info) Log(logger *zerolog.Logger, cmdName string, clientType string, database string, collection string, msg string) {
+func (fi *FuncInfo) Log(logger *zerolog.Logger, cmdName string, clientType string, database string, collection string, msg string) {
 	// Don't log if no logger is provided. Mostly useful for
 	// integration tests where we don't want additional logs.
 	if logger == nil {
@@ -51,31 +52,29 @@ func (ri *Info) Log(logger *zerolog.Logger, cmdName string, clientType string, d
 		event.Str("collection", collection)
 	}
 	event.Str("context", msg).
-		Int("attemptNumber", ri.attemptNumber).
-		Str("durationSoFar", reportutils.DurationToHMS(ri.GetDurationSoFar())).
-		Str("durationLimit", reportutils.DurationToHMS(ri.durationLimit)).
+		Int("attemptNumber", fi.GetAttemptNumber()).
+		Str("durationSoFar", reportutils.DurationToHMS(fi.GetDurationSoFar())).
+		Str("durationLimit", reportutils.DurationToHMS(fi.loopInfo.durationLimit)).
 		Msg("Running retryable function")
 }
 
 // GetAttemptNumber returns the Info's current attempt number (0-indexed).
-func (ri *Info) GetAttemptNumber() int {
-	return ri.attemptNumber
+func (fi *FuncInfo) GetAttemptNumber() int {
+	return fi.loopInfo.attemptNumber
 }
 
 // GetDurationSoFar returns the Info's current duration so far. This duration
 // applies to the duration of retrying for transient errors only.
-func (ri *Info) GetDurationSoFar() time.Duration {
-	return time.Since(ri.lastResetTime)
+func (fi *FuncInfo) GetDurationSoFar() time.Duration {
+	return time.Since(fi.lastResetTime)
 }
 
-// IterationSuccess is used to tell the retry util to reset its measurement
+// NoteSuccess is used to tell the retry util to reset its measurement
 // of how long the closure has been running for. This is useful for long
 // running operations that might run successfully for a few days and then fail.
-// Essentially, calling this function tells the retry util not to include the
-// closure's run time as a part of the overall measurement of how long the
-// closure took including retries, since that measurement is used to determine
-// whether we want to retry the operation or not. (If the measurement is greater
-// than the retry time, we will not retry.)
-func (ri *Info) IterationSuccess() {
-	ri.shouldResetDuration = true
+//
+// Call this after every successful command in a multi-command callback.
+// (It's useless--but harmless--in a single-command callback.)
+func (i *FuncInfo) NoteSuccess() {
+	i.lastResetTime = time.Now()
 }
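
Per the new doc comment, a hedged sketch of a multi-command callback: calling NoteSuccess after each successful command keeps the duration window measuring only the time since the last success, so a long-running callback is not penalized for work that already completed. The two helper functions are hypothetical:

err := retryer.Run(ctx, logger, func(ctx context.Context, fi *retry.FuncInfo) error {
    // First potentially-retryable command (hypothetical helper).
    if err := listSourceIndexes(ctx); err != nil {
        return err
    }
    fi.NoteSuccess() // restart the duration window after the first success

    // Second potentially-retryable command (hypothetical helper).
    if err := listDestinationIndexes(ctx); err != nil {
        return err
    }
    fi.NoteSuccess()

    return nil
})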
