Skip to content

Commit ad69b4d

Browse files
authored
REP-5317 Add destination change stream (#53)
This PR adds a destination change stream to the MV. It also adds a `ChangeStreamReader` struct that reads change events from either source or destination. Change events are handled in change event handlers that are goroutines started for each of the change stream reader.
1 parent 8992698 commit ad69b4d

File tree

12 files changed

+608
-226
lines changed

12 files changed

+608
-226
lines changed

internal/verifier/change_stream.go

Lines changed: 189 additions & 76 deletions
Large diffs are not rendered by default.

internal/verifier/change_stream_test.go

Lines changed: 109 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@ package verifier
33
import (
44
"context"
55
"strings"
6-
"testing"
76
"time"
87

98
"github.com/10gen/migration-verifier/internal/testutil"
109
"github.com/10gen/migration-verifier/internal/util"
1110
"github.com/10gen/migration-verifier/mslices"
1211
"github.com/pkg/errors"
1312
"github.com/samber/lo"
14-
"github.com/stretchr/testify/assert"
1513
"github.com/stretchr/testify/require"
1614
"go.mongodb.org/mongo-driver/bson"
1715
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -20,18 +18,17 @@ import (
2018
"go.mongodb.org/mongo-driver/mongo/readconcern"
2119
)
2220

23-
func TestChangeStreamFilter(t *testing.T) {
24-
verifier := Verifier{}
25-
verifier.SetMetaDBName("metadb")
26-
assert.Contains(t,
27-
verifier.GetChangeStreamFilter(),
21+
func (suite *IntegrationTestSuite) TestChangeStreamFilter() {
22+
verifier := suite.BuildVerifier()
23+
suite.Assert().Contains(
24+
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
2825
bson.D{
29-
{"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}},
26+
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
3027
},
3128
)
32-
verifier.srcNamespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
33-
assert.Contains(t,
34-
verifier.GetChangeStreamFilter(),
29+
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
30+
suite.Assert().Contains(
31+
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
3532
bson.D{{"$match", bson.D{
3633
{"$or", []bson.D{
3734
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
@@ -43,6 +40,18 @@ func TestChangeStreamFilter(t *testing.T) {
4340
)
4441
}
4542

43+
func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) {
44+
err := verifier.srcChangeStreamReader.StartChangeStream(ctx)
45+
suite.Require().NoError(err)
46+
go func() {
47+
err := verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
48+
if errors.Is(err, context.Canceled) {
49+
return
50+
}
51+
suite.Require().NoError(err)
52+
}()
53+
}
54+
4655
// TestChangeStreamResumability creates a verifier, starts its change stream,
4756
// terminates that verifier, updates the source cluster, starts a new
4857
// verifier with change stream, and confirms that things look as they should.
@@ -57,8 +66,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
5766
verifier1 := suite.BuildVerifier()
5867
ctx, cancel := context.WithCancel(suite.Context())
5968
defer cancel()
60-
err := verifier1.StartChangeStream(ctx)
61-
suite.Require().NoError(err)
69+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier1)
6270
}()
6371

6472
ctx, cancel := context.WithCancel(suite.Context())
@@ -82,13 +90,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
8290

8391
newTime := suite.getClusterTime(ctx, suite.srcMongoClient)
8492

85-
err = verifier2.StartChangeStream(ctx)
86-
suite.Require().NoError(err)
93+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)
8794

88-
suite.Require().NotNil(verifier2.srcStartAtTs)
95+
suite.Require().NotNil(verifier2.srcChangeStreamReader.startAtTs)
8996

9097
suite.Assert().False(
91-
verifier2.srcStartAtTs.After(newTime),
98+
verifier2.srcChangeStreamReader.startAtTs.After(newTime),
9299
"verifier2's change stream should be no later than this new session",
93100
)
94101

@@ -156,12 +163,11 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
156163
suite.Require().NoError(err)
157164
origStartTs := sess.OperationTime()
158165
suite.Require().NotNil(origStartTs)
159-
err = verifier.StartChangeStream(ctx)
160-
suite.Require().NoError(err)
161-
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
162-
verifier.changeStreamWritesOffTsChan <- *origStartTs
163-
<-verifier.changeStreamDoneChan
164-
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
166+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
167+
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
168+
verifier.srcChangeStreamReader.writesOffTsChan <- *origStartTs
169+
<-verifier.srcChangeStreamReader.doneChan
170+
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
165171
}
166172

167173
func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
@@ -176,13 +182,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
176182

177183
origSessionTime := sess.OperationTime()
178184
suite.Require().NotNil(origSessionTime)
179-
err = verifier.StartChangeStream(ctx)
180-
suite.Require().NoError(err)
185+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
181186

182187
// srcStartAtTs derives from the change stream’s resume token, which can
183188
// postdate our session time but should not precede it.
184189
suite.Require().False(
185-
verifier.srcStartAtTs.Before(*origSessionTime),
190+
verifier.srcChangeStreamReader.startAtTs.Before(*origSessionTime),
186191
"srcStartAtTs should be >= the insert’s optime",
187192
)
188193

@@ -206,12 +211,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
206211
"session time after events should exceed the original",
207212
)
208213

209-
verifier.changeStreamWritesOffTsChan <- *postEventsSessionTime
210-
<-verifier.changeStreamDoneChan
214+
verifier.srcChangeStreamReader.writesOffTsChan <- *postEventsSessionTime
215+
<-verifier.srcChangeStreamReader.doneChan
211216

212217
suite.Assert().Equal(
213218
*postEventsSessionTime,
214-
*verifier.srcStartAtTs,
219+
*verifier.srcChangeStreamReader.startAtTs,
215220
"verifier.srcStartAtTs should now be our session timestamp",
216221
)
217222
}
@@ -227,10 +232,9 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
227232
suite.Require().NoError(err)
228233
origStartTs := sess.OperationTime()
229234
suite.Require().NotNil(origStartTs)
230-
err = verifier.StartChangeStream(ctx)
231-
suite.Require().NoError(err)
232-
suite.Require().NotNil(verifier.srcStartAtTs)
233-
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
235+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
236+
suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs)
237+
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcChangeStreamReader.startAtTs), 0)
234238
}
235239

236240
func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
@@ -246,7 +250,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
246250

247251
verifier := suite.BuildVerifier()
248252

249-
suite.Require().NoError(verifier.StartChangeStream(ctx))
253+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
250254

251255
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
252256
suite.Require().NoError(err)
@@ -267,7 +271,6 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
267271
500*time.Millisecond,
268272
"the verifier should flush a recheck doc after a batch",
269273
)
270-
271274
}
272275

