-
Notifications
You must be signed in to change notification settings - Fork 15
REP-5219 Make migration-verifier process change events in batch. #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
7a11e3e
6dfbc64
b201a20
c4c7a66
59effc4
84b6df6
3c6383b
759a9bc
b5a170a
91b1896
a58b996
0fea3ad
3ac0547
86935c1
5c71523
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -47,30 +47,53 @@ func (uee UnknownEventError) Error() string { | |||||
| return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event) | ||||||
| } | ||||||
|
|
||||||
| // HandleChangeStreamEvent performs the necessary work for change stream events that occur during | ||||||
| // operation. | ||||||
| func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error { | ||||||
| if changeEvent.ClusterTime != nil && | ||||||
| (verifier.lastChangeEventTime == nil || | ||||||
| verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) { | ||||||
| verifier.lastChangeEventTime = changeEvent.ClusterTime | ||||||
| // HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch. | ||||||
| func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error { | ||||||
| if len(batch) == 0 { | ||||||
| return nil | ||||||
| } | ||||||
| switch changeEvent.OpType { | ||||||
| case "delete": | ||||||
| fallthrough | ||||||
| case "insert": | ||||||
| fallthrough | ||||||
| case "replace": | ||||||
| fallthrough | ||||||
| case "update": | ||||||
| if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil { | ||||||
| return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent) | ||||||
| } | ||||||
|
|
||||||
| return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent) | ||||||
| default: | ||||||
| return UnknownEventError{Event: changeEvent} | ||||||
| dbNames := make([]string, len(batch)) | ||||||
| collNames := make([]string, len(batch)) | ||||||
| docIDs := make([]interface{}, len(batch)) | ||||||
| dataSizes := make([]int, len(batch)) | ||||||
|
|
||||||
| for i, changeEvent := range batch { | ||||||
| if changeEvent.ClusterTime != nil && | ||||||
| (verifier.lastChangeEventTime == nil || | ||||||
| verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) { | ||||||
| verifier.lastChangeEventTime = changeEvent.ClusterTime | ||||||
| } | ||||||
| switch changeEvent.OpType { | ||||||
| case "delete": | ||||||
| fallthrough | ||||||
| case "insert": | ||||||
| fallthrough | ||||||
| case "replace": | ||||||
| fallthrough | ||||||
| case "update": | ||||||
| if err := verifier.eventRecorder.AddEvent(&changeEvent); err != nil { | ||||||
| return errors.Wrapf(err, "failed to augment stats with change event: %+v", changeEvent) | ||||||
| } | ||||||
| dbNames[i] = changeEvent.Ns.DB | ||||||
| collNames[i] = changeEvent.Ns.Coll | ||||||
| docIDs[i] = changeEvent.DocKey.ID | ||||||
|
|
||||||
| // We don't know the document sizes for documents for all change events, | ||||||
| // so just be conservative and assume they are maximum size. | ||||||
| // | ||||||
| // Note that this prevents us from being able to report a meaningful | ||||||
| // total data size for noninitial generations in the log. | ||||||
| dataSizes[i] = maxBSONObjSize | ||||||
| default: | ||||||
| return UnknownEventError{Event: &changeEvent} | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| verifier.mux.Lock() | ||||||
| defer verifier.mux.Unlock() | ||||||
|
|
||||||
| return verifier.insertRecheckDocsWhileLocked(ctx, dbNames, collNames, docIDs, dataSizes) | ||||||
| } | ||||||
|
|
||||||
| func (verifier *Verifier) GetChangeStreamFilter() []bson.D { | ||||||
|
|
@@ -102,20 +125,38 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha | |||||
| return err | ||||||
| } | ||||||
|
|
||||||
| readOneChangeEvent := func() (bool, error) { | ||||||
| gotEvent := cs.TryNext(ctx) | ||||||
| if gotEvent { | ||||||
| var changeEvent ParsedEvent | ||||||
| if err := cs.Decode(&changeEvent); err != nil { | ||||||
| return false, errors.Wrap(err, "failed to decode change event") | ||||||
| readAndHandleOneChangeEventBatch := func() (bool, error) { | ||||||
| eventsRead := 0 | ||||||
| var changeEventBatch []ParsedEvent | ||||||
|
|
||||||
| for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { | ||||||
FGasper marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| gotEvent := cs.TryNext(ctx) | ||||||
|
|
||||||
| if !gotEvent || cs.Err() != nil { | ||||||
| break | ||||||
| } | ||||||
|
|
||||||
| if changeEventBatch == nil { | ||||||
| changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1) | ||||||
| } | ||||||
| err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) | ||||||
| if err != nil { | ||||||
| return false, errors.Wrap(err, "failed to handle change event") | ||||||
|
|
||||||
| if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil { | ||||||
| return false, errors.Wrap(err, "failed to decode change event") | ||||||
| } | ||||||
|
|
||||||
| eventsRead++ | ||||||
| } | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should there be error handling for cs.Err() here?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added to the if statement above. |
||||||
|
|
||||||
| if eventsRead > 0 { | ||||||
| verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead) | ||||||
|
||||||
| verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead) | |
| verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.") |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this handling logic also be under the >0 condition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems strange to defer handling cs.Err() until after HandleChangeStreamEvents() is called. It’d look a bit more idiomatic if the check were closer to the TryNext.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ import ( | |
| "github.com/10gen/migration-verifier/internal/testutil" | ||
| "github.com/cespare/permute/v2" | ||
| "github.com/rs/zerolog" | ||
| "github.com/rs/zerolog/log" | ||
| "github.com/samber/lo" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
|
|
@@ -26,6 +27,7 @@ import ( | |
| "go.mongodb.org/mongo-driver/bson/primitive" | ||
| "go.mongodb.org/mongo-driver/mongo" | ||
| "go.mongodb.org/mongo-driver/mongo/options" | ||
| "golang.org/x/sync/errgroup" | ||
| ) | ||
|
|
||
| var macArmMongoVersions []string = []string{ | ||
|
|
@@ -228,34 +230,32 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() { | |
| ctx := context.Background() | ||
| verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) | ||
|
|
||
| suite.Require().NoError( | ||
| verifier.InsertChangeEventRecheckDoc( | ||
| ctx, | ||
| &ParsedEvent{ | ||
| OpType: "insert", | ||
| Ns: &Namespace{DB: "mydb", Coll: "coll2"}, | ||
| DocKey: DocKey{ | ||
| ID: "heyhey", | ||
| }, | ||
| err := verifier.HandleChangeStreamEvents( | ||
| ctx, | ||
| []ParsedEvent{{ | ||
| OpType: "insert", | ||
| Ns: &Namespace{DB: "mydb", Coll: "coll2"}, | ||
| DocKey: DocKey{ | ||
| ID: "heyhey", | ||
| }, | ||
| ), | ||
| }}, | ||
| ) | ||
| suite.Require().NoError(err) | ||
|
|
||
| suite.Require().NoError( | ||
| verifier.InsertChangeEventRecheckDoc( | ||
| ctx, | ||
| &ParsedEvent{ | ||
| ID: bson.M{ | ||
| "docID": "ID/docID", | ||
| }, | ||
| OpType: "insert", | ||
| Ns: &Namespace{DB: "mydb", Coll: "coll1"}, | ||
| DocKey: DocKey{ | ||
| ID: "hoohoo", | ||
| }, | ||
| err = verifier.HandleChangeStreamEvents( | ||
| ctx, | ||
| []ParsedEvent{{ | ||
| ID: bson.M{ | ||
| "docID": "ID/docID", | ||
| }, | ||
| ), | ||
| OpType: "insert", | ||
| Ns: &Namespace{DB: "mydb", Coll: "coll1"}, | ||
| DocKey: DocKey{ | ||
| ID: "hoohoo", | ||
| }, | ||
| }}, | ||
| ) | ||
| suite.Require().NoError(err) | ||
|
|
||
| verifier.generation++ | ||
|
|
||
|
|
@@ -494,19 +494,20 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() { | |
| Coll: "bar2", | ||
| }, | ||
| } | ||
| err = verifier.HandleChangeStreamEvent(ctx, &event) | ||
|
|
||
| err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) | ||
| suite.Require().NoError(err) | ||
| event.OpType = "insert" | ||
| err = verifier.HandleChangeStreamEvent(ctx, &event) | ||
| err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) | ||
| suite.Require().NoError(err) | ||
| event.OpType = "replace" | ||
| err = verifier.HandleChangeStreamEvent(ctx, &event) | ||
| err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) | ||
| suite.Require().NoError(err) | ||
| event.OpType = "update" | ||
| err = verifier.HandleChangeStreamEvent(ctx, &event) | ||
| err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) | ||
| suite.Require().NoError(err) | ||
| event.OpType = "flibbity" | ||
| err = verifier.HandleChangeStreamEvent(ctx, &event) | ||
| err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) | ||
| badEventErr := UnknownEventError{} | ||
| suite.Require().ErrorAs(err, &badEventErr) | ||
| suite.Assert().Equal("flibbity", badEventErr.Event.OpType) | ||
|
|
@@ -1363,7 +1364,7 @@ func (suite *MultiDataVersionTestSuite) TestVerificationStatus() { | |
| } | ||
|
|
||
| func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() { | ||
| zerolog.SetGlobalLevel(zerolog.InfoLevel) | ||
| zerolog.SetGlobalLevel(zerolog.DebugLevel) | ||
| verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) | ||
| verifier.SetSrcNamespaces([]string{"testDb1.testColl1"}) | ||
| verifier.SetDstNamespaces([]string{"testDb2.testColl3"}) | ||
|
|
@@ -1382,10 +1383,16 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() { | |
|
|
||
| checkDoneChan := make(chan struct{}) | ||
| checkContinueChan := make(chan struct{}) | ||
| go func() { | ||
| err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan) | ||
| suite.Require().NoError(err) | ||
| }() | ||
|
|
||
| errGroup, errGrpCtx := errgroup.WithContext(context.Background()) | ||
| errGroup.Go(func() error { | ||
| checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan) | ||
| // Log this as fatal error so that the test doesn't hang. | ||
| if checkDriverErr != nil { | ||
| log.Fatal().Err(checkDriverErr).Msg("check driver error") | ||
| } | ||
| return checkDriverErr | ||
| }) | ||
|
|
||
| waitForTasks := func() *VerificationStatus { | ||
| status, err := verifier.GetVerificationStatus() | ||
|
|
@@ -1459,6 +1466,9 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() { | |
| suite.Require().NoError(err) | ||
| // there should be a failure from the src insert | ||
| suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status) | ||
|
|
||
| checkContinueChan <- struct{}{} | ||
| require.NoError(suite.T(), errGroup.Wait()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice find! |
||
| } | ||
|
|
||
| func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,6 @@ import ( | |
| "context" | ||
|
|
||
| "github.com/10gen/migration-verifier/internal/types" | ||
| "github.com/pkg/errors" | ||
| "go.mongodb.org/mongo-driver/bson" | ||
| "go.mongodb.org/mongo-driver/mongo" | ||
| "go.mongodb.org/mongo-driver/mongo/options" | ||
|
|
@@ -36,46 +35,32 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( | |
| namespace string, documentIDs []interface{}, dataSizes []int) error { | ||
| dbName, collName := SplitNamespace(namespace) | ||
|
|
||
| verifier.mux.Lock() | ||
| defer verifier.mux.Unlock() | ||
|
|
||
| return verifier.insertRecheckDocsUnderLock(context.Background(), | ||
| dbName, collName, documentIDs, dataSizes) | ||
| } | ||
|
|
||
| func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, changeEvent *ParsedEvent) error { | ||
| documentIDs := []interface{}{changeEvent.DocKey.ID} | ||
|
|
||
| // We don't know the document sizes for documents for all change events, | ||
| // so just be conservative and assume they are maximum size. | ||
| // | ||
| // Note that this prevents us from being able to report a meaningful | ||
| // total data size for noninitial generations in the log. | ||
| dataSizes := []int{maxBSONObjSize} | ||
| dbNames := make([]string, len(documentIDs)) | ||
| collNames := make([]string, len(documentIDs)) | ||
| for i := range documentIDs { | ||
| dbNames[i] = dbName | ||
| collNames[i] = collName | ||
| } | ||
|
|
||
| verifier.mux.Lock() | ||
| defer verifier.mux.Unlock() | ||
|
|
||
| if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil { | ||
| return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent) | ||
| } | ||
|
|
||
| return verifier.insertRecheckDocsUnderLock( | ||
| ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes) | ||
| return verifier.insertRecheckDocsWhileLocked(context.Background(), | ||
| dbNames, collNames, documentIDs, dataSizes) | ||
| } | ||
|
|
||
| func (verifier *Verifier) insertRecheckDocsUnderLock( | ||
| func (verifier *Verifier) insertRecheckDocsWhileLocked( | ||
| ctx context.Context, | ||
| dbName, collName string, documentIDs []interface{}, dataSizes []int) error { | ||
| dbNames []string, collNames []string, documentIDs []interface{}, dataSizes []int) error { | ||
|
||
|
|
||
| generation, _ := verifier.getGenerationWhileLocked() | ||
|
|
||
| models := []mongo.WriteModel{} | ||
| for i, documentID := range documentIDs { | ||
| pk := RecheckPrimaryKey{ | ||
| Generation: generation, | ||
| DatabaseName: dbName, | ||
| CollectionName: collName, | ||
| DatabaseName: dbNames[i], | ||
| CollectionName: collNames[i], | ||
| DocumentID: documentID, | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it'd make more sense to put the locking in the
insertRecheckDocsWhileLocked method. Right now every caller has to remember to use the mutex properly, which seems like a recipe for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made it this way just this week to accommodate stats. If we’re going to change this, I’d rather it be in a separate PR since the changes don’t really seem germane.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the change event stats from this function in this PR. I think it makes sense to make this change here as well.