Skip to content

Commit 979fb74

Browse files
committed
recreate changes from commit 59e21bd
1 parent 31cc703 commit 979fb74

File tree

10 files changed

+359
-184
lines changed

10 files changed

+359
-184
lines changed

internal/retry/retry.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ import (
1515

1616
type RetryCallback = func(context.Context, *FuncInfo) error
1717

18+
// Retry is a convenience that creates a retryer and executes it.
19+
// See the retryer's Run method for argument details.
20+
func Retry(
21+
ctx context.Context,
22+
logger *logger.Logger,
23+
callbacks ...RetryCallback,
24+
) error {
25+
retryer := New(DefaultDurationLimit)
26+
return retryer.Run(ctx, logger, callbacks...)
27+
}
28+
1829
// Run() runs each given callback in parallel. If none of them fail,
1930
// then no error is returned.
2031
//

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
346346

347347
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
348348
ctx,
349+
verifier.logger,
349350
verifier.verificationTaskCollection(),
350351
verificationTaskVerifyDocuments,
351352
verifier.generation,
@@ -395,6 +396,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
395396
generation := verifier.generation
396397
failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks(
397398
ctx,
399+
verifier.logger,
398400
verifier.verificationTaskCollection(),
399401
verificationTaskVerifyDocuments,
400402
generation,

internal/verifier/check.go

Lines changed: 87 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/10gen/migration-verifier/internal/logger"
9+
"github.com/10gen/migration-verifier/internal/retry"
810
mapset "github.com/deckarep/golang-set/v2"
911
"github.com/pkg/errors"
1012
"go.mongodb.org/mongo-driver/bson"
@@ -34,7 +36,10 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
3436
go func() {
3537
err := verifier.CheckDriver(ctx, filter)
3638
if err != nil {
37-
verifier.logger.Fatal().Err(err).Msgf("Fatal error in generation %d", verifier.generation)
39+
verifier.logger.Fatal().
40+
Int("generation", verifier.generation).
41+
Err(err).
42+
Msg("Fatal error.")
3843
}
3944
}()
4045
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
@@ -182,19 +187,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
182187
return err
183188
}
184189
}
185-
err = verifier.AddMetaIndexes(ctx)
186-
if err != nil {
187-
return err
188-
}
189-
190-
err = verifier.doInMetaTransaction(
190+
err = retry.Retry(
191191
ctx,
192-
func(ctx context.Context, sCtx mongo.SessionContext) error {
193-
return verifier.ResetInProgressTasks(sCtx)
192+
verifier.logger,
193+
func(ctx context.Context, _ *retry.FuncInfo) error {
194+
err = verifier.AddMetaIndexes(ctx)
195+
if err != nil {
196+
return err
197+
}
198+
199+
err = verifier.doInMetaTransaction(
200+
ctx,
201+
func(ctx context.Context, sCtx mongo.SessionContext) error {
202+
return verifier.ResetInProgressTasks(sCtx)
203+
},
204+
)
205+
if err != nil {
206+
return errors.Wrap(err, "failed to reset any in-progress tasks")
207+
}
208+
209+
return nil
194210
},
195211
)
212+
196213
if err != nil {
197-
return errors.Wrap(err, "failed to reset any in-progress tasks")
214+
return err
198215
}
199216

200217
verifier.logger.Debug().Msg("Starting Check")
@@ -293,13 +310,23 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
293310
}
294311
verifier.generation++
295312
verifier.phase = Recheck
296-
err = verifier.GenerateRecheckTasks(ctx)
313+
314+
// Generation of recheck tasks can partially fail. The following will
315+
// cause a full redo in that case, which is inefficient but simple.
316+
// Such failures seem unlikely anyhow.
317+
err = retry.Retry(
318+
ctx,
319+
verifier.logger,
320+
func(ctx context.Context, _ *retry.FuncInfo) error {
321+
return verifier.GenerateRecheckTasksWhileLocked(ctx)
322+
},
323+
)
297324
if err != nil {
298325
verifier.mux.Unlock()
299326
return err
300327
}
301328

302-
err = verifier.ClearRecheckDocs(ctx)
329+
err = verifier.ClearRecheckDocsWhileLocked(ctx)
303330
if err != nil {
304331
verifier.logger.Warn().
305332
Err(err).
@@ -356,7 +383,7 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
356383
)
357384
}
358385
}
359-
isPrimary, err := verifier.CheckIsPrimary(ctx)
386+
isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx)
360387
if err != nil {
361388
return err
362389
}
@@ -390,30 +417,44 @@ func (verifier *Verifier) CreateInitialTasks(ctx context.Context) error {
390417
return nil
391418
}
392419

