Skip to content

Commit 545a21e

Browse files
authored
REP-5329 Improve retryer & apply it more consistently. (#62)
This changeset adds additional error codes to the retryer’s list of transient errors. It also applies the retryer in several new places:
- metadata prep at the beginning of check
- generation of recheck tasks
- clearing of the recheck queue
- fetching failed & incomplete tasks
- finding the next worker task
- getting the verification status
- inserting a partition verification task
- inserting a recheck task
- updating a verification task
- creating the primary task
- setting the primary task to complete

A few functions are renamed for clarity as well, and some function calls are rewritten to read (hopefully) a bit more clearly.
1 parent fa1264e commit 545a21e

File tree

10 files changed

+359
-184
lines changed

10 files changed

+359
-184
lines changed

internal/retry/retry.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ import (
1515

1616
type RetryCallback = func(context.Context, *FuncInfo) error
1717

18+
// Retry is a convenience that creates a retryer and executes it.
19+
// See Run for argument details.
20+
func Retry(
21+
ctx context.Context,
22+
logger *logger.Logger,
23+
callbacks ...RetryCallback,
24+
) error {
25+
retryer := New(DefaultDurationLimit)
26+
return retryer.Run(ctx, logger, callbacks...)
27+
}
28+
1829
// Run() runs each given callback in parallel. If none of them fail,
1930
// then no error is returned.
2031
//

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
349349

350350
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
351351
ctx,
352+
verifier.logger,
352353
verifier.verificationTaskCollection(),
353354
verificationTaskVerifyDocuments,
354355
verifier.generation,
@@ -398,6 +399,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
398399
generation := verifier.generation
399400
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
400401
ctx,
402+
verifier.logger,
401403
verifier.verificationTaskCollection(),
402404
verificationTaskVerifyDocuments,
403405
generation,

internal/verifier/check.go

Lines changed: 87 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/10gen/migration-verifier/internal/logger"
9+
"github.com/10gen/migration-verifier/internal/retry"
810
mapset "github.com/deckarep/golang-set/v2"
911
"github.com/pkg/errors"
1012
"go.mongodb.org/mongo-driver/bson"
@@ -34,7 +36,10 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
3436
go func() {
3537
err := verifier.CheckDriver(ctx, filter)
3638
if err != nil {
37-
verifier.logger.Fatal().Err(err).Msgf("Fatal error in generation %d", verifier.generation)
39+
verifier.logger.Fatal().
40+
Int("generation", verifier.generation).
41+
Err(err).
42+
Msg("Fatal error.")
3843
}
3944
}()
4045
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
@@ -185,19 +190,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
185190
return err
186191
}
187192
}
188-
err = verifier.AddMetaIndexes(ctx)
189-
if err != nil {
190-
return err
191-
}
192-
193-
err = verifier.doInMetaTransaction(
193+
err = retry.Retry(
194194
ctx,
195-
func(ctx context.Context, sCtx mongo.SessionContext) error {
196-
return verifier.ResetInProgressTasks(sCtx)
195+
verifier.logger,
196+
func(ctx context.Context, _ *retry.FuncInfo) error {
197+
err = verifier.AddMetaIndexes(ctx)
198+
if err != nil {
199+
return err
200+
}
201+
202+
err = verifier.doInMetaTransaction(
203+
ctx,
204+
func(ctx context.Context, sCtx mongo.SessionContext) error {
205+
return verifier.ResetInProgressTasks(sCtx)
206+
},
207+
)
208+
if err != nil {
209+
return errors.Wrap(err, "failed to reset any in-progress tasks")
210+
}
211+
212+
return nil
197213
},
198214
)
215+
199216
if err != nil {
200-
return errors.Wrap(err, "failed to reset any in-progress tasks")
217+
return err
201218
}
202219

203220
verifier.logger.Debug().Msg("Starting Check")
@@ -304,13 +321,23 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
304321
}
305322
verifier.generation++
306323
verifier.phase = Recheck
307-
err = verifier.GenerateRecheckTasks(ctx)
324+
325+
// Generation of recheck tasks can partial-fail. The following will
326+
// cause a full redo in that case, which is inefficient but simple.
327+
// Such failures seem unlikely anyhow.
328+
err = retry.Retry(
329+
ctx,
330+
verifier.logger,
331+
func(ctx context.Context, _ *retry.FuncInfo) error {
332+
return verifier.GenerateRecheckTasksWhileLocked(ctx)
333+
},
334+
)
308335
if err != nil {
309336
verifier.mux.Unlock()
310337
return err
311338
}
312339

313-
err = verifier.ClearRecheckDocs(ctx)
340+
err = verifier.ClearRecheckDocsWhileLocked(ctx)
314341
if err != nil {
315342
verifier.logger.Warn().
316343
Err(err).
@@ -367,7 +394,7 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
367394
)
368395
}
369396
}
370-
isPrimary, err := verifier.CheckIsPrimary(ctx)
397+
isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx)
371398
if err != nil {
372399
return err
373400
}
@@ -401,30 +428,44 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
401428
return nil
402429
}
403430

