From 65d42525627735ded82b503e9b8559bd73ac67c2 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 14 Nov 2024 01:57:17 -0500 Subject: [PATCH 1/3] insert rechecks in parallel --- internal/verifier/recheck.go | 111 ++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 27 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index b7143d7a..5b37fc70 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -3,15 +3,20 @@ package verifier import ( "context" + "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" + "github.com/pkg/errors" + "github.com/samber/lo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "golang.org/x/sync/errgroup" ) const ( - recheckQueue = "recheckQueue" - maxBSONObjSize = 16 * 1024 * 1024 + recheckQueue = "recheckQueue" + maxBSONObjSize = 16 * 1024 * 1024 + recheckInserterThreadsSoftMax = 100 ) // RecheckPrimaryKey stores the implicit type of recheck to perform @@ -51,6 +56,18 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( ) } +// This will split the given slice into *roughly* the given number of chunks. +// It may end up being more or fewer, but it should be pretty close. +func splitToChunks[T any, Slice ~[]T](elements Slice, numChunks int) []Slice { + elsPerChunk := len(elements) / numChunks + + if elsPerChunk == 0 { + elsPerChunk = 1 + } + + return lo.Chunk(elements, elsPerChunk) +} + func (verifier *Verifier) insertRecheckDocs( ctx context.Context, dbNames []string, @@ -63,38 +80,78 @@ func (verifier *Verifier) insertRecheckDocs( generation, _ := verifier.getGenerationWhileLocked() - models := []mongo.WriteModel{} - for i, documentID := range documentIDs { - pk := RecheckPrimaryKey{ - Generation: generation, - DatabaseName: dbNames[i], - CollectionName: collNames[i], - DocumentID: documentID, - } + docIDIndexes := lo.Range(len(documentIDs)) + indexesPerThread := splitToChunks(docIDIndexes, recheckInserterThreadsSoftMax) - // The filter must exclude DataSize; otherwise, if a failed comparison - // and a change event happen on the same document for the same - // generation, the 2nd insert will fail because a) its filter won’t - // match anything, and b) it’ll try to insert a new document with the - // same _id as the one that the 1st insert already created. - filterDoc := bson.D{{"_id", pk}} + eg, groupCtx := errgroup.WithContext(ctx) - recheckDoc := RecheckDoc{ - PrimaryKey: pk, - DataSize: dataSizes[i], - } + for _, curThreadIndexes := range indexesPerThread { + curThreadIndexes := curThreadIndexes + + eg.Go(func() error { + models := make([]mongo.WriteModel, len(curThreadIndexes)) + for m, i := range curThreadIndexes { + pk := RecheckPrimaryKey{ + Generation: generation, + DatabaseName: dbNames[i], + CollectionName: collNames[i], + DocumentID: documentIDs[i], + } + + // The filter must exclude DataSize; otherwise, if a failed comparison + // and a change event happen on the same document for the same + // generation, the 2nd insert will fail because a) its filter won’t + // match anything, and b) it’ll try to insert a new document with the + // same _id as the one that the 1st insert already created. + filterDoc := bson.D{{"_id", pk}} + + recheckDoc := RecheckDoc{ + PrimaryKey: pk, + DataSize: dataSizes[i], + } + + models[m] = mongo.NewReplaceOneModel(). + SetFilter(filterDoc). + SetReplacement(recheckDoc). + SetUpsert(true) + } + + retryer := retry.New(retry.DefaultDurationLimit) + err := retryer.RunForTransientErrorsOnly( + groupCtx, + verifier.logger, + func(_ *retry.Info) error { + _, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite( + groupCtx, + models, + options.BulkWrite().SetOrdered(false), + ) + + return err + }, + ) - models = append(models, - mongo.NewReplaceOneModel(). - SetFilter(filterDoc).SetReplacement(recheckDoc).SetUpsert(true)) + return errors.Wrapf(err, "failed to persist %d recheck(s) for generation %d", len(models), generation) + }) } - _, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite(ctx, models) - if err == nil { - verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation) + err := eg.Wait() + + if err != nil { + return errors.Wrapf( + err, + "failed to persist %d recheck(s) for generation %d", + len(documentIDs), + generation, + ) } - return err + verifier.logger.Debug(). + Int("generation", generation). + Int("count", len(documentIDs)). + Msg("Persisted rechecks.") + + return nil } // ClearRecheckDocs deletes the previous generation’s recheck From 4de9c7c1cdde3f1e093d8a46a9af15c8160b8888 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 14 Nov 2024 10:19:12 -0500 Subject: [PATCH 2/3] Add logging for when enqueueing rechecks. --- internal/verifier/change_stream.go | 4 ++++ internal/verifier/recheck.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 8821b603..3c71cef5 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -90,6 +90,10 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] } } + verifier.logger.Debug(). + Int("count", len(docIDs)). + Msg("Persisting rechecks for change events.") + return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) } diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 5b37fc70..1b9bf7c4 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -47,6 +47,10 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( collNames[i] = collName } + verifier.logger.Debug(). + Int("count", len(documentIDs)). + Msg("Persisting rechecks for mismatched or missing documents.") + return verifier.insertRecheckDocs( context.Background(), dbNames, From 93842d06ef4c8de06f1823718269a13d98aae727 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 14 Nov 2024 20:20:02 -0500 Subject: [PATCH 3/3] panic if nonsensical numChunks --- internal/verifier/recheck.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 1b9bf7c4..5626e714 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -2,6 +2,7 @@ package verifier import ( "context" + "fmt" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" @@ -63,6 +64,10 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( // This will split the given slice into *roughly* the given number of chunks. // It may end up being more or fewer, but it should be pretty close. func splitToChunks[T any, Slice ~[]T](elements Slice, numChunks int) []Slice { + if numChunks < 1 { + panic(fmt.Sprintf("numChunks (%v) should be >=1", numChunks)) + } + elsPerChunk := len(elements) / numChunks if elsPerChunk == 0 {