Skip to content

Commit fa1264e

Browse files
authored
REP-5329 Fix retry on document reads. (mongodb-labs#69)
Commit 31cc703 was broken because it didn’t actually recreate the callbacks as needed to redo the reads. (Unfortunately there’s little good way to test this directly, but mongosync’s CI testing caught it.) This changeset fixes that by adding a callback to the retryer that runs at the start of each iteration.
1 parent ad69b4d commit fa1264e

File tree

6 files changed

+66
-32
lines changed

6 files changed

+66
-32
lines changed

internal/partitions/partitions.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ const (
119119
func PartitionCollectionWithSize(
120120
ctx context.Context,
121121
uuidEntry *uuidutil.NamespaceAndUUID,
122-
retryer retry.Retryer,
122+
retryer *retry.Retryer,
123123
srcClient *mongo.Client,
124124
replicatorList []Replicator,
125125
subLogger *logger.Logger,
@@ -137,7 +137,7 @@ func PartitionCollectionWithSize(
137137
partitions, docCount, byteCount, err := PartitionCollectionWithParameters(
138138
ctx,
139139
uuidEntry,
140-
&retryer,
140+
retryer,
141141
srcClient,
142142
replicatorList,
143143
defaultSampleRate,
@@ -153,7 +153,7 @@ func PartitionCollectionWithSize(
153153
return PartitionCollectionWithParameters(
154154
ctx,
155155
uuidEntry,
156-
&retryer,
156+
retryer,
157157
srcClient,
158158
replicatorList,
159159
defaultSampleRate,

internal/retry/retry.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ type RetryCallback = func(context.Context, *FuncInfo) error
4444
// This returns an error if the duration limit is reached, or if f() returns a
4545
// non-transient error.
4646
func (r *Retryer) Run(
47-
ctx context.Context, logger *logger.Logger, f ...RetryCallback,
47+
ctx context.Context, logger *logger.Logger, funcs ...RetryCallback,
4848
) error {
49-
return r.runRetryLoop(ctx, logger, f)
49+
return r.runRetryLoop(ctx, logger, funcs)
5050
}
5151

5252
// runRetryLoop contains the core logic for the retry loops.
@@ -74,8 +74,18 @@ func (r *Retryer) runRetryLoop(
7474
sleepTime := minSleepTime
7575

7676
for {
77+
if beforeFunc, hasBefore := r.before.Get(); hasBefore {
78+
beforeFunc()
79+
}
80+
7781
eg, egCtx := errgroup.WithContext(ctx)
7882
for i, curFunc := range funcs {
83+
if curFunc == nil {
84+
panic("curFunc should be non-nil")
85+
}
86+
if funcinfos[i] == nil {
87+
panic(fmt.Sprintf("funcinfos[%d] should be non-nil", i))
88+
}
7989

8090
eg.Go(func() error {
8191
err := curFunc(egCtx, funcinfos[i])

internal/retry/retryer.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,27 @@ package retry
22

33
import (
44
"time"
5+
6+
"github.com/10gen/migration-verifier/option"
57
)
68

79
// Retryer handles retrying operations that fail because of network failures.
810
type Retryer struct {
911
retryLimit time.Duration
1012
retryRandomly bool
13+
before option.Option[func()]
1114
additionalErrorCodes []int
1215
}
1316

1417
// New returns a new retryer.
15-
func New(retryLimit time.Duration) Retryer {
18+
func New(retryLimit time.Duration) *Retryer {
1619
return NewWithRandomlyRetries(retryLimit, false)
1720
}
1821

1922
// NewWithRandomlyRetries returns a new retryer, but allows the option of setting the
2023
// retryRandomly field.
21-
func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) Retryer {
22-
return Retryer{
24+
func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) *Retryer {
25+
return &Retryer{
2326
retryLimit: retryLimit,
2427
retryRandomly: retryRandomly,
2528
}
@@ -29,9 +32,21 @@ func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) Retrye
2932
// this method. This allows for a single function to customize the codes it
3033
// wants to retry on. Note that if the Retryer already has additional custom
3134
// error codes set, these are _replaced_ when this method is called.
32-
func (r Retryer) WithErrorCodes(codes ...int) Retryer {
33-
r2 := r
35+
func (r *Retryer) WithErrorCodes(codes ...int) *Retryer {
36+
r2 := *r
3437
r2.additionalErrorCodes = codes
3538

36-
return r2
39+
return &r2
40+
}
41+
42+
// WithBefore sets a callback that always runs before any retryer callback.
43+
//
44+
// This is useful if there are multiple callbacks and you need to reset some
45+
// condition before each retryer iteration. (In the single-callback case it’s
46+
// largely redundant.)
47+
func (r *Retryer) WithBefore(todo func()) *Retryer {
48+
r2 := *r
49+
r2.before = option.Some(todo)
50+
51+
return &r2
3752
}

internal/uuidutil/get_uuid.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type NamespaceAndUUID struct {
2727
CollName string
2828
}
2929

30-
func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, retryer retry.Retryer, db *mongo.Database, collName string) (*NamespaceAndUUID, error) {
30+
func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, db *mongo.Database, collName string) (*NamespaceAndUUID, error) {
3131
binaryUUID, uuidErr := GetCollectionUUID(ctx, logger, retryer, db, collName)
3232
if uuidErr != nil {
3333
return nil, uuidErr
@@ -39,7 +39,7 @@ func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, r
3939
}, nil
4040
}
4141

42-
func GetCollectionUUID(ctx context.Context, logger *logger.Logger, retryer retry.Retryer, db *mongo.Database, collName string) (*primitive.Binary, error) {
42+
func GetCollectionUUID(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, db *mongo.Database, collName string) (*primitive.Binary, error) {
4343
filter := bson.D{{"name", collName}}
4444
opts := options.ListCollections().SetNameOnly(false)
4545

internal/verifier/compare.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,40 @@ func (verifier *Verifier) FetchAndCompareDocuments(
2525
types.ByteCount,
2626
error,
2727
) {
28-
srcChannel, dstChannel, readSrcCallback, readDstCallback := verifier.getFetcherChannelsAndCallbacks(task)
28+
var srcChannel, dstChannel <-chan bson.Raw
29+
var readSrcCallback, readDstCallback func(context.Context, *retry.FuncInfo) error
2930

3031
results := []VerificationResult{}
3132
var docCount types.DocumentCount
3233
var byteCount types.ByteCount
3334

3435
retryer := retry.New(retry.DefaultDurationLimit)
3536

36-
err := retryer.Run(
37-
givenCtx,
38-
verifier.logger,
39-
readSrcCallback,
40-
readDstCallback,
41-
func(ctx context.Context, _ *retry.FuncInfo) error {
42-
var err error
43-
results, docCount, byteCount, err = verifier.compareDocsFromChannels(
44-
ctx,
45-
task,
46-
srcChannel,
47-
dstChannel,
48-
)
37+
err := retryer.
38+
WithBefore(func() {
39+
srcChannel, dstChannel, readSrcCallback, readDstCallback = verifier.getFetcherChannelsAndCallbacks(task)
40+
}).
41+
Run(
42+
givenCtx,
43+
verifier.logger,
44+
func(ctx context.Context, fi *retry.FuncInfo) error {
45+
return readSrcCallback(ctx, fi)
46+
},
47+
func(ctx context.Context, fi *retry.FuncInfo) error {
48+
return readDstCallback(ctx, fi)
49+
},
50+
func(ctx context.Context, _ *retry.FuncInfo) error {
51+
var err error
52+
results, docCount, byteCount, err = verifier.compareDocsFromChannels(
53+
ctx,
54+
task,
55+
srcChannel,
56+
dstChannel,
57+
)
4958

50-
return err
51-
},
52-
)
59+
return err
60+
},
61+
)
5362

5463
return results, docCount, byteCount, err
5564
}

internal/verifier/mongos_refresh.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func RefreshAllMongosInstances(
137137
func getAnyExistingShardConnectionStr(
138138
ctx context.Context,
139139
l *logger.Logger,
140-
r retry.Retryer,
140+
r *retry.Retryer,
141141
client *mongo.Client,
142142
) (string, error) {
143143
res, err := runListShards(ctx, l, r, client)
@@ -169,7 +169,7 @@ func getAnyExistingShardConnectionStr(
169169
func runListShards(
170170
ctx context.Context,
171171
l *logger.Logger,
172-
r retry.Retryer,
172+
r *retry.Retryer,
173173
client *mongo.Client,
174174
) (*mongo.SingleResult, error) {
175175
var res *mongo.SingleResult

0 commit comments

Comments
 (0)