Skip to content

Commit 13e2a62

Browse files
authored
REP-5219 Make migration-verifier process change events in batch. (#39)
This makes the change stream reader: 1. read change events until `RemainingBatchLength() == 0`; 2. buffer all events of a batch in memory; 3. batch-insert the recheck docs. It also fixes the bug where the `eventRecorder` counted each event twice.
1 parent 063c8e5 commit 13e2a62

File tree

5 files changed

+171
-100
lines changed

5 files changed

+171
-100
lines changed

internal/verifier/change_stream.go

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,50 @@ func (uee UnknownEventError) Error() string {
4747
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
4848
}
4949

50-
// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
51-
// operation.
52-
func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
53-
if changeEvent.ClusterTime != nil &&
54-
(verifier.lastChangeEventTime == nil ||
55-
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
56-
verifier.lastChangeEventTime = changeEvent.ClusterTime
50+
// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
51+
func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error {
52+
if len(batch) == 0 {
53+
return nil
5754
}
58-
switch changeEvent.OpType {
59-
case "delete":
60-
fallthrough
61-
case "insert":
62-
fallthrough
63-
case "replace":
64-
fallthrough
65-
case "update":
66-
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
67-
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
68-
}
6955

70-
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
71-
default:
72-
return UnknownEventError{Event: changeEvent}
56+
dbNames := make([]string, len(batch))
57+
collNames := make([]string, len(batch))
58+
docIDs := make([]interface{}, len(batch))
59+
dataSizes := make([]int, len(batch))
60+
61+
for i, changeEvent := range batch {
62+
if changeEvent.ClusterTime != nil &&
63+
(verifier.lastChangeEventTime == nil ||
64+
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
65+
verifier.lastChangeEventTime = changeEvent.ClusterTime
66+
}
67+
switch changeEvent.OpType {
68+
case "delete":
69+
fallthrough
70+
case "insert":
71+
fallthrough
72+
case "replace":
73+
fallthrough
74+
case "update":
75+
if err := verifier.eventRecorder.AddEvent(&changeEvent); err != nil {
76+
return errors.Wrapf(err, "failed to augment stats with change event: %+v", changeEvent)
77+
}
78+
dbNames[i] = changeEvent.Ns.DB
79+
collNames[i] = changeEvent.Ns.Coll
80+
docIDs[i] = changeEvent.DocKey.ID
81+
82+
// We don't know the document sizes for documents for all change events,
83+
// so just be conservative and assume they are maximum size.
84+
//
85+
// Note that this prevents us from being able to report a meaningful
86+
// total data size for noninitial generations in the log.
87+
dataSizes[i] = maxBSONObjSize
88+
default:
89+
return UnknownEventError{Event: &changeEvent}
90+
}
7391
}
92+
93+
return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)
7494
}
7595

7696
func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
@@ -102,20 +122,37 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
102122
return err
103123
}
104124

