Merged
71 changes: 61 additions & 10 deletions internal/verifier/change_stream.go
@@ -39,6 +39,9 @@ const (
metadataChangeStreamCollectionName = "changeStream"
)

// ChangeEventRecheckBuffer buffers change event recheck docs in memory as a map of namespace -> doc keys.
type ChangeEventRecheckBuffer map[string][]interface{}

type UnknownEventError struct {
Event *ParsedEvent
}
@@ -49,7 +52,7 @@ func (uee UnknownEventError) Error() string {

// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
// operation.
func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
func (verifier *Verifier) HandleChangeStreamEvent(changeEvent *ParsedEvent) error {
Collaborator (@FGasper, Nov 8, 2024):
Just curious, what was the advantage of doing the buffering here versus closer to the read from the cursor? I was thinking we’d read all the events into a slice then pass that slice to this function (renamed HandleChangeStreamEvents).

We wouldn’t need separate handle-vs-flush methods in that case.

Collaborator Author:
Good point, I made the changes.
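
For illustration, a minimal sketch of the slice-based shape described above, assuming a hypothetical handleChangeStreamEvents helper and a persist callback (the verifier's real ParsedEvent handling and recheck insertion are richer than this):

// Sketch only: take a whole batch of events, group doc keys per namespace,
// and persist each group once, so no separate handle-vs-flush step is needed.
func handleChangeStreamEvents(
	events []ParsedEvent,
	persist func(namespace string, ids []interface{}) error,
) error {
	grouped := make(map[string][]interface{})
	for _, ev := range events {
		namespace := fmt.Sprintf("%s.%s", ev.Ns.DB, ev.Ns.Coll)
		grouped[namespace] = append(grouped[namespace], ev.DocKey.ID)
	}

	for namespace, ids := range grouped {
		if err := persist(namespace, ids); err != nil {
			return errors.Wrapf(err, "failed to persist rechecks for namespace %s", namespace)
		}
	}

	return nil
}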

if changeEvent.ClusterTime != nil &&
(verifier.lastChangeEventTime == nil ||
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
Expand All @@ -63,7 +66,9 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
case "replace":
fallthrough
case "update":
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
namespace := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll)
verifier.changeEventRecheckBuf[namespace] = append(verifier.changeEventRecheckBuf[changeEvent.Ns.String()], changeEvent.DocKey.ID)
return nil
default:
return UnknownEventError{Event: changeEvent}
}
@@ -98,20 +103,35 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
return err
}

readOneChangeEvent := func() (bool, error) {
gotEvent := cs.TryNext(ctx)
if gotEvent {
readAndHandleOneChangeEventBatch := func() (bool, error) {
eventsRead := 0
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
gotEvent := cs.TryNext(ctx)

if !gotEvent {
break
}

var changeEvent ParsedEvent

if err := cs.Decode(&changeEvent); err != nil {
return false, errors.Wrap(err, "failed to decode change event")
}
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
err := verifier.HandleChangeStreamEvent(&changeEvent)
if err != nil {
return false, errors.Wrap(err, "failed to handle change event")
}

eventsRead++
}
Collaborator:
Should there be error handling for cs.Err() here?

Collaborator Author:
Added to the if statement above.


verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)

if err := verifier.flushAllBufferedChangeEventRechecks(ctx); err != nil {
return false, errors.Wrap(err, "failed to flush buffered change event rechecks")
}

return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
Collaborator:
It seems strange to defer handling cs.Err() until after HandleChangeStreamEvents() is called. It’d look a bit more idiomatic if the check were closer to the TryNext.
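
As a rough sketch of that suggestion, the cursor error could be surfaced right where TryNext stops yielding events, before the batch is handled (drainOneBatch is a hypothetical helper; the driver calls are the same ones the verifier already uses):

func drainOneBatch(ctx context.Context, cs *mongo.ChangeStream) ([]ParsedEvent, error) {
	var batch []ParsedEvent

	for cs.TryNext(ctx) {
		var ev ParsedEvent
		if err := cs.Decode(&ev); err != nil {
			return nil, errors.Wrap(err, "failed to decode change event")
		}
		batch = append(batch, ev)

		if cs.RemainingBatchLength() == 0 {
			break
		}
	}

	// Check the cursor error as soon as iteration stops, not after handling.
	if err := cs.Err(); err != nil {
		return nil, errors.Wrap(err, "change stream iteration failed")
	}

	return batch, nil
}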

}

for {
@@ -137,15 +157,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
// (i.e., the `getMore` call returns empty)
for {
var gotEvent bool
gotEvent, err = readOneChangeEvent()
gotEvent, err = readAndHandleOneChangeEventBatch()

if !gotEvent || err != nil {
break
}
}

default:
_, err = readOneChangeEvent()
_, err = readAndHandleOneChangeEventBatch()
}

if err == nil {
@@ -189,10 +209,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
}

// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
func (verifier *Verifier) StartChangeStream(ctx context.Context, batchSize *int32) error {
Collaborator:
Why does this need to be a setting? Aren’t we just taking whatever batch size we get from the source?

Collaborator Author:
I did this for a test to pass in a small batch size.

Collaborator:
Was that necessary, though?

getMore always returns after a relatively short period of inactivity with just whatever events it happens to have seen. So the batch size should be naturally limited to whichever events you’ve triggered … ?
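
For reference, a small sketch of the getMore behavior being described: with MaxAwaitTime set (as StartChangeStream already does), an idle stream's getMore returns after roughly that interval even with no events, so batches stay naturally small in a quiet test. The collection and empty pipeline here are placeholders:

func watchBriefly(ctx context.Context, coll *mongo.Collection) error {
	// Bound how long the server waits on an empty getMore before returning.
	opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second)

	cs, err := coll.Watch(ctx, mongo.Pipeline{}, opts)
	if err != nil {
		return errors.Wrap(err, "failed to open change stream")
	}
	defer cs.Close(ctx)

	// TryNext returns false once the current (possibly empty) batch is
	// exhausted, handing control back after at most ~MaxAwaitTime of quiet.
	for cs.TryNext(ctx) {
		// handle cs.Current ...
	}

	return errors.Wrap(cs.Err(), "change stream iteration failed")
}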

pipeline := verifier.GetChangeStreamFilter()
opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second)

if batchSize != nil {
opts = opts.SetBatchSize(*batchSize)
}

savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
if err != nil {
return errors.Wrap(err, "failed to load persisted change stream resume token")
@@ -352,3 +376,30 @@ func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error)

return ctStruct.ClusterTime.ClusterTime, nil
}

func (verifier *Verifier) flushAllBufferedChangeEventRechecks(ctx context.Context) error {
for namespace, ids := range verifier.changeEventRecheckBuf {
if len(ids) == 0 {
return nil
}

// We don't know the document sizes for the documents behind these change events,
// so just be conservative and assume they are maximum size.
//
// Note that this prevents us from being able to report a meaningful
// total data size for noninitial generations in the log.
dataSizes := make([]int, len(ids))
for i := 0; i < len(ids); i++ {
dataSizes[i] = maxBSONObjSize
}

dbName, collName := SplitNamespace(namespace)
if err := verifier.insertRecheckDocs(ctx, dbName, collName, ids, dataSizes); err != nil {
Collaborator:
This approach will cause us to block between each namespace’s events, I think.

Could we instead persist all of the events for all namespaces at once?
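
A sketch of that alternative: build one slice of recheck documents across every buffered namespace and persist them with a single InsertMany. The recheck-document fields used here (db, coll, docID) and the target collection are assumptions for illustration; the verifier's actual recheck schema lives elsewhere.

// Sketch only: one InsertMany for all namespaces' buffered events, rather
// than one insert (and one round trip) per namespace.
func flushAllNamespacesAtOnce(
	ctx context.Context,
	recheckColl *mongo.Collection,
	buf ChangeEventRecheckBuffer,
) error {
	var docs []interface{}
	for namespace, ids := range buf {
		dbName, collName := SplitNamespace(namespace)
		for _, id := range ids {
			docs = append(docs, bson.D{
				{"db", dbName},
				{"coll", collName},
				{"docID", id},
			})
		}
	}

	if len(docs) == 0 {
		return nil
	}

	if _, err := recheckColl.InsertMany(ctx, docs); err != nil {
		return errors.Wrap(err, "failed to insert recheck docs")
	}

	// Clear the buffer only after everything is persisted.
	for namespace := range buf {
		delete(buf, namespace)
	}

	return nil
}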

return errors.Wrapf(err, "failed to insert recheck docs for namespace %s", namespace)
}

delete(verifier.changeEventRecheckBuf, namespace)
}

return nil
}
39 changes: 34 additions & 5 deletions internal/verifier/change_stream_test.go
@@ -38,7 +38,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
verifier1 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := verifier1.StartChangeStream(ctx)
err := verifier1.StartChangeStream(ctx, nil)
suite.Require().NoError(err)
}()

@@ -63,7 +63,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {

newTime := suite.getClusterTime(ctx, suite.srcMongoClient)

err = verifier2.StartChangeStream(ctx)
err = verifier2.StartChangeStream(ctx, nil)
suite.Require().NoError(err)

suite.Require().NotNil(verifier2.srcStartAtTs)
@@ -138,7 +138,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
err = verifier.StartChangeStream(ctx, nil)
suite.Require().NoError(err)
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
verifier.changeStreamEnderChan <- struct{}{}
@@ -158,7 +158,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
err = verifier.StartChangeStream(ctx, nil)
suite.Require().NoError(err)
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
@@ -193,8 +193,37 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
err = verifier.StartChangeStream(ctx, nil)
suite.Require().NoError(err)
suite.Require().NotNil(verifier.srcStartAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
}

func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() {
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

batchSize := int32(3)
suite.Require().NoError(verifier.StartChangeStream(ctx, &batchSize))

_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
suite.Require().NoError(err)

_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)

require.Eventually(
suite.T(),
func() bool {
return len(suite.fetchVerifierRechecks(ctx, verifier)) == 3
},
time.Minute,
500*time.Millisecond,
"the verifier should flush a recheck doc after a batch",
)
suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl1"])
suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl2"])
Collaborator:
Can we avoid testing verifier internals here? It’d be ideal not to depend on them.

}
2 changes: 1 addition & 1 deletion internal/verifier/check.go
@@ -177,7 +177,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
if !csRunning {
verifier.logger.Debug().Msg("Change stream not running; starting change stream")

err = verifier.StartChangeStream(ctx)
err = verifier.StartChangeStream(ctx, nil)
if err != nil {
return errors.Wrap(err, "failed to start change stream on source")
}
3 changes: 3 additions & 0 deletions internal/verifier/migration_verifier.go
@@ -136,6 +136,8 @@ type Verifier struct {
globalFilter map[string]any

pprofInterval time.Duration

changeEventRecheckBuf ChangeEventRecheckBuffer
Collaborator:
Suggestion: We shouldn’t need this if we switch to reading all events from the cursor into a slice.

}

// VerificationStatus holds the Verification Status
@@ -195,6 +197,7 @@ func NewVerifier(settings VerifierSettings) *Verifier {
changeStreamErrChan: make(chan error),
changeStreamDoneChan: make(chan struct{}),
readConcernSetting: readConcern,
changeEventRecheckBuf: make(ChangeEventRecheckBuffer),
}
}

88 changes: 51 additions & 37 deletions internal/verifier/migration_verifier_test.go
@@ -18,6 +18,7 @@ import (
"github.com/10gen/migration-verifier/internal/testutil"
"github.com/cespare/permute/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -26,6 +27,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
)

var macArmMongoVersions []string = []string{
@@ -228,33 +230,31 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
ctx := context.Background()
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)

suite.Require().NoError(
verifier.InsertChangeEventRecheckDoc(
ctx,
&ParsedEvent{
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
DocKey: DocKey{
ID: "heyhey",
},
suite.handleAndFlushChangeEvent(
ctx,
verifier,
ParsedEvent{
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
DocKey: DocKey{
ID: "heyhey",
},
),
},
)

suite.Require().NoError(
verifier.InsertChangeEventRecheckDoc(
ctx,
&ParsedEvent{
ID: bson.M{
"docID": "ID/docID",
},
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
DocKey: DocKey{
ID: "hoohoo",
},
suite.handleAndFlushChangeEvent(
ctx,
verifier,
ParsedEvent{
ID: bson.M{
"docID": "ID/docID",
},
),
OpType: "insert",
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
DocKey: DocKey{
ID: "hoohoo",
},
},
)

verifier.generation++
@@ -477,6 +477,14 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Gen0() {
)
}

func (suite *MultiMetaVersionTestSuite) handleAndFlushChangeEvent(ctx context.Context, verifier *Verifier, event ParsedEvent) {
err := verifier.HandleChangeStreamEvent(&event)
suite.Require().NoError(err)

err = verifier.flushAllBufferedChangeEventRechecks(ctx)
suite.Require().NoError(err)
}

func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
ctx := context.Background()
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
@@ -494,19 +502,16 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
Coll: "bar2",
},
}
err = verifier.HandleChangeStreamEvent(ctx, &event)
suite.Require().NoError(err)

suite.handleAndFlushChangeEvent(ctx, verifier, event)
event.OpType = "insert"
err = verifier.HandleChangeStreamEvent(ctx, &event)
suite.Require().NoError(err)
suite.handleAndFlushChangeEvent(ctx, verifier, event)
event.OpType = "replace"
err = verifier.HandleChangeStreamEvent(ctx, &event)
suite.Require().NoError(err)
suite.handleAndFlushChangeEvent(ctx, verifier, event)
event.OpType = "update"
err = verifier.HandleChangeStreamEvent(ctx, &event)
suite.Require().NoError(err)
suite.handleAndFlushChangeEvent(ctx, verifier, event)
event.OpType = "flibbity"
err = verifier.HandleChangeStreamEvent(ctx, &event)
err = verifier.HandleChangeStreamEvent(&event)
badEventErr := UnknownEventError{}
suite.Require().ErrorAs(err, &badEventErr)
suite.Assert().Equal("flibbity", badEventErr.Event.OpType)
@@ -1363,7 +1368,7 @@ func (suite *MultiDataVersionTestSuite) TestVerificationStatus() {
}

func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
zerolog.SetGlobalLevel(zerolog.DebugLevel)
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
@@ -1382,10 +1387,16 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {

checkDoneChan := make(chan struct{})
checkContinueChan := make(chan struct{})
go func() {
err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan)
suite.Require().NoError(err)
}()

errGroup, errGrpCtx := errgroup.WithContext(context.Background())
errGroup.Go(func() error {
checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan)
// Log this as fatal error so that the test doesn't hang.
if checkDriverErr != nil {
log.Fatal().Err(checkDriverErr).Msg("check driver error")
}
return checkDriverErr
})

waitForTasks := func() *VerificationStatus {
status, err := verifier.GetVerificationStatus()
@@ -1459,6 +1470,9 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
suite.Require().NoError(err)
// there should be a failure from the src insert
suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)

checkContinueChan <- struct{}{}
require.NoError(suite.T(), errGroup.Wait())
Collaborator:
Nice find!

}

func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {