Skip to content

Commit 7a11e3e

Browse files
committed
buffer by change stream batch
1 parent fa17104 commit 7a11e3e

File tree

7 files changed

+106
-66
lines changed

7 files changed

+106
-66
lines changed

internal/verifier/change_stream.go

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ const (
3939
metadataChangeStreamCollectionName = "changeStream"
4040
)
4141

42+
// ChangeEventRecheckBuffer buffers change events recheck docs in memory as a map of namespace -> _ids.
43+
type ChangeEventRecheckBuffer map[string][]interface{}
44+
4245
type UnknownEventError struct {
4346
Event *ParsedEvent
4447
}
@@ -49,7 +52,7 @@ func (uee UnknownEventError) Error() string {
4952

5053
// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
5154
// operation.
52-
func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
55+
func (verifier *Verifier) HandleChangeStreamEvent(changeEvent *ParsedEvent) error {
5356
if changeEvent.ClusterTime != nil &&
5457
(verifier.lastChangeEventTime == nil ||
5558
verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
@@ -63,7 +66,8 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
6366
case "replace":
6467
fallthrough
6568
case "update":
66-
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
69+
verifier.changeEventRecheckBuf[changeEvent.Ns.String()] = append(verifier.changeEventRecheckBuf[changeEvent.Ns.String()], changeEvent.DocKey.ID)
70+
return nil
6771
default:
6872
return UnknownEventError{Event: changeEvent}
6973
}
@@ -100,19 +104,30 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
100104
return err
101105
}
102106

103-
readOneChangeEvent := func() (bool, error) {
104-
gotEvent := cs.TryNext(ctx)
105-
if gotEvent {
107+
readAndHandleOneChangeEventBatch := func() (bool, error) {
108+
eventsRead := 0
109+
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
110+
gotEvent := cs.TryNext(ctx)
111+
if !gotEvent {
112+
break
113+
}
114+
106115
if err := cs.Decode(&changeEvent); err != nil {
107116
return false, errors.Wrap(err, "failed to decode change event")
108117
}
109-
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
118+
err := verifier.HandleChangeStreamEvent(&changeEvent)
110119
if err != nil {
111120
return false, errors.Wrap(err, "failed to handle change event")
112121
}
122+
123+
eventsRead++
113124
}
114125

115-
return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
126+
if err := verifier.flushAllBufferedChangeEventRechecks(ctx); err != nil {
127+
return false, errors.Wrap(err, "failed to flush buffered change event rechecks")
128+
}
129+
130+
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
116131
}
117132

118133
for {
@@ -136,7 +151,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
136151
// Read all change events until the source reports no events.
137152
// (i.e., the `getMore` call returns empty)
138153
for {
139-
gotEvent, err = readOneChangeEvent()
154+
gotEvent, err = readAndHandleOneChangeEventBatch()
140155

141156
if !gotEvent || err != nil {
142157
break
@@ -148,7 +163,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
148163
}
149164

150165
default:
151-
_, err = readOneChangeEvent()
166+
_, err = readAndHandleOneChangeEventBatch()
152167
}
153168

154169
if err == nil {
@@ -177,10 +192,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
177192
}
178193

179194
// StartChangeStream starts the change stream.
180-
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
195+
func (verifier *Verifier) StartChangeStream(ctx context.Context, batchSize *int32) error {
181196
pipeline := verifier.GetChangeStreamFilter()
182197
opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second)
183198

199+
if batchSize != nil {
200+
opts = opts.SetBatchSize(*batchSize)
201+
}
202+
184203
savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
185204
if err != nil {
186205
return errors.Wrap(err, "failed to load persisted change stream resume token")
@@ -340,3 +359,30 @@ func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error)
340359

341360
return ctStruct.ClusterTime.ClusterTime, nil
342361
}
362+
363+
func (verifier *Verifier) flushAllBufferedChangeEventRechecks(ctx context.Context) error {
364+
for namespace, ids := range verifier.changeEventRecheckBuf {
365+
if len(ids) == 0 {
366+
return nil
367+
}
368+
369+
// We don't know the document sizes for documents for all change events,
370+
// so just be conservative and assume they are maximum size.
371+
//
372+
// Note that this prevents us from being able to report a meaningful
373+
// total data size for noninitial generations in the log.
374+
dataSizes := make([]int, len(ids))
375+
for i, _ := range ids {
376+
dataSizes[i] = maxBSONObjSize
377+
}
378+
379+
dbName, collName := SplitNamespace(namespace)
380+
if err := verifier.insertRecheckDocs(ctx, dbName, collName, ids, dataSizes); err != nil {
381+
return errors.Wrapf(err, "failed to insert recheck docs for namespace %s", namespace)
382+
}
383+
384+
delete(verifier.changeEventRecheckBuf, namespace)
385+
}
386+
387+
return nil
388+
}

internal/verifier/change_stream_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
3838
verifier1 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
3939
ctx, cancel := context.WithCancel(context.Background())
4040
defer cancel()
41-
err := verifier1.StartChangeStream(ctx)
41+
err := verifier1.StartChangeStream(ctx, nil)
4242
suite.Require().NoError(err)
4343
}()
4444

@@ -63,7 +63,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
6363

6464
newTime := suite.getClusterTime(ctx, suite.srcMongoClient)
6565

66-
err = verifier2.StartChangeStream(ctx)
66+
err = verifier2.StartChangeStream(ctx, nil)
6767
suite.Require().NoError(err)
6868

6969
suite.Require().NotNil(verifier2.srcStartAtTs)
@@ -138,7 +138,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() {
138138
suite.Require().NoError(err)
139139
origStartTs := sess.OperationTime()
140140
suite.Require().NotNil(origStartTs)
141-
err = verifier.StartChangeStream(ctx)
141+
err = verifier.StartChangeStream(ctx, nil)
142142
suite.Require().NoError(err)
143143
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
144144
verifier.changeStreamEnderChan <- struct{}{}
@@ -158,7 +158,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() {
158158
suite.Require().NoError(err)
159159
origStartTs := sess.OperationTime()
160160
suite.Require().NotNil(origStartTs)
161-
err = verifier.StartChangeStream(ctx)
161+
err = verifier.StartChangeStream(ctx, nil)
162162
suite.Require().NoError(err)
163163
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
164164
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
@@ -193,7 +193,7 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
193193
suite.Require().NoError(err)
194194
origStartTs := sess.OperationTime()
195195
suite.Require().NotNil(origStartTs)
196-
err = verifier.StartChangeStream(ctx)
196+
err = verifier.StartChangeStream(ctx, nil)
197197
suite.Require().NoError(err)
198198
suite.Require().NotNil(verifier.srcStartAtTs)
199199
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
162162
if !csRunning {
163163
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
164164

165-
err = verifier.StartChangeStream(ctx)
165+
err = verifier.StartChangeStream(ctx, nil)
166166
if err != nil {
167167
return errors.Wrap(err, "failed to start change stream on source")
168168
}

internal/verifier/migration_verifier.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ type Verifier struct {
138138
globalFilter map[string]any
139139

140140
pprofInterval time.Duration
141+
142+
changeEventRecheckBuf ChangeEventRecheckBuffer
141143
}
142144

143145
// VerificationStatus holds the Verification Status
@@ -197,6 +199,7 @@ func NewVerifier(settings VerifierSettings) *Verifier {
197199
changeStreamErrChan: make(chan error),
198200
changeStreamDoneChan: make(chan struct{}),
199201
readConcernSetting: readConcern,
202+
changeEventRecheckBuf: make(ChangeEventRecheckBuffer),
200203
}
201204
}
202205

internal/verifier/migration_verifier_test.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -231,33 +231,31 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
231231
ctx := context.Background()
232232
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
233233

234-
suite.Require().NoError(
235-
verifier.InsertChangeEventRecheckDoc(
236-
ctx,
237-
&ParsedEvent{
238-
OpType: "insert",
239-
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
240-
DocKey: DocKey{
241-
ID: "heyhey",
242-
},
234+
suite.handleAndFlushChangeEvent(
235+
ctx,
236+
verifier,
237+
ParsedEvent{
238+
OpType: "insert",
239+
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
240+
DocKey: DocKey{
241+
ID: "heyhey",
243242
},
244-
),
243+
},
245244
)
246245

247-
suite.Require().NoError(
248-
verifier.InsertChangeEventRecheckDoc(
249-
ctx,
250-
&ParsedEvent{
251-
ID: bson.M{
252-
"docID": "ID/docID",
253-
},
254-
OpType: "insert",
255-
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
256-
DocKey: DocKey{
257-
ID: "hoohoo",
258-
},
246+
suite.handleAndFlushChangeEvent(
247+
ctx,
248+
verifier,
249+
ParsedEvent{
250+
ID: bson.M{
251+
"docID": "ID/docID",
252+
},
253+
OpType: "insert",
254+
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
255+
DocKey: DocKey{
256+
ID: "hoohoo",
259257
},
260-
),
258+
},
261259
)
262260

263261
verifier.generation++
@@ -480,6 +478,14 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Gen0() {
480478
)
481479
}
482480

481+
func (suite *MultiMetaVersionTestSuite) handleAndFlushChangeEvent(ctx context.Context, verifier *Verifier, event ParsedEvent) {
482+
err := verifier.HandleChangeStreamEvent(&event)
483+
suite.Require().NoError(err)
484+
485+
err = verifier.flushAllBufferedChangeEventRechecks(ctx)
486+
suite.Require().NoError(err)
487+
}
488+
483489
func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
484490
ctx := context.Background()
485491
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
@@ -497,19 +503,16 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
497503
Coll: "bar2",
498504
},
499505
}
500-
err = verifier.HandleChangeStreamEvent(ctx, &event)
501-
suite.Require().NoError(err)
506+
507+
suite.handleAndFlushChangeEvent(ctx, verifier, event)
502508
event.OpType = "insert"
503-
err = verifier.HandleChangeStreamEvent(ctx, &event)
504-
suite.Require().NoError(err)
509+
suite.handleAndFlushChangeEvent(ctx, verifier, event)
505510
event.OpType = "replace"
506-
err = verifier.HandleChangeStreamEvent(ctx, &event)
507-
suite.Require().NoError(err)
511+
suite.handleAndFlushChangeEvent(ctx, verifier, event)
508512
event.OpType = "update"
509-
err = verifier.HandleChangeStreamEvent(ctx, &event)
510-
suite.Require().NoError(err)
513+
suite.handleAndFlushChangeEvent(ctx, verifier, event)
511514
event.OpType = "flibbity"
512-
err = verifier.HandleChangeStreamEvent(ctx, &event)
515+
err = verifier.HandleChangeStreamEvent(&event)
513516
badEventErr := UnknownEventError{}
514517
suite.Require().ErrorAs(err, &badEventErr)
515518
suite.Assert().Equal("flibbity", badEventErr.Event.OpType)

internal/verifier/recheck.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,6 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
3838
dbName, collName, documentIDs, dataSizes)
3939
}
4040

41-
func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, changeEvent *ParsedEvent) error {
42-
documentIDs := []interface{}{changeEvent.DocKey.ID}
43-
44-
// We don't know the document sizes for documents for all change events,
45-
// so just be conservative and assume they are maximum size.
46-
//
47-
// Note that this prevents us from being able to report a meaningful
48-
// total data size for noninitial generations in the log.
49-
dataSizes := []int{maxBSONObjSize}
50-
51-
return verifier.insertRecheckDocs(
52-
ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes)
53-
}
54-
5541
func (verifier *Verifier) insertRecheckDocs(
5642
ctx context.Context,
5743
dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
@@ -91,6 +77,11 @@ func (verifier *Verifier) insertRecheckDocs(
9177
verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation)
9278
}
9379

80+
// Silence any duplicate key errors as recheck docs should have existed.
81+
if mongo.IsDuplicateKeyError(err) {
82+
err = nil
83+
}
84+
9485
return err
9586
}
9687

internal/verifier/recheck_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() {
4949
},
5050
}
5151

52-
suite.Require().NoError(
53-
verifier.InsertChangeEventRecheckDoc(ctx, &event),
54-
"insert change event recheck",
55-
)
52+
suite.handleAndFlushChangeEvent(ctx, verifier, event)
5653

5754
recheckDocs = suite.fetchRecheckDocs(ctx, verifier)
5855
suite.Assert().Equal(

0 commit comments

Comments
 (0)