105-
readOneChangeEvent := func() (bool, error) {
106-
gotEvent := cs.TryNext(ctx)
107-
if gotEvent {
108-
var changeEvent ParsedEvent
109-
if err := cs.Decode(&changeEvent); err != nil {
125+
readAndHandleOneChangeEventBatch := func() (bool, error) {
126+
eventsRead := 0
127+
var changeEventBatch []ParsedEvent
128+
129+
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
130+
gotEvent := cs.TryNext(ctx)
131+
132+
if !gotEvent || cs.Err() != nil {
133+
break
134+
}
135+
136+
if changeEventBatch == nil {
137+
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
138+
}
139+
140+
if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
110141
return false, errors.Wrap(err, "failed to decode change event")
111142
}
112-
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
143+
144+
eventsRead++
145+
}
146+
147+
if eventsRead > 0 {
148+
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
149+
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
113150
if err != nil {
114-
return false, errors.Wrap(err, "failed to handle change event")
151+
return false, errors.Wrap(err, "failed to handle change events")
115152
}
116153
}
117154

118-
return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
155+
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
119156
}
120157

121158
for {
@@ -141,15 +178,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
141178
// (i.e., the `getMore` call returns empty)
142179
for {
143180
var gotEvent bool
144-
gotEvent, err = readOneChangeEvent()
181+
gotEvent, err = readAndHandleOneChangeEventBatch()
145182

146183
if !gotEvent || err != nil {
147184
break
148185
}
149186
}
150187

151188
default:
152-
_, err = readOneChangeEvent()
189+
_, err = readAndHandleOneChangeEventBatch()
153190
}
154191

155192
if err == nil {

internal/verifier/change_stream_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,31 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
198198
suite.Require().NotNil(verifier.srcStartAtTs)
199199
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
200200
}
201+
202+
func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() {
203+
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
204+
ctx, cancel := context.WithCancel(context.Background())
205+
defer cancel()
206+
207+
suite.Require().NoError(verifier.StartChangeStream(ctx))
208+
209+
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
210+
suite.Require().NoError(err)
211+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
212+
suite.Require().NoError(err)
213+
214+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
215+
suite.Require().NoError(err)
216+
217+
var rechecks []bson.M
218+
require.Eventually(
219+
suite.T(),
220+
func() bool {
221+
rechecks = suite.fetchVerifierRechecks(ctx, verifier)
222+
return len(rechecks) == 3
223+
},
224+
time.Minute,
225+
500*time.Millisecond,
226+
"the verifier should flush a recheck doc after a batch",
227+
)
228+
}

internal/verifier/migration_verifier_test.go

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/10gen/migration-verifier/internal/testutil"
1919
"github.com/cespare/permute/v2"
2020
"github.com/rs/zerolog"
21+
"github.com/rs/zerolog/log"
2122
"github.com/samber/lo"
2223
"github.com/stretchr/testify/assert"
2324
"github.com/stretchr/testify/require"
@@ -26,6 +27,7 @@ import (
2627
"go.mongodb.org/mongo-driver/bson/primitive"
2728
"go.mongodb.org/mongo-driver/mongo"
2829
"go.mongodb.org/mongo-driver/mongo/options"
30+
"golang.org/x/sync/errgroup"
2931
)
3032

3133
var macArmMongoVersions []string = []string{
@@ -228,34 +230,32 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
228230
ctx := context.Background()
229231
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
230232

231-
suite.Require().NoError(
232-
verifier.InsertChangeEventRecheckDoc(
233-
ctx,
234-
&ParsedEvent{
235-
OpType: "insert",
236-
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
237-
DocKey: DocKey{
238-
ID: "heyhey",
239-
},
233+
err := verifier.HandleChangeStreamEvents(
234+
ctx,
235+
[]ParsedEvent{{
236+
OpType: "insert",
237+
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
238+
DocKey: DocKey{
239+
ID: "heyhey",
240240
},
241-
),
241+
}},
242242
)
243+
suite.Require().NoError(err)
243244

244-
suite.Require().NoError(
245-
verifier.InsertChangeEventRecheckDoc(
246-
ctx,
247-
&ParsedEvent{
248-
ID: bson.M{
249-
"docID": "ID/docID",
250-
},
251-
OpType: "insert",
252-
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
253-
DocKey: DocKey{
254-
ID: "hoohoo",
255-
},
245+
err = verifier.HandleChangeStreamEvents(
246+
ctx,
247+
[]ParsedEvent{{
248+
ID: bson.M{
249+
"docID": "ID/docID",
256250
},
257-
),
251+
OpType: "insert",
252+
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
253+
DocKey: DocKey{
254+
ID: "hoohoo",
255+
},
256+
}},
258257
)
258+
suite.Require().NoError(err)
259259

260260
verifier.generation++
261261

@@ -494,19 +494,20 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
494494
Coll: "bar2",
495495
},
496496
}
497-
err = verifier.HandleChangeStreamEvent(ctx, &event)
497+
498+
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
498499
suite.Require().NoError(err)
499500
event.OpType = "insert"
500-
err = verifier.HandleChangeStreamEvent(ctx, &event)
501+
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
501502
suite.Require().NoError(err)
502503
event.OpType = "replace"
503-
err = verifier.HandleChangeStreamEvent(ctx, &event)
504+
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
504505
suite.Require().NoError(err)
505506
event.OpType = "update"
506-
err = verifier.HandleChangeStreamEvent(ctx, &event)
507+
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
507508
suite.Require().NoError(err)
508509
event.OpType = "flibbity"
509-
err = verifier.HandleChangeStreamEvent(ctx, &event)
510+
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
510511
badEventErr := UnknownEventError{}
511512
suite.Require().ErrorAs(err, &badEventErr)
512513
suite.Assert().Equal("flibbity", badEventErr.Event.OpType)
@@ -1363,7 +1364,7 @@ func (suite *MultiDataVersionTestSuite) TestVerificationStatus() {
13631364
}
13641365

13651366
func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
1366-
zerolog.SetGlobalLevel(zerolog.InfoLevel)
1367+
zerolog.SetGlobalLevel(zerolog.DebugLevel)
13671368
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
13681369
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
13691370
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
@@ -1382,10 +1383,16 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
13821383

13831384
checkDoneChan := make(chan struct{})
13841385
checkContinueChan := make(chan struct{})
1385-
go func() {
1386-
err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan)
1387-
suite.Require().NoError(err)
1388-
}()
1386+
1387+
errGroup, errGrpCtx := errgroup.WithContext(context.Background())
1388+
errGroup.Go(func() error {
1389+
checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan)
1390+
// Log this as fatal error so that the test doesn't hang.
1391+
if checkDriverErr != nil {
1392+
log.Fatal().Err(checkDriverErr).Msg("check driver error")
1393+
}
1394+
return checkDriverErr
1395+
})
13891396

13901397
waitForTasks := func() *VerificationStatus {
13911398
status, err := verifier.GetVerificationStatus()
@@ -1459,6 +1466,9 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
14591466
suite.Require().NoError(err)
14601467
// there should be a failure from the src insert
14611468
suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)
1469+
1470+
checkContinueChan <- struct{}{}
1471+
require.NoError(suite.T(), errGroup.Wait())
14621472
}
14631473

14641474
func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {

internal/verifier/recheck.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55

66
"github.com/10gen/migration-verifier/internal/types"
7-
"github.com/pkg/errors"
87
"go.mongodb.org/mongo-driver/bson"
98
"go.mongodb.org/mongo-driver/mongo"
109
"go.mongodb.org/mongo-driver/mongo/options"
@@ -36,46 +35,40 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
3635
namespace string, documentIDs []interface{}, dataSizes []int) error {
3736
dbName, collName := SplitNamespace(namespace)
3837

39-
verifier.mux.Lock()
40-
defer verifier.mux.Unlock()
41-
42-
return verifier.insertRecheckDocsUnderLock(context.Background(),
43-
dbName, collName, documentIDs, dataSizes)
44-
}
45-
46-
func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, changeEvent *ParsedEvent) error {
47-
documentIDs := []interface{}{changeEvent.DocKey.ID}
48-
49-
// We don't know the document sizes for documents for all change events,
50-
// so just be conservative and assume they are maximum size.
51-
//
52-
// Note that this prevents us from being able to report a meaningful
53-
// total data size for noninitial generations in the log.
54-
dataSizes := []int{maxBSONObjSize}
55-
56-
verifier.mux.Lock()
57-
defer verifier.mux.Unlock()
58-
59-
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
60-
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
38+
dbNames := make([]string, len(documentIDs))
39+
collNames := make([]string, len(documentIDs))
40+
for i := range documentIDs {
41+
dbNames[i] = dbName
42+
collNames[i] = collName
6143
}
6244

63-
return verifier.insertRecheckDocsUnderLock(
64-
ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes)
45+
return verifier.insertRecheckDocs(
46+
context.Background(),
47+
dbNames,
48+
collNames,
49+
documentIDs,
50+
dataSizes,
51+
)
6552
}
6653

67-
func (verifier *Verifier) insertRecheckDocsUnderLock(
54+
func (verifier *Verifier) insertRecheckDocs(
6855
ctx context.Context,
69-
dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
56+
dbNames []string,
57+
collNames []string,
58+
documentIDs []interface{},
59+
dataSizes []int,
60+
) error {
61+
verifier.mux.Lock()
62+
defer verifier.mux.Unlock()
7063

7164
generation, _ := verifier.getGenerationWhileLocked()
7265

7366
models := []mongo.WriteModel{}
7467
for i, documentID := range documentIDs {
7568
pk := RecheckPrimaryKey{
7669
Generation: generation,
77-
DatabaseName: dbName,
78-
CollectionName: collName,
70+
DatabaseName: dbNames[i],
71+
CollectionName: collNames[i],
7972
DocumentID: documentID,
8073
}
8174

0 commit comments

Comments
 (0)