393-
func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection, taskType verificationTaskType, generation int) ([]VerificationTask, []VerificationTask, error) {
420+
func FetchFailedAndIncompleteTasks(
421+
ctx context.Context,
422+
logger *logger.Logger,
423+
coll *mongo.Collection,
424+
taskType verificationTaskType,
425+
generation int,
426+
) ([]VerificationTask, []VerificationTask, error) {
394427
var FailedTasks, allTasks, IncompleteTasks []VerificationTask
395428

396-
cur, err := coll.Find(ctx, bson.D{
397-
bson.E{Key: "type", Value: taskType},
398-
bson.E{Key: "generation", Value: generation},
399-
})
400-
if err != nil {
401-
return FailedTasks, IncompleteTasks, err
402-
}
429+
err := retry.Retry(
430+
ctx,
431+
logger,
432+
func(ctx context.Context, _ *retry.FuncInfo) error {
433+
cur, err := coll.Find(ctx, bson.D{
434+
bson.E{Key: "type", Value: taskType},
435+
bson.E{Key: "generation", Value: generation},
436+
})
437+
if err != nil {
438+
return err
439+
}
403440

404-
err = cur.All(ctx, &allTasks)
405-
if err != nil {
406-
return FailedTasks, IncompleteTasks, err
407-
}
408-
for _, t := range allTasks {
409-
if failedStatus.Contains(t.Status) {
410-
FailedTasks = append(FailedTasks, t)
411-
} else if t.Status != verificationTaskCompleted {
412-
IncompleteTasks = append(IncompleteTasks, t)
413-
}
414-
}
441+
err = cur.All(ctx, &allTasks)
442+
if err != nil {
443+
return err
444+
}
445+
for _, t := range allTasks {
446+
if failedStatus.Contains(t.Status) {
447+
FailedTasks = append(FailedTasks, t)
448+
} else if t.Status != verificationTaskCompleted {
449+
IncompleteTasks = append(IncompleteTasks, t)
450+
}
451+
}
415452

416-
return FailedTasks, IncompleteTasks, nil
453+
return nil
454+
},
455+
)
456+
457+
return FailedTasks, IncompleteTasks, err
417458
}
418459

419460
// work is the logic for an individual worker thread.
@@ -427,27 +468,30 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
427468
Msg("Worker finished.")
428469

429470
for {
430-
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
431-
if errors.Is(err, mongo.ErrNoDocuments) {
471+
taskOpt, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
472+
if err != nil {
473+
return errors.Wrap(
474+
err,
475+
"failed to seek next task",
476+
)
477+
}
478+
479+
task, gotTask := taskOpt.Get()
480+
if !gotTask {
432481
duration := verifier.workerSleepDelayMillis * time.Millisecond
433482

434483
if duration > 0 {
435484
time.Sleep(duration)
436485
}
437486

438487
continue
439-
} else if err != nil {
440-
return errors.Wrap(
441-
err,
442-
"failed to seek next task",
443-
)
444488
}
445489

446-
verifier.workerTracker.Set(workerNum, *task)
490+
verifier.workerTracker.Set(workerNum, task)
447491

448492
switch task.Type {
449493
case verificationTaskVerifyCollection:
450-
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
494+
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, &task)
451495
verifier.workerTracker.Unset(workerNum)
452496

453497
if err != nil {
@@ -460,7 +504,7 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
460504
}
461505
}
462506
case verificationTaskVerifyDocuments:
463-
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
507+
err := verifier.ProcessVerifyTask(ctx, workerNum, &task)
464508
verifier.workerTracker.Unset(workerNum)
465509

466510
if err != nil {

internal/verifier/migration_verifier.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,26 +1239,36 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat
12391239
taskCollection := verifier.verificationTaskCollection()
12401240
generation, _ := verifier.getGeneration()
12411241

1242-
aggregation := []bson.M{
1243-
{
1244-
"$match": bson.M{
1245-
"type": bson.M{"$ne": "primary"},
1246-
"generation": generation,
1247-
},
1248-
},
1249-
{
1250-
"$group": bson.M{
1251-
"_id": "$status",
1252-
"count": bson.M{"$sum": 1},
1253-
},
1254-
},
1255-
}
1256-
cursor, err := taskCollection.Aggregate(ctx, aggregation)
1257-
if err != nil {
1258-
return nil, err
1259-
}
12601242
var results []bson.Raw
1261-
err = cursor.All(ctx, &results)
1243+
1244+
err := retry.Retry(
1245+
ctx,
1246+
verifier.logger,
1247+
func(ctx context.Context, _ *retry.FuncInfo) error {
1248+
cursor, err := taskCollection.Aggregate(
1249+
ctx,
1250+
[]bson.M{
1251+
{
1252+
"$match": bson.M{
1253+
"type": bson.M{"$ne": "primary"},
1254+
"generation": generation,
1255+
},
1256+
},
1257+
{
1258+
"$group": bson.M{
1259+
"_id": "$status",
1260+
"count": bson.M{"$sum": 1},
1261+
},
1262+
},
1263+
},
1264+
)
1265+
if err != nil {
1266+
return err
1267+
}
1268+
1269+
return cursor.All(ctx, &results)
1270+
},
1271+
)
12621272
if err != nil {
12631273
return nil, err
12641274
}

internal/verifier/migration_verifier_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Recheck() {
176176
func() {
177177
verifier.mux.Lock()
178178
defer func() { verifier.mux.Unlock() }()
179-
suite.Require().NoError(verifier.GenerateRecheckTasks(ctx))
179+
suite.Require().NoError(verifier.GenerateRecheckTasksWhileLocked(ctx))
180180
}()
181181

182182
stats, err := verifier.GetNamespaceStatistics(ctx)
@@ -433,7 +433,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
433433
verifier.mux.Lock()
434434
defer verifier.mux.Unlock()
435435

436-
err = verifier.GenerateRecheckTasks(ctx)
436+
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
437437
suite.Require().NoError(err)
438438
}()
439439

0 commit comments

Comments
 (0)