273276
func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
@@ -451,6 +454,77 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
451454
suite.Assert().Equal("create", eventErr.Event.OpType)
452455
}
453456

457+
func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
458+
ctx := suite.Context()
459+
460+
srcDBName := suite.DBNameForTest("src")
461+
dstDBName := suite.DBNameForTest("dst")
462+
463+
db := suite.dstMongoClient.Database(dstDBName)
464+
coll1 := db.Collection("dstColl1")
465+
coll2 := db.Collection("dstColl2")
466+
467+
for _, coll := range mslices.Of(coll1, coll2) {
468+
suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
469+
}
470+
471+
verifier := suite.BuildVerifier()
472+
verifier.SetSrcNamespaces([]string{srcDBName + ".srcColl1", srcDBName + ".srcColl2"})
473+
verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"})
474+
verifier.SetNamespaceMap()
475+
476+
suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx))
477+
go func() {
478+
err := verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader)
479+
if errors.Is(err, context.Canceled) {
480+
return
481+
}
482+
suite.Require().NoError(err)
483+
}()
484+
485+
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
486+
suite.Require().NoError(err)
487+
_, err = coll1.InsertOne(ctx, bson.D{{"_id", 2}})
488+
suite.Require().NoError(err)
489+
490+
_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
491+
suite.Require().NoError(err)
492+
493+
var rechecks []RecheckDoc
494+
require.Eventually(
495+
suite.T(),
496+
func() bool {
497+
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
498+
cursor, err := recheckColl.Find(ctx, bson.D{})
499+
if errors.Is(err, mongo.ErrNoDocuments) {
500+
return false
501+
}
502+
503+
suite.Require().NoError(err)
504+
suite.Require().NoError(cursor.All(ctx, &rechecks))
505+
return len(rechecks) == 3
506+
},
507+
time.Minute,
508+
500*time.Millisecond,
509+
"the verifier should flush a recheck doc after a batch",
510+
)
511+
512+
coll1RecheckCount, coll2RecheckCount := 0, 0
513+
for _, recheck := range rechecks {
514+
suite.Require().Equal(srcDBName, recheck.PrimaryKey.SrcDatabaseName)
515+
switch recheck.PrimaryKey.SrcCollectionName {
516+
case "srcColl1":
517+
coll1RecheckCount++
518+
case "srcColl2":
519+
coll2RecheckCount++
520+
default:
521+
suite.T().Fatalf("unknown collection name: %v", recheck.PrimaryKey.SrcCollectionName)
522+
}
523+
}
524+
suite.Require().Equal(2, coll1RecheckCount)
525+
suite.Require().Equal(1, coll2RecheckCount)
526+
}
527+
454528
func (suite *IntegrationTestSuite) TestLargeEvents() {
455529
ctx := suite.Context()
456530

internal/verifier/check.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
4040
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
4141
}
4242

