Merged
104 changes: 71 additions & 33 deletions internal/verifier/change_stream.go
@@ -47,30 +47,50 @@ func (uee UnknownEventError) Error() string {
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
}

// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
// operation.
func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
if changeEvent.ClusterTime != nil &&
(verifier.lastChangeEventTime == nil ||
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
verifier.lastChangeEventTime = changeEvent.ClusterTime
// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error {
if len(batch) == 0 {
return nil
}
switch changeEvent.OpType {
case "delete":
fallthrough
case "insert":
fallthrough
case "replace":
fallthrough
case "update":
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
}

return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
default:
return UnknownEventError{Event: changeEvent}
dbNames := make([]string, len(batch))
collNames := make([]string, len(batch))
docIDs := make([]interface{}, len(batch))
dataSizes := make([]int, len(batch))

for i, changeEvent := range batch {
if changeEvent.ClusterTime != nil &&
(verifier.lastChangeEventTime == nil ||
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
verifier.lastChangeEventTime = changeEvent.ClusterTime
}
switch changeEvent.OpType {
case "delete":
fallthrough
case "insert":
fallthrough
case "replace":
fallthrough
case "update":
if err := verifier.eventRecorder.AddEvent(&changeEvent); err != nil {
return errors.Wrapf(err, "failed to augment stats with change event: %+v", changeEvent)
}
dbNames[i] = changeEvent.Ns.DB
collNames[i] = changeEvent.Ns.Coll
docIDs[i] = changeEvent.DocKey.ID

// We don't know the document sizes for all change events,
// so just be conservative and assume they are maximum size.
//
// Note that this prevents us from being able to report a meaningful
// total data size for noninitial generations in the log.
dataSizes[i] = maxBSONObjSize
default:
return UnknownEventError{Event: &changeEvent}
}
}

return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)
}

func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
@@ -102,20 +122,38 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
return err
}

readOneChangeEvent := func() (bool, error) {
gotEvent := cs.TryNext(ctx)
if gotEvent {
var changeEvent ParsedEvent
if err := cs.Decode(&changeEvent); err != nil {
return false, errors.Wrap(err, "failed to decode change event")
readAndHandleOneChangeEventBatch := func() (bool, error) {
eventsRead := 0
var changeEventBatch []ParsedEvent

for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
gotEvent := cs.TryNext(ctx)

if !gotEvent || cs.Err() != nil {
break
}

if changeEventBatch == nil {
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
}
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
if err != nil {
return false, errors.Wrap(err, "failed to handle change event")

if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
return false, errors.Wrap(err, "failed to decode change event")
}

eventsRead++
}
Collaborator: Should there be error handling for cs.Err() here?

Collaborator (Author): Added to the if statement above.


if eventsRead > 0 {
verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)
Collaborator: Do you also want to log the 0 events case? I don't have enough context to know if that would be useful.

Collaborator (Author): I'd prefer not to log if there's no event. We've recommended users to run the m-v with debug log level, so I don't want too many log lines even at debug level.

Collaborator: nit:

Suggested change:
- verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)
+ verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")

}

err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
Collaborator: Should this handling logic also be under the >0 condition?

if err != nil {
return false, errors.Wrap(err, "failed to handle change events")
}

return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
Collaborator: It seems strange to defer handling cs.Err() until after HandleChangeStreamEvents() is called. It’d look a bit more idiomatic if the check were closer to the TryNext.
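A minimal sketch (not part of the PR) of how the two review comments above could be folded into the batch reader: the handler is only called when the batch is nonempty, and cs.Err() is checked right next to TryNext. The handle callback stands in for verifier.HandleChangeStreamEvents.

```go
package verifier

import (
	"context"

	"github.com/pkg/errors"
	"go.mongodb.org/mongo-driver/mongo"
)

// readAndHandleBatch drains one server batch from the change stream, checking
// cs.Err() immediately after each TryNext, and invokes the handler only when
// at least one event was read.
func readAndHandleBatch(
	ctx context.Context,
	cs *mongo.ChangeStream,
	handle func(context.Context, []ParsedEvent) error,
) (bool, error) {
	var batch []ParsedEvent

	for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
		if !cs.TryNext(ctx) {
			// Surface iteration errors right where TryNext reports no event.
			if err := cs.Err(); err != nil {
				return false, errors.Wrap(err, "change stream iteration failed")
			}
			break
		}

		var event ParsedEvent
		if err := cs.Decode(&event); err != nil {
			return false, errors.Wrap(err, "failed to decode change event")
		}
		batch = append(batch, event)
	}

	// Only hand the batch off (and log) when it is nonempty.
	if len(batch) == 0 {
		return false, nil
	}

	if err := handle(ctx, batch); err != nil {
		return false, errors.Wrap(err, "failed to handle change events")
	}

	return true, nil
}
```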

}

for {
@@ -141,15 +179,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
// (i.e., the `getMore` call returns empty)
for {
var gotEvent bool
gotEvent, err = readOneChangeEvent()
gotEvent, err = readAndHandleOneChangeEventBatch()

if !gotEvent || err != nil {
break
}
}

default:
_, err = readOneChangeEvent()
_, err = readAndHandleOneChangeEventBatch()
}

if err == nil {
28 changes: 28 additions & 0 deletions internal/verifier/change_stream_test.go
@@ -198,3 +198,31 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
suite.Require().NotNil(verifier.srcStartAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
}

func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() {
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

suite.Require().NoError(verifier.StartChangeStream(ctx))

_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
suite.Require().NoError(err)

_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)

var rechecks []bson.M
require.Eventually(
suite.T(),
func() bool {
rechecks = suite.fetchVerifierRechecks(ctx, verifier)
return len(rechecks) == 3
},
time.Minute,
500*time.Millisecond,
"the verifier should flush a recheck doc after a batch",
)
}
76 changes: 43 additions & 33 deletions internal/verifier/migration_verifier_test.go
@@ -18,6 +18,7 @@ import (
"github.com/10gen/migration-verifier/internal/testutil"
"github.com/cespare/permute/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -26,6 +27,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
)

var macArmMongoVersions []string = []string{
@@ -228,34 +230,32 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
ctx := context.Background()
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)

suite.Require().NoError(
verifier.InsertChangeEventRecheckDoc(
ctx,
&ParsedEvent{
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
DocKey: DocKey{
ID: "heyhey",
},
err := verifier.HandleChangeStreamEvents(
ctx,
[]ParsedEvent{{
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
DocKey: DocKey{
ID: "heyhey",
},
),
}},
)
suite.Require().NoError(err)

suite.Require().NoError(
verifier.InsertChangeEventRecheckDoc(
ctx,
&ParsedEvent{
ID: bson.M{
"docID": "ID/docID",
},
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
DocKey: DocKey{
ID: "hoohoo",
},
err = verifier.HandleChangeStreamEvents(
ctx,
[]ParsedEvent{{
ID: bson.M{
"docID": "ID/docID",
},
),
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
DocKey: DocKey{
ID: "hoohoo",
},
}},
)
suite.Require().NoError(err)

verifier.generation++

Expand Down Expand Up @@ -494,19 +494,20 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
Coll: "bar2",
},
}
err = verifier.HandleChangeStreamEvent(ctx, &event)

err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
suite.Require().NoError(err)
event.OpType = "insert"
err = verifier.HandleChangeStreamEvent(ctx, &event)
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
suite.Require().NoError(err)
event.OpType = "replace"
err = verifier.HandleChangeStreamEvent(ctx, &event)
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
suite.Require().NoError(err)
event.OpType = "update"
err = verifier.HandleChangeStreamEvent(ctx, &event)
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
suite.Require().NoError(err)
event.OpType = "flibbity"
err = verifier.HandleChangeStreamEvent(ctx, &event)
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
badEventErr := UnknownEventError{}
suite.Require().ErrorAs(err, &badEventErr)
suite.Assert().Equal("flibbity", badEventErr.Event.OpType)
@@ -1363,7 +1364,7 @@ func (suite *MultiDataVersionTestSuite) TestVerificationStatus() {
}

func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
zerolog.SetGlobalLevel(zerolog.DebugLevel)
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
@@ -1382,10 +1383,16 @@

checkDoneChan := make(chan struct{})
checkContinueChan := make(chan struct{})
go func() {
err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan)
suite.Require().NoError(err)
}()

errGroup, errGrpCtx := errgroup.WithContext(context.Background())
errGroup.Go(func() error {
checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan)
// Log this as fatal error so that the test doesn't hang.
if checkDriverErr != nil {
log.Fatal().Err(checkDriverErr).Msg("check driver error")
}
return checkDriverErr
})

waitForTasks := func() *VerificationStatus {
status, err := verifier.GetVerificationStatus()
@@ -1459,6 +1466,9 @@
suite.Require().NoError(err)
// there should be a failure from the src insert
suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)

checkContinueChan <- struct{}{}
require.NoError(suite.T(), errGroup.Wait())
Collaborator: Nice find!

}

func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {
51 changes: 22 additions & 29 deletions internal/verifier/recheck.go
@@ -4,7 +4,6 @@ import (
"context"

"github.com/10gen/migration-verifier/internal/types"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -36,46 +35,40 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
namespace string, documentIDs []interface{}, dataSizes []int) error {
dbName, collName := SplitNamespace(namespace)

verifier.mux.Lock()
defer verifier.mux.Unlock()

return verifier.insertRecheckDocsUnderLock(context.Background(),
dbName, collName, documentIDs, dataSizes)
}

func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, changeEvent *ParsedEvent) error {
documentIDs := []interface{}{changeEvent.DocKey.ID}

// We don't know the document sizes for documents for all change events,
// so just be conservative and assume they are maximum size.
//
// Note that this prevents us from being able to report a meaningful
// total data size for noninitial generations in the log.
dataSizes := []int{maxBSONObjSize}

verifier.mux.Lock()
defer verifier.mux.Unlock()

if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
dbNames := make([]string, len(documentIDs))
collNames := make([]string, len(documentIDs))
for i := range documentIDs {
dbNames[i] = dbName
collNames[i] = collName
}

return verifier.insertRecheckDocsUnderLock(
ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes)
return verifier.insertRecheckDocs(
context.Background(),
dbNames,
collNames,
documentIDs,
dataSizes,
)
}

func (verifier *Verifier) insertRecheckDocsUnderLock(
func (verifier *Verifier) insertRecheckDocs(
ctx context.Context,
dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
dbNames []string,
collNames []string,
documentIDs []interface{},
dataSizes []int,
Collaborator: nit: If we’re doing a separate slice for each category it’d seem a bit cleaner if there were 1 slice with a struct that contains these data points.
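A minimal sketch (not part of the PR) of the struct-based signature the reviewer suggests. RecheckSpec is an invented name, and the write-model construction and bulk write are elided; they would stay as in the existing implementation.

```go
package verifier

import "context"

// RecheckSpec bundles the four parallel slices into one element type.
// This name is hypothetical; it does not exist in the repository.
type RecheckSpec struct {
	DatabaseName   string
	CollectionName string
	DocumentID     interface{}
	DataSize       int
}

// insertRecheckDocs, reworked to take one slice of specs instead of
// parallel slices of database names, collection names, IDs, and sizes.
func (verifier *Verifier) insertRecheckDocs(ctx context.Context, rechecks []RecheckSpec) error {
	verifier.mux.Lock()
	defer verifier.mux.Unlock()

	generation, _ := verifier.getGenerationWhileLocked()

	for _, rc := range rechecks {
		pk := RecheckPrimaryKey{
			Generation:     generation,
			DatabaseName:   rc.DatabaseName,
			CollectionName: rc.CollectionName,
			DocumentID:     rc.DocumentID,
		}

		// The upsert write-model construction from pk and rc.DataSize is
		// elided in this sketch; it would stay exactly as in the existing code.
		_ = pk
	}

	// ... bulk-write the accumulated models as the existing implementation does.
	return nil
}
```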

) error {
verifier.mux.Lock()
defer verifier.mux.Unlock()

generation, _ := verifier.getGenerationWhileLocked()

models := []mongo.WriteModel{}
for i, documentID := range documentIDs {
pk := RecheckPrimaryKey{
Generation: generation,
DatabaseName: dbName,
CollectionName: collName,
DatabaseName: dbNames[i],
CollectionName: collNames[i],
DocumentID: documentID,
}
