
Commit 948c454

REP-5329: Implement multi-retry.
This rewrites much of the retryer so that it can run multiple callbacks in parallel. The parallel retry logic is plugged into the verifier’s document comparison logic.
Parent: 17d6f39 · Commit: 948c454

10 files changed: 318 additions, 125 deletions

internal/partitions/partitions.go

Lines changed: 5 additions & 5 deletions
@@ -324,7 +324,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
         Capped bool `bson:"capped"`
     }{}
 
-    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
+    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
         ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
         request := bson.D{
             {"aggregate", collName},
@@ -395,7 +395,7 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
     }
     pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})
 
-    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
+    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
         ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
         request := bson.D{
             {"aggregate", collName},
@@ -488,7 +488,7 @@ func getOuterIDBound(
     }...)
 
     // Get one document containing only the smallest or largest _id value in the collection.
-    err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.Info) error {
+    err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
         ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
         cursor, cmdErr :=
             srcDB.RunCommandCursor(ctx, bson.D{
@@ -577,7 +577,7 @@ func getMidIDBounds(
     // Get a cursor for the $sample and $bucketAuto aggregation.
     var midIDBounds []interface{}
     agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
-    err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.Info) error {
+    err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
         ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
         cursor, cmdErr :=
             srcDB.RunCommandCursor(ctx, bson.D{
@@ -613,7 +613,7 @@ func getMidIDBounds(
 
             // Append the copied bound to the other mid _id bounds.
             midIDBounds = append(midIDBounds, bound)
-            ri.IterationSuccess()
+            ri.NoteSuccess()
         }
 
         return cursor.Err()

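The partitions.go changes are mechanical: each retry callback now receives a *retry.FuncInfo instead of *retry.Info, and per-request success is recorded with NoteSuccess() rather than IterationSuccess(). Below is a minimal sketch of the updated call-site shape. The helper name, package name, and pipeline are hypothetical, and the retryer, logger, and database handle are passed in as assumptions rather than taken from the real functions above.

package example // hypothetical package, for illustration only

import (
    "context"

    "github.com/10gen/migration-verifier/internal/logger"
    "github.com/10gen/migration-verifier/internal/retry"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)

// countDocsSketch mirrors the call sites above: the callback takes
// *retry.FuncInfo, and NoteSuccess replaces IterationSuccess.
func countDocsSketch(
    ctx context.Context,
    retryer *retry.Retryer,
    lgr *logger.Logger,
    srcDB *mongo.Database,
    collName string,
) error {
    return retryer.Run(ctx, lgr, func(ctx context.Context, ri *retry.FuncInfo) error {
        ri.Log(lgr.Logger, "aggregate", "source", srcDB.Name(), collName, "Counting documents.")

        cursor, cmdErr := srcDB.RunCommandCursor(ctx, bson.D{
            {"aggregate", collName},
            {"pipeline", bson.A{bson.D{{"$count", "numDocs"}}}},
            {"cursor", bson.D{}},
        })
        if cmdErr != nil {
            return cmdErr
        }
        defer cursor.Close(ctx)

        // A successful request resets this callback's retry-duration clock.
        ri.NoteSuccess()

        return cursor.Err()
    })
}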
internal/retry/error.go

Lines changed: 12 additions & 0 deletions
@@ -25,3 +25,15 @@ func (rde RetryDurationLimitExceededErr) Error() string {
 func (rde RetryDurationLimitExceededErr) Unwrap() error {
     return rde.lastErr
 }
+
+// errgroupErr is an internal error type that we return from errgroup
+// callbacks. It allows us to know (reliably) which error is the one
+// that triggers the errgroup's failure
+type errgroupErr struct {
+    funcNum         int
+    errFromCallback error
+}
+
+func (ege errgroupErr) Error() string {
+    return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
+}

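errgroupErr exists so the retry loop can tell which callback's error actually tore down the errgroup. The standalone sketch below shows that errors.As lookup; the type is copied here purely for illustration (the real definition lives in internal/retry/error.go), and the sample error text is made up.

package main

import (
    "errors"
    "fmt"
)

// Local copy of the internal type, reproduced only to show the pattern.
type errgroupErr struct {
    funcNum         int
    errFromCallback error
}

func (ege errgroupErr) Error() string {
    return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
}

func main() {
    // Simulate what eg.Wait() hands back after one callback fails.
    err := error(errgroupErr{funcNum: 2, errFromCallback: errors.New("connection reset")})

    // The retry loop uses errors.As to recover the callback index and
    // the callback's own error.
    var ege errgroupErr
    if errors.As(err, &ege) {
        fmt.Printf("callback %d failed: %v\n", ege.funcNum, ege.errFromCallback)
    }
}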
internal/retry/retry.go

Lines changed: 82 additions & 23 deletions
@@ -2,17 +2,35 @@ package retry
 
 import (
     "context"
+    "errors"
+    "fmt"
     "math/rand"
     "time"
 
     "github.com/10gen/migration-verifier/internal/logger"
     "github.com/10gen/migration-verifier/internal/util"
+    "github.com/samber/lo"
+    "golang.org/x/sync/errgroup"
 )
 
-type RetryCallback = func(context.Context, *Info) error
+type RetryCallback = func(context.Context, *FuncInfo) error
 
-// Run retries f() whenever a transient error happens, up to the retryer's
-// configured duration limit.
+// Run() runs each given callback in parallel. If none of them fail,
+// then no error is returned.
+//
+// If one of them fails, the other callbacks' contexts are canceled.
+// If the error is non-transient, it's returned. If the error is transient,
+// though, then the retryer reruns each callback.
+//
+// The retryer tracks the last time each callback either a) succeeded or b)
+// was canceled. Whenever a callback fails, the retryer checks how long it
+// has gone since a success/cancellation. If that time period exceeds the
+// retryer's duration limit, then the retry loop ends, and a
+// RetryDurationLimitExceededErr is returned.
+//
+// Note that, if a given callback runs multiple potentially-retryable requests,
+// each successful request should be noted in the callback's FuncInfo.
+// See that struct's documentation for more details.
 //
 // IMPORTANT: This function should generally NOT be used within a transaction
 // callback. It may be used within a transaction callback if and only if:
@@ -26,7 +44,7 @@ type RetryCallback = func(context.Context, *Info) error
 // This returns an error if the duration limit is reached, or if f() returns a
 // non-transient error.
 func (r *Retryer) Run(
-    ctx context.Context, logger *logger.Logger, f RetryCallback,
+    ctx context.Context, logger *logger.Logger, f ...RetryCallback,
 ) error {
     return r.runRetryLoop(ctx, logger, f)
 }
@@ -35,29 +53,76 @@ func (r *Retryer) Run(
 func (r *Retryer) runRetryLoop(
     ctx context.Context,
     logger *logger.Logger,
-    f RetryCallback,
+    funcs []RetryCallback,
 ) error {
     var err error
 
-    ri := &Info{
+    startTime := time.Now()
+
+    li := &LoopInfo{
         durationLimit: r.retryLimit,
-        lastResetTime: time.Now(),
     }
+    funcinfos := lo.RepeatBy(
+        len(funcs),
+        func(_ int) *FuncInfo {
+            return &FuncInfo{
+                lastResetTime: startTime,
+                loopInfo:      li,
+            }
+        },
+    )
     sleepTime := minSleepTime
 
     for {
-        err = f(ctx, ri)
+        eg, egCtx := errgroup.WithContext(ctx)
+        for i, curFunc := range funcs {
 
-        // If f() returned a transient error, sleep and increase the sleep
-        // time for the next retry, maxing out at the maxSleepTime.
+            eg.Go(func() error {
+                err := curFunc(egCtx, funcinfos[i])
+
+                if err != nil {
+                    return errgroupErr{
+                        funcNum:         i,
+                        errFromCallback: err,
+                    }
+                }
+
+                return nil
+            })
+        }
+        err = eg.Wait()
+
+        // No error? Success!
         if err == nil {
             return nil
         }
 
-        if !r.shouldRetryWithSleep(logger, sleepTime, err) {
-            return err
+        // Let's get the actual error from the function.
+        groupErr := errgroupErr{}
+        if !errors.As(err, &groupErr) {
+            panic(fmt.Sprintf("Error should be a %T, not %T: %v", groupErr, err, err))
         }
 
+        // Not a transient error? Fail immediately.
+        if !r.shouldRetryWithSleep(logger, sleepTime, groupErr.errFromCallback) {
+            return groupErr.errFromCallback
+        }
+
+        li.attemptNumber++
+
+        // Our error is transient. If we've exhausted the allowed time
+        // then fail.
+        failedFuncInfo := funcinfos[groupErr.funcNum]
+        if failedFuncInfo.GetDurationSoFar() > li.durationLimit {
+            return RetryDurationLimitExceededErr{
+                attempts: li.attemptNumber,
+                duration: failedFuncInfo.GetDurationSoFar(),
                lastErr:  groupErr.errFromCallback,
+            }
+        }
+
+        // Sleep and increase the sleep time for the next retry,
+        // up to maxSleepTime.
         select {
         case <-ctx.Done():
             logger.Error().Err(ctx.Err()).Msg("Context was canceled. Aborting retry loop.")
@@ -69,18 +134,12 @@ func (r *Retryer) runRetryLoop(
             }
         }
 
-        ri.attemptNumber++
-
-        if ri.shouldResetDuration {
-            ri.lastResetTime = time.Now()
-            ri.shouldResetDuration = false
-        }
+        now := time.Now()
 
-        if ri.GetDurationSoFar() > ri.durationLimit {
-            return RetryDurationLimitExceededErr{
-                attempts: ri.attemptNumber,
-                duration: ri.GetDurationSoFar(),
-                lastErr:  err,
+        // Set all of the funcs that did *not* fail as having just succeeded.
+        for i, curInfo := range funcinfos {
+            if i != groupErr.funcNum {
+                curInfo.lastResetTime = now
             }
         }
     }

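With the variadic signature, one Run call can drive several parallel operations under a single retry budget, which is how the commit message says the verifier's document comparison uses it. The sketch below is a hedged illustration of that call shape, not the verifier's actual comparison code: the helper name compareSketch, the collections, and the filter are hypothetical, and the Retryer/Logger construction (not part of this diff) is assumed to happen elsewhere.

package example // hypothetical package, for illustration only

import (
    "context"

    "github.com/10gen/migration-verifier/internal/logger"
    "github.com/10gen/migration-verifier/internal/retry"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)

// compareSketch fetches documents from the source and destination in
// parallel. If either callback returns a transient error, the retryer
// cancels the other callback's context and reruns both.
func compareSketch(
    ctx context.Context,
    retryer *retry.Retryer,
    lgr *logger.Logger,
    srcColl, dstColl *mongo.Collection,
    filter bson.D,
) ([]bson.Raw, []bson.Raw, error) {
    var srcDocs, dstDocs []bson.Raw

    err := retryer.Run(
        ctx,
        lgr,
        func(ctx context.Context, fi *retry.FuncInfo) error {
            cursor, err := srcColl.Find(ctx, filter)
            if err != nil {
                return err
            }
            if err := cursor.All(ctx, &srcDocs); err != nil {
                return err
            }
            fi.NoteSuccess() // reset this callback's duration measurement
            return nil
        },
        func(ctx context.Context, fi *retry.FuncInfo) error {
            cursor, err := dstColl.Find(ctx, filter)
            if err != nil {
                return err
            }
            if err := cursor.All(ctx, &dstDocs); err != nil {
                return err
            }
            fi.NoteSuccess()
            return nil
        },
    )

    return srcDocs, dstDocs, err
}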
internal/retry/retry_info.go

Lines changed: 21 additions & 22 deletions
@@ -7,19 +7,20 @@ import (
     "github.com/rs/zerolog"
 )
 
-// Info stores information relevant to the retrying done. It should
+// LoopInfo stores information relevant to the retrying done. It should
 // primarily be used within the closure passed to the retry helpers.
 //
 // The attempt number is 0-indexed (0 means this is the first attempt).
 // The duration tracks the duration of retrying for transient errors only.
-type Info struct {
+type LoopInfo struct {
     attemptNumber int
-
-    lastResetTime time.Time
     durationLimit time.Duration
+}
 
-    // Used to reset the time elapsed for long running operations.
-    shouldResetDuration bool
+type FuncInfo struct {
+    loopInfo *LoopInfo
+
+    lastResetTime time.Time
 }
 
 // Log will log a debug-level message for the current Info values and the provided strings.
@@ -30,7 +31,7 @@ type Info struct {
 //
 // Useful for keeping track of DDL commands that access/change the cluster in some way.
 // Generally not recommended for CRUD commands, which may result in too many log lines.
-func (ri *Info) Log(logger *zerolog.Logger, cmdName string, clientType string, database string, collection string, msg string) {
+func (fi *FuncInfo) Log(logger *zerolog.Logger, cmdName string, clientType string, database string, collection string, msg string) {
     // Don't log if no logger is provided. Mostly useful for
     // integration tests where we don't want additional logs.
     if logger == nil {
@@ -51,31 +52,29 @@ func (ri *Info) Log(logger *zerolog.Logger, cmdName string, clientType string, d
         event.Str("collection", collection)
     }
     event.Str("context", msg).
-        Int("attemptNumber", ri.attemptNumber).
-        Str("durationSoFar", reportutils.DurationToHMS(ri.GetDurationSoFar())).
-        Str("durationLimit", reportutils.DurationToHMS(ri.durationLimit)).
+        Int("attemptNumber", fi.GetAttemptNumber()).
+        Str("durationSoFar", reportutils.DurationToHMS(fi.GetDurationSoFar())).
+        Str("durationLimit", reportutils.DurationToHMS(fi.loopInfo.durationLimit)).
         Msg("Running retryable function")
 }
 
 // GetAttemptNumber returns the Info's current attempt number (0-indexed).
-func (ri *Info) GetAttemptNumber() int {
-    return ri.attemptNumber
+func (fi *FuncInfo) GetAttemptNumber() int {
+    return fi.loopInfo.attemptNumber
 }
 
 // GetDurationSoFar returns the Info's current duration so far. This duration
 // applies to the duration of retrying for transient errors only.
-func (ri *Info) GetDurationSoFar() time.Duration {
-    return time.Since(ri.lastResetTime)
+func (fi *FuncInfo) GetDurationSoFar() time.Duration {
+    return time.Since(fi.lastResetTime)
 }
 
-// IterationSuccess is used to tell the retry util to reset its measurement
+// NoteSuccess is used to tell the retry util to reset its measurement
 // of how long the closure has been running for. This is useful for long
 // running operations that might run successfully for a few days and then fail.
-// Essentially, calling this function tells the retry util not to include the
-// closure's run time as a part of the overall measurement of how long the
-// closure took including retries, since that measurement is used to determine
-// whether we want to retry the operation or not. (If the measurement is greater
-// than the retry time, we will not retry.)
-func (ri *Info) IterationSuccess() {
-    ri.shouldResetDuration = true
+//
+// Call this after every successful command in a multi-command callback.
+// (It’s useless--but harmless--in a single-command callback.)
+func (i *FuncInfo) NoteSuccess() {
+    i.lastResetTime = time.Now()
 }

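Per the NoteSuccess doc comment, a callback that issues several retryable commands should note each success so the duration clock restarts from the most recent success rather than from the start of the attempt. A small hypothetical sketch follows; collA, collB, retryer, and lgr are assumed to already exist.

// Sketch: a callback that issues two commands and notes success after each.
err := retryer.Run(ctx, lgr, func(ctx context.Context, fi *retry.FuncInfo) error {
    if _, err := collA.CountDocuments(ctx, bson.D{}); err != nil {
        return err
    }
    fi.NoteSuccess() // first command succeeded; restart the duration clock

    if _, err := collB.CountDocuments(ctx, bson.D{}); err != nil {
        return err
    }
    fi.NoteSuccess()

    return nil
})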