43-
func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
43+
func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeStreamReader) error {
4444
select {
4545
case <-ctx.Done():
4646
return ctx.Err()
47-
case err := <-verifier.changeStreamErrChan:
47+
case err := <-csr.errChan:
4848
verifier.logger.Warn().Err(err).
49-
Msg("Received error from change stream.")
49+
Msgf("Received error from %s.", csr)
5050
return err
51-
case <-verifier.changeStreamDoneChan:
51+
case <-csr.doneChan:
5252
verifier.logger.Debug().
53-
Msg("Received completion signal from change stream.")
53+
Msgf("Received completion signal from %s.", csr)
5454
break
5555
}
5656

@@ -82,8 +82,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
8282
// If the change stream fails, everything should stop.
8383
eg.Go(func() error {
8484
select {
85-
case err := <-verifier.changeStreamErrChan:
86-
return errors.Wrap(err, "change stream failed")
85+
case err := <-verifier.srcChangeStreamReader.errChan:
86+
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
87+
case err := <-verifier.dstChangeStreamReader.errChan:
88+
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
8789
case <-ctx.Done():
8890
return nil
8991
}
@@ -168,6 +170,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
168170
}
169171
verifier.running = true
170172
verifier.globalFilter = filter
173+
verifier.initializeChangeStreamReaders()
171174
verifier.mux.Unlock()
172175
defer func() {
173176
verifier.mux.Lock()
@@ -204,17 +207,20 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
204207
verifier.phase = Idle
205208
}()
206209

207-
verifier.mux.RLock()
208-
csRunning := verifier.changeStreamRunning
209-
verifier.mux.RUnlock()
210-
if csRunning {
211-
verifier.logger.Debug().Msg("Check: Change stream already running.")
212-
} else {
213-
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
210+
ceHandlerGroup, groupCtx := errgroup.WithContext(ctx)
211+
for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} {
212+
if csReader.changeStreamRunning {
213+
verifier.logger.Debug().Msgf("Check: %s already running.", csReader)
214+
} else {
215+
verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader)
214216

215-
err = verifier.StartChangeStream(ctx)
216-
if err != nil {
217-
return errors.Wrap(err, "failed to start change stream on source")
217+
err = csReader.StartChangeStream(ctx)
218+
if err != nil {
219+
return errors.Wrapf(err, "failed to start %s", csReader)
220+
}
221+
ceHandlerGroup.Go(func() error {
222+
return verifier.StartChangeEventHandler(groupCtx, csReader)
223+
})
218224
}
219225
}
220226

@@ -279,13 +285,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
279285
// caught again on the next iteration.
280286
if verifier.writesOff {
281287
verifier.logger.Debug().
282-
Msg("Waiting for change stream to end.")
288+
Msg("Waiting for change streams to end.")
283289

284290
// It's necessary to wait for the change stream to finish before incrementing the
285291
// generation number, or the last changes will not be checked.
286292
verifier.mux.Unlock()
287-
err := verifier.waitForChangeStream(ctx)
288-
if err != nil {
293+
if err = verifier.waitForChangeStream(ctx, verifier.srcChangeStreamReader); err != nil {
294+
return err
295+
}
296+
if err = verifier.waitForChangeStream(ctx, verifier.dstChangeStreamReader); err != nil {
297+
return err
298+
}
299+
if err = ceHandlerGroup.Wait(); err != nil {
289300
return err
290301
}
291302
verifier.mux.Lock()

internal/verifier/compare.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
299299
ctx,
300300
verifier.srcClientCollection(task),
301301
verifier.srcClusterInfo,
302-
verifier.srcStartAtTs,
302+
verifier.srcChangeStreamReader.startAtTs,
303303
task,
304304
)
305305

@@ -325,7 +325,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
325325
ctx,
326326
verifier.dstClientCollection(task),
327327
verifier.dstClusterInfo,
328-
nil, //startAtTs
328+
verifier.dstChangeStreamReader.startAtTs,
329329
task,
330330
)
331331

internal/verifier/integration_test_suite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ func (suite *IntegrationTestSuite) TearDownTest() {
118118

119119
suite.contextCanceller(errors.Errorf("tearing down test %#q", suite.T().Name()))
120120
suite.testContext, suite.contextCanceller = nil, nil
121-
122121
ctx := context.Background()
123122
for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient} {
124123
dbNames, err := client.ListDatabaseNames(ctx, bson.D{})
@@ -171,6 +170,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
171170
"should set metadata connection string",
172171
)
173172
verifier.SetMetaDBName(metaDBName)
173+
verifier.initializeChangeStreamReaders()
174174

175175
suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))
176176
suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx))

0 commit comments

Comments
 (0)