Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions internal/reportutils/reportutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (

const decimalPrecision = 2

// This could include signed ints, but we have no need for now.
// The bigger requirement is that it exclude uint8.
// num16Plus is like realNum, but it excludes 8-bit int/uint.
type num16Plus interface {
constraints.Float | ~uint | ~uint16 | ~uint32 | ~uint64
constraints.Float |
~uint | ~uint16 | ~uint32 | ~uint64 |
~int | ~int16 | ~int32 | ~int64
}

type realNum interface {
Expand Down Expand Up @@ -178,3 +179,9 @@ func FindBestUnit[T num16Plus](count T) DataUnit {

return biggestUnit
}

// FmtBytes renders a byte count as a human-readable string.
// It picks the most suitable unit for count via FindBestUnit,
// then delegates the actual formatting to BytesToUnit.
func FmtBytes[T num16Plus](count T) string {
	bestUnit := FindBestUnit(count)
	return BytesToUnit(count, bestUnit)
}
131 changes: 97 additions & 34 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/10gen/migration-verifier/internal/reportutils"
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/types"
"github.com/pkg/errors"
Expand All @@ -18,11 +19,16 @@ const (
recheckQueue = "recheckQueue"
maxBSONObjSize = 16 * 1024 * 1024
recheckInserterThreadsSoftMax = 100
maxIdsPerRecheckTask = 12 * 1024 * 1024
)

// RecheckPrimaryKey stores the implicit type of recheck to perform
// Currently, we only handle document mismatches/change stream updates,
// so DatabaseName, CollectionName, and DocumentID must always be specified.
//
// NB: Order is important here so that, within a given generation,
// sorting by _id will guarantee that all rechecks for a given
// namespace appear consecutively.
type RecheckPrimaryKey struct {
Generation int `bson:"generation"`
DatabaseName string `bson:"db"`
Expand Down Expand Up @@ -194,76 +200,133 @@ func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
prevGeneration := verifier.getPreviousGenerationWhileLocked()

verifier.logger.Debug().Msgf("Creating recheck tasks from generation %d’s %s documents", prevGeneration, recheckQueue)
findFilter := bson.D{{"_id.generation", prevGeneration}}

verifier.logger.Debug().
Int("priorGeneration", prevGeneration).
Msgf("Counting prior generation’s enqueued rechecks.")

recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
rechecksCount, err := recheckColl.CountDocuments(ctx, findFilter)
if err != nil {
return errors.Wrapf(err,
"failed to count generation %d’s rechecks",
prevGeneration,
)
}

verifier.logger.Debug().
Int("priorGeneration", prevGeneration).
Int("rechecksCount", int(rechecksCount)).
Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.")

// We generate one recheck task per collection, unless
// 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
// the 16MB BSON limit)
// 2) The size of the data would exceed our desired partition size. This limits memory use
// during the recheck phase.
prevDBName, prevCollName := "", ""
// 3) The number of documents exceeds $rechecksCount/$numWorkers. We do
// this to prevent one thread from doing all of the rechecks.

var prevDBName, prevCollName string
var idAccum []interface{}
var idLenAccum int
var dataSizeAccum int64
const maxIdsSize = 12 * 1024 * 1024
cursor, err := verifier.verificationDatabase().Collection(recheckQueue).Find(
ctx, bson.D{{"_id.generation", prevGeneration}}, options.Find().SetSort(bson.D{{"_id", 1}}))

maxDocsPerTask := rechecksCount / int64(verifier.numWorkers)

if maxDocsPerTask < int64(verifier.numWorkers) {
maxDocsPerTask = int64(verifier.numWorkers)
}

// The sort here is important because the recheck _id is an embedded
// document that includes the namespace. Thus, all rechecks for a given
// namespace will be consecutive in this query’s result.
cursor, err := recheckColl.Find(
ctx,
findFilter,
options.Find().SetSort(bson.D{{"_id", 1}}),
)
if err != nil {
return err
}
defer cursor.Close(ctx)

persistBufferedRechecks := func() error {
if len(idAccum) == 0 {
return nil
}

namespace := prevDBName + "." + prevCollName

err := verifier.InsertDocumentRecheckTask(
idAccum,
types.ByteCount(dataSizeAccum),
namespace,
)
if err != nil {
return errors.Wrapf(
err,
"failed to create a %d-document recheck task for collection %#q",
len(idAccum),
namespace,
)
}

verifier.logger.Debug().
Str("namespace", namespace).
Int("numDocuments", len(idAccum)).
Str("dataSize", reportutils.FmtBytes(dataSizeAccum)).
Msg("Created document recheck task.")

return nil
}

// We group these here using a sort rather than using aggregate because aggregate is
// subject to a 16MB limit on group size.
for cursor.Next(ctx) {
err := cursor.Err()
if err != nil {
return err
}
var doc RecheckDoc
err = cursor.Decode(&doc)
if err != nil {
return err
}

idRaw := cursor.Current.Lookup("_id", "docID")
idLen := len(idRaw.Value)

verifier.logger.Debug().Msgf("Found persisted recheck doc for %s.%s", doc.PrimaryKey.DatabaseName, doc.PrimaryKey.CollectionName)

// We persist rechecks if any of these happen:
// - the namespace has changed
// - we’ve reached the per-task recheck maximum
// - the buffered document IDs’ size exceeds the per-task maximum
// - the buffered documents exceed the partition size
//
if doc.PrimaryKey.DatabaseName != prevDBName ||
doc.PrimaryKey.CollectionName != prevCollName ||
idLenAccum >= maxIdsSize ||
int64(len(idAccum)) > maxDocsPerTask ||
idLenAccum >= maxIdsPerRecheckTask ||
dataSizeAccum >= verifier.partitionSizeInBytes {
namespace := prevDBName + "." + prevCollName
if len(idAccum) > 0 {
err := verifier.InsertFailedIdsVerificationTask(idAccum, types.ByteCount(dataSizeAccum), namespace)
if err != nil {
return err
}
verifier.logger.Debug().Msgf(
"Created ID verification task for namespace %s with %d ids, "+
"%d id bytes and %d data bytes",
namespace, len(idAccum), idLenAccum, dataSizeAccum)

err := persistBufferedRechecks()
if err != nil {
return err
}

prevDBName = doc.PrimaryKey.DatabaseName
prevCollName = doc.PrimaryKey.CollectionName
idLenAccum = 0
dataSizeAccum = 0
idAccum = []interface{}{}
idAccum = idAccum[:0]
}

idLenAccum += idLen
dataSizeAccum += int64(doc.DataSize)
idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
}
if len(idAccum) > 0 {
namespace := prevDBName + "." + prevCollName
err := verifier.InsertFailedIdsVerificationTask(idAccum, types.ByteCount(dataSizeAccum), namespace)
if err != nil {
return err
}
verifier.logger.Debug().Msgf(
"Created ID verification task for namespace %s with %d ids, "+
"%d id bytes and %d data bytes",
namespace, len(idAccum), idLenAccum, dataSizeAccum)

err = cursor.Err()
if err != nil {
return err
}
return nil

return persistBufferedRechecks()
}
2 changes: 1 addition & 1 deletion internal/verifier/verification_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (verifier *Verifier) InsertPartitionVerificationTask(partition *partitions.
return &verificationTask, err
}

func (verifier *Verifier) InsertFailedIdsVerificationTask(ids []interface{}, dataSize types.ByteCount, srcNamespace string) error {
func (verifier *Verifier) InsertDocumentRecheckTask(ids []interface{}, dataSize types.ByteCount, srcNamespace string) error {
dstNamespace := srcNamespace
if len(verifier.nsMap) != 0 {
var ok bool
Expand Down
Loading