Skip to content

Commit 86e8fb4

Browse files
committed
add test
1 parent 28a62d1 commit 86e8fb4

File tree

5 files changed

+97
-12
lines changed

5 files changed

+97
-12
lines changed

internal/verifier/change_stream.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@ import (
1515
"golang.org/x/exp/constraints"
1616
)
1717

18-
// ChangeEventRecheckBuffer contains rechecks from change events as a map namespace -> document _ids, and
19-
type ChangeEventRecheckBuffer struct {
20-
buf map[string][]interface{}
21-
bufSize map[string]uint64
22-
}
23-
2418
// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
2519
type ParsedEvent struct {
2620
ID interface{} `bson:"_id"`
@@ -40,8 +34,16 @@ type DocKey struct {
4034
ID interface{} `bson:"_id"`
4135
}
4236

37+
// ChangeEventRecheckBuffer contains rechecks from change events as a map namespace -> document _ids, and
38+
type ChangeEventRecheckBuffer struct {
39+
buf map[string][]interface{}
40+
bufSize map[string]uint64
41+
}
42+
43+
// minChangeStreamCheckpointInterval is a var instead of a const for tests to overwrite.
44+
var minChangeStreamCheckpointInterval = time.Second * 10
45+
4346
const (
44-
minChangeStreamCheckpointInterval = time.Second * 10
4547
metadataChangeStreamCollectionName = "changeStream"
4648
)
4749

@@ -112,6 +114,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
112114
return nil
113115
}
114116

117+
// Do a final flush of the buffered change event rechecks before the function returns.
118+
defer func() {
119+
err := verifier.flushAllBufferedChangeEventRechecks(ctx)
120+
if err != nil {
121+
verifier.changeStreamErrChan <- err
122+
}
123+
}()
124+
115125
for {
116126
var err error
117127

internal/verifier/change_stream_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package verifier
22

33
import (
44
"context"
5+
"github.com/rs/zerolog"
6+
"strings"
57
"testing"
68
"time"
79

@@ -198,3 +200,69 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
198200
suite.Require().NotNil(verifier.srcStartAtTs)
199201
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
200202
}
203+
204+
func (suite *MultiSourceVersionTestSuite) TestBatchInsertChangeEventRecheckDocs() {
205+
zerolog.SetGlobalLevel(zerolog.DebugLevel)
206+
207+
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
208+
209+
ctx := context.Background()
210+
vCtx, cancel := context.WithCancel(ctx)
211+
212+
// Don't do a checkpoint for this test.
213+
origInterval := minChangeStreamCheckpointInterval
214+
minChangeStreamCheckpointInterval = 10 * time.Hour
215+
defer func() {
216+
minChangeStreamCheckpointInterval = origInterval
217+
}()
218+
219+
err := verifier.StartChangeStream(vCtx)
220+
suite.Require().NoError(err)
221+
222+
// A large recheck document should be flushed immediately.
223+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
224+
ctx,
225+
bson.D{{"_id", strings.Repeat("a", 4*1024*1024)}},
226+
)
227+
suite.Require().NoError(err)
228+
require.Eventually(
229+
suite.T(),
230+
func() bool {
231+
return len(suite.fetchVerifierRechecks(ctx, verifier)) == 1
232+
},
233+
time.Minute,
234+
500*time.Millisecond,
235+
"the verifier should flush a recheck",
236+
)
237+
suite.Require().Empty(verifier.changeEventRecheckBuf.buf["testDB.testColl"])
238+
239+
// A small recheck document should be buffered in-memory.
240+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
241+
ctx,
242+
bson.D{{"_id", 0}},
243+
)
244+
suite.Require().NoError(err)
245+
require.Eventually(
246+
suite.T(),
247+
func() bool {
248+
suite.Require().Len(suite.fetchVerifierRechecks(ctx, verifier), 1)
249+
return len(verifier.changeEventRecheckBuf.buf["testDB.testColl"]) == 1
250+
},
251+
time.Minute,
252+
500*time.Millisecond,
253+
"the verifier should buffer a recheck",
254+
)
255+
256+
// Any recheck docs remaining in the buffer should be flushed before the change stream reader exits.
257+
cancel()
258+
require.Eventually(
259+
suite.T(),
260+
func() bool {
261+
return len(suite.fetchVerifierRechecks(ctx, verifier)) == 2
262+
},
263+
time.Minute,
264+
500*time.Millisecond,
265+
"the verifier should have flushed all recheck docs",
266+
)
267+
suite.Require().Empty(verifier.changeEventRecheckBuf.buf["testDB.testColl"])
268+
}

internal/verifier/migration_verifier.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ type Verifier struct {
139139

140140
pprofInterval time.Duration
141141

142-
changeEventRecheckBuf ChangeEventRecheckBuffer
142+
changeEventRecheckBuf *ChangeEventRecheckBuffer
143143
}
144144

145145
// VerificationStatus holds the Verification Status
@@ -199,6 +199,10 @@ func NewVerifier(settings VerifierSettings) *Verifier {
199199
changeStreamErrChan: make(chan error),
200200
changeStreamDoneChan: make(chan struct{}),
201201
readConcernSetting: readConcern,
202+
changeEventRecheckBuf: &ChangeEventRecheckBuffer{
203+
buf: make(map[string][]interface{}),
204+
bufSize: make(map[string]uint64),
205+
},
202206
}
203207
}
204208

internal/verifier/migration_verifier_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
232232
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
233233

234234
suite.Require().NoError(
235-
verifier.InsertChangeEventRecheckDoc(
235+
verifier.AddAndMaybeFlushChangeEventRecheckDoc(
236236
ctx,
237237
&ParsedEvent{
238238
OpType: "insert",
@@ -245,7 +245,7 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
245245
)
246246

247247
suite.Require().NoError(
248-
verifier.InsertChangeEventRecheckDoc(
248+
verifier.AddAndMaybeFlushChangeEventRecheckDoc(
249249
ctx,
250250
&ParsedEvent{
251251
ID: bson.M{
@@ -260,6 +260,8 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
260260
),
261261
)
262262

263+
suite.Require().NoError(verifier.flushAllBufferedChangeEventRechecks(ctx))
264+
263265
verifier.generation++
264266

265267
func() {

internal/verifier/recheck.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ func (verifier *Verifier) AddAndMaybeFlushChangeEventRecheckDoc(ctx context.Cont
5050
}
5151
verifier.changeEventRecheckBuf.bufSize[namespace] += uint64(len(namespace) + len(bsonID))
5252

53-
// Flush all recheck documents once a buffer reaches 5 MB.
54-
if verifier.changeEventRecheckBuf.bufSize[changeEvent.Ns.String()] > 5*1024*1024 {
53+
// Flush all recheck documents once a buffer reaches 4 MB. It is a conservative threshold
54+
// to prevent a recheck task document exceeding 16MB size limit.
55+
if verifier.changeEventRecheckBuf.bufSize[namespace] > 4*1024*1024 {
5556
if err := verifier.flushChangeEventRechecksForNamespace(ctx, namespace); err != nil {
5657
return err
5758
}

0 commit comments

Comments
 (0)