404-
func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection, taskType verificationTaskType, generation int) ([]VerificationTask, []VerificationTask, error) {
431+
func FetchFailedAndIncompleteTasks(
432+
ctx context.Context,
433+
logger *logger.Logger,
434+
coll *mongo.Collection,
435+
taskType verificationTaskType,
436+
generation int,
437+
) ([]VerificationTask, []VerificationTask, error) {
405438
var FailedTasks, allTasks, IncompleteTasks []VerificationTask
406439

407-
cur, err := coll.Find(ctx, bson.D{
408-
bson.E{Key: "type", Value: taskType},
409-
bson.E{Key: "generation", Value: generation},
410-
})
411-
if err != nil {
412-
return FailedTasks, IncompleteTasks, err
413-
}
440+
err := retry.Retry(
441+
ctx,
442+
logger,
443+
func(ctx context.Context, _ *retry.FuncInfo) error {
444+
cur, err := coll.Find(ctx, bson.D{
445+
bson.E{Key: "type", Value: taskType},
446+
bson.E{Key: "generation", Value: generation},
447+
})
448+
if err != nil {
449+
return err
450+
}
414451

415-
err = cur.All(ctx, &allTasks)
416-
if err != nil {
417-
return FailedTasks, IncompleteTasks, err
418-
}
419-
for _, t := range allTasks {
420-
if failedStatuses.Contains(t.Status) {
421-
FailedTasks = append(FailedTasks, t)
422-
} else if t.Status != verificationTaskCompleted {
423-
IncompleteTasks = append(IncompleteTasks, t)
424-
}
425-
}
452+
err = cur.All(ctx, &allTasks)
453+
if err != nil {
454+
return err
455+
}
456+
for _, t := range allTasks {
457+
if failedStatuses.Contains(t.Status) {
458+
FailedTasks = append(FailedTasks, t)
459+
} else if t.Status != verificationTaskCompleted {
460+
IncompleteTasks = append(IncompleteTasks, t)
461+
}
462+
}
426463

427-
return FailedTasks, IncompleteTasks, nil
464+
return nil
465+
},
466+
)
467+
468+
return FailedTasks, IncompleteTasks, err
428469
}
429470

430471
// work is the logic for an individual worker thread.
@@ -438,27 +479,30 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
438479
Msg("Worker finished.")
439480

440481
for {
441-
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
442-
if errors.Is(err, mongo.ErrNoDocuments) {
482+
taskOpt, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
483+
if err != nil {
484+
return errors.Wrap(
485+
err,
486+
"failed to seek next task",
487+
)
488+
}
489+
490+
task, gotTask := taskOpt.Get()
491+
if !gotTask {
443492
duration := verifier.workerSleepDelayMillis * time.Millisecond
444493

445494
if duration > 0 {
446495
time.Sleep(duration)
447496
}
448497

449498
continue
450-
} else if err != nil {
451-
return errors.Wrap(
452-
err,
453-
"failed to seek next task",
454-
)
455499
}
456500

457-
verifier.workerTracker.Set(workerNum, *task)
501+
verifier.workerTracker.Set(workerNum, task)
458502

459503
switch task.Type {
460504
case verificationTaskVerifyCollection:
461-
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
505+
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, &task)
462506
verifier.workerTracker.Unset(workerNum)
463507

464508
if err != nil {
@@ -471,7 +515,7 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
471515
}
472516
}
473517
case verificationTaskVerifyDocuments:
474-
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
518+
err := verifier.ProcessVerifyTask(ctx, workerNum, &task)
475519
verifier.workerTracker.Unset(workerNum)
476520

477521
if err != nil {

internal/verifier/migration_verifier.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,26 +1232,36 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat
12321232
taskCollection := verifier.verificationTaskCollection()
12331233
generation, _ := verifier.getGeneration()
12341234

1235-
aggregation := []bson.M{
1236-
{
1237-
"$match": bson.M{
1238-
"type": bson.M{"$ne": "primary"},
1239-
"generation": generation,
1240-
},
1241-
},
1242-
{
1243-
"$group": bson.M{
1244-
"_id": "$status",
1245-
"count": bson.M{"$sum": 1},
1246-
},
1247-
},
1248-
}
1249-
cursor, err := taskCollection.Aggregate(ctx, aggregation)
1250-
if err != nil {
1251-
return nil, err
1252-
}
12531235
var results []bson.Raw
1254-
err = cursor.All(ctx, &results)
1236+
1237+
err := retry.Retry(
1238+
ctx,
1239+
verifier.logger,
1240+
func(ctx context.Context, _ *retry.FuncInfo) error {
1241+
cursor, err := taskCollection.Aggregate(
1242+
ctx,
1243+
[]bson.M{
1244+
{
1245+
"$match": bson.M{
1246+
"type": bson.M{"$ne": "primary"},
1247+
"generation": generation,
1248+
},
1249+
},
1250+
{
1251+
"$group": bson.M{
1252+
"_id": "$status",
1253+
"count": bson.M{"$sum": 1},
1254+
},
1255+
},
1256+
},
1257+
)
1258+
if err != nil {
1259+
return err
1260+
}
1261+
1262+
return cursor.All(ctx, &results)
1263+
},
1264+
)
12551265
if err != nil {
12561266
return nil, err
12571267
}

internal/verifier/migration_verifier_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Recheck() {
179179
func() {
180180
verifier.mux.Lock()
181181
defer func() { verifier.mux.Unlock() }()
182-
suite.Require().NoError(verifier.GenerateRecheckTasks(ctx))
182+
suite.Require().NoError(verifier.GenerateRecheckTasksWhileLocked(ctx))
183183
}()
184184

185185
stats, err := verifier.GetNamespaceStatistics(ctx)
@@ -436,7 +436,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
436436
verifier.mux.Lock()
437437
defer verifier.mux.Unlock()
438438

439-
err = verifier.GenerateRecheckTasks(ctx)
439+
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
440440
suite.Require().NoError(err)
441441
}()
442442

0 commit comments

Comments
 (0)