Skip to content

Commit 2ae4abc

Browse files
committed
Merge branch 'main' into REP-5317-add-dst-change-stream
2 parents 733220c + 365482d commit 2ae4abc

File tree

7 files changed

+144
-54
lines changed

7 files changed

+144
-54
lines changed

internal/partitions/partition.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
137137
// (e.g. use the partitions on the source to read the destination for verification)
138138
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
139139
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
140-
func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A) bson.D {
140+
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
141141
if p == nil {
142142
if len(filterAndPredicates) > 0 {
143143
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
@@ -160,16 +160,9 @@ func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A
160160
allowTypeBracketing := false
161161
if buildInfo != nil {
162162
allowTypeBracketing = true
163-
versionArray, ok := (*buildInfo)["versionArray"].(bson.A)
164-
//bson values are int32 or int64, never int.
165-
if ok {
166-
majorVersion, ok := versionArray[0].(int32)
167-
if ok {
168-
allowTypeBracketing = majorVersion < 5
169-
} else {
170-
majorVersion64, _ := versionArray[0].(int64)
171-
allowTypeBracketing = majorVersion64 < 5
172-
}
163+
164+
if buildInfo.VersionArray != nil {
165+
allowTypeBracketing = buildInfo.VersionArray[0] < 5
173166
}
174167
}
175168
if !allowTypeBracketing {

internal/partitions/partition_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,43 +77,38 @@ func (suite *UnitTestSuite) TestVersioning() {
7777
filter := getFilterFromFindOptions(findOptions)
7878
suite.Require().Equal(expectedFilter, filter)
7979

80-
// 6.0 (int64)
81-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int64(6), int64(0), int64(0), int64(0)}}, nil)
82-
filter = getFilterFromFindOptions(findOptions)
83-
suite.Require().Equal(expectedFilter, filter)
84-
8580
// 6.0
86-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(6), int32(0), int32(0), int32(0)}}, nil)
81+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
8782
filter = getFilterFromFindOptions(findOptions)
8883
suite.Require().Equal(expectedFilter, filter)
8984

9085
// 5.3.0.9
91-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(5), int32(3), int32(0), int32(9)}}, nil)
86+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
9287
filter = getFilterFromFindOptions(findOptions)
9388
suite.Require().Equal(expectedFilter, filter)
9489

9590
// 7.1.3.5
96-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(7), int32(1), int32(3), int32(5)}}, nil)
91+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
9792
filter = getFilterFromFindOptions(findOptions)
9893
suite.Require().Equal(expectedFilter, filter)
9994

10095
// 4.4 (int64)
101-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int64(4), int64(4), int64(0), int64(0)}}, nil)
96+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
10297
filter = getFilterFromFindOptions(findOptions)
10398
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
10499

105100
// 4.4
106-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(4), int32(4), int32(0), int32(0)}}, nil)
101+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
107102
filter = getFilterFromFindOptions(findOptions)
108103
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
109104

110105
// 4.2
111-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(4), int32(2), int32(0), int32(0)}}, nil)
106+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
112107
filter = getFilterFromFindOptions(findOptions)
113108
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
114109

115110
// No version array -- assume old, require type bracketing.
116-
findOptions = partition.GetFindOptions(&bson.M{"notVersionArray": bson.A{6, int32(0), int32(0), int32(0)}}, nil)
111+
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
117112
filter = getFilterFromFindOptions(findOptions)
118113
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
119114
}

internal/util/buildinfo.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/pkg/errors"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
)
11+
12+
type BuildInfo struct {
13+
VersionArray []int
14+
}
15+
16+
func GetBuildInfo(ctx context.Context, client *mongo.Client) (BuildInfo, error) {
17+
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
18+
19+
rawResp, err := commandResult.Raw()
20+
if err != nil {
21+
return BuildInfo{}, errors.Wrap(err, "failed to fetch build info")
22+
}
23+
24+
bi := BuildInfo{}
25+
_, err = mbson.RawLookup(rawResp, &bi.VersionArray, "versionArray")
26+
if err != nil {
27+
return BuildInfo{}, errors.Wrap(err, "failed to decode build info version array")
28+
}
29+
30+
return bi, nil
31+
}

internal/verifier/change_stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,10 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
385385
SetMaxAwaitTime(1 * time.Second).
386386
SetFullDocument(options.UpdateLookup)
387387

388+
if verifier.srcBuildInfo.VersionArray[0] >= 6 {
389+
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
390+
}
391+
388392
savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)
389393
if err != nil {
390394
return errors.Wrap(err, "failed to load persisted change stream resume token")

internal/verifier/change_stream_test.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/10gen/migration-verifier/internal/util"
9+
"github.com/10gen/migration-verifier/mslices"
810
"github.com/pkg/errors"
911
"github.com/samber/lo"
1012
"github.com/stretchr/testify/require"
@@ -36,6 +38,12 @@ func TestChangeStreamFilter(t *testing.T) {
3638
// terminates that verifier, updates the source cluster, starts a new
3739
// verifier with change stream, and confirms that things look as they should.
3840
func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
41+
suite.Require().NoError(
42+
suite.srcMongoClient.
43+
Database(suite.DBNameForTest()).
44+
CreateCollection(suite.Context(), "testColl"),
45+
)
46+
3947
func() {
4048
verifier1 := suite.BuildVerifier()
4149
ctx, cancel := context.WithCancel(context.Background())
@@ -44,7 +52,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
4452
suite.Require().NoError(err)
4553
}()
4654

47-
ctx, cancel := context.WithCancel(context.Background())
55+
ctx, cancel := context.WithCancel(suite.Context())
4856
defer cancel()
4957

5058
_, err := suite.srcMongoClient.
@@ -220,19 +228,26 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
220228
}
221229

222230
func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
223-
verifier := suite.BuildVerifier()
231+
ctx := suite.Context()
224232

225-
ctx, cancel := context.WithCancel(context.Background())
226-
defer cancel()
233+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
234+
coll1 := db.Collection("testColl1")
235+
coll2 := db.Collection("testColl2")
236+
237+
for _, coll := range mslices.Of(coll1, coll2) {
238+
suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
239+
}
240+
241+
verifier := suite.BuildVerifier()
227242

228243
suite.Require().NoError(verifier.srcChangeStreamReader.StartChangeStream(ctx))
229244

230-
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
245+
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
231246
suite.Require().NoError(err)
232-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
247+
_, err = coll1.InsertOne(ctx, bson.D{{"_id", 2}})
233248
suite.Require().NoError(err)
234249

235-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
250+
_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
236251
suite.Require().NoError(err)
237252

238253
var rechecks []bson.M
@@ -246,6 +261,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
246261
500*time.Millisecond,
247262
"the verifier should flush a recheck doc after a batch",
248263
)
264+
249265
}
250266

251267
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
@@ -305,3 +321,40 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
305321

306322
suite.Assert().Equal(docsCount, totalFailed, "all source docs should be missing")
307323
}
324+
325+
func (suite *IntegrationTestSuite) TestCreateForbidden() {
326+
ctx := suite.Context()
327+
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
328+
suite.Require().NoError(err)
329+
330+
if buildInfo.VersionArray[0] < 6 {
331+
suite.T().Skipf("This test requires server v6+. (Found: %v)", buildInfo.VersionArray)
332+
}
333+
334+
verifier := suite.BuildVerifier()
335+
336+
// start verifier
337+
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
338+
339+
// wait for generation 0 to end
340+
verifierRunner.AwaitGenerationEnd()
341+
342+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
343+
coll := db.Collection("mycoll")
344+
suite.Require().NoError(
345+
db.CreateCollection(ctx, coll.Name()),
346+
)
347+
348+
// The error from the create event will come either at WritesOff
349+
// or when we finalize the change stream.
350+
err = verifier.WritesOff(ctx)
351+
if err == nil {
352+
err = verifierRunner.Await()
353+
}
354+
355+
suite.Require().Error(err, "should detect forbidden create event")
356+
357+
eventErr := UnknownEventError{}
358+
suite.Require().ErrorAs(err, &eventErr)
359+
suite.Assert().Equal("create", eventErr.Event.OpType)
360+
}

internal/verifier/check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
251251
// paying attention. Also, this should not matter too much because any failures will be
252252
// caught again on the next iteration.
253253
if verifier.writesOff {
254+
verifier.logger.Debug().
255+
Msg("Waiting for change stream to end.")
256+
254257
// It's necessary to wait for the change stream to finish before incrementing the
255258
// generation number, or the last changes will not be checked.
256259
verifier.mux.Unlock()

internal/verifier/migration_verifier.go

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ type Verifier struct {
9696
metaClient *mongo.Client
9797
srcClient *mongo.Client
9898
dstClient *mongo.Client
99-
srcBuildInfo *bson.M
100-
dstBuildInfo *bson.M
99+
srcBuildInfo *util.BuildInfo
100+
dstBuildInfo *util.BuildInfo
101101
numWorkers int
102102
failureDisplaySize int64
103103

@@ -270,8 +270,20 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
270270
return errors.Wrapf(err, "failed to fetch destination's cluster time")
271271
}
272272

273-
verifier.srcChangeStreamReader.ChangeStreamWritesOffTsChan <- srcFinalTs
274-
verifier.dstChangeStreamReader.ChangeStreamWritesOffTsChan <- dstFinalTs
273+
// This has to happen under the lock because the change stream
274+
// might be inserting docs into the recheck queue, which happens
275+
// under the lock.
276+
select {
277+
case verifier.srcChangeStreamReader.ChangeStreamWritesOffTsChan <- srcFinalTs:
278+
case err := <-verifier.srcChangeStreamReader.ChangeStreamErrChan:
279+
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
280+
}
281+
282+
select {
283+
case verifier.dstChangeStreamReader.ChangeStreamWritesOffTsChan <- dstFinalTs:
284+
case err := <-verifier.dstChangeStreamReader.ChangeStreamErrChan:
285+
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
286+
}
275287

276288
return nil
277289
}
@@ -315,21 +327,33 @@ func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
315327
var err error
316328
verifier.srcClient, err = mongo.Connect(ctx, opts)
317329
if err != nil {
318-
return err
330+
return errors.Wrapf(err, "failed to connect to source %#q", uri)
319331
}
320-
verifier.srcBuildInfo, err = getBuildInfo(ctx, verifier.srcClient)
321-
return err
332+
333+
buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
334+
if err != nil {
335+
return errors.Wrap(err, "failed to read source build info")
336+
}
337+
338+
verifier.srcBuildInfo = &buildInfo
339+
return nil
322340
}
323341

324342
func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
325343
opts := verifier.getClientOpts(uri)
326344
var err error
327345
verifier.dstClient, err = mongo.Connect(ctx, opts)
328346
if err != nil {
329-
return err
347+
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
330348
}
331-
verifier.dstBuildInfo, err = getBuildInfo(ctx, verifier.dstClient)
332-
return err
349+
350+
buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
351+
if err != nil {
352+
return errors.Wrap(err, "failed to read destination build info")
353+
}
354+
355+
verifier.dstBuildInfo = &buildInfo
356+
return nil
333357
}
334358

335359
func (verifier *Verifier) SetServerPort(port int) {
@@ -462,7 +486,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
462486
return append(predicates, verifier.globalFilter)
463487
}
464488

465-
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *bson.M,
489+
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo,
466490
startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
467491
var findOptions bson.D
468492
runCommandOptions := options.RunCmd()
@@ -1515,16 +1539,3 @@ func (verifier *Verifier) getNamespaces(ctx context.Context, fieldName string) (
15151539
}
15161540
return namespaces, nil
15171541
}
1518-
1519-
func getBuildInfo(ctx context.Context, client *mongo.Client) (*bson.M, error) {
1520-
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
1521-
if commandResult.Err() != nil {
1522-
return nil, commandResult.Err()
1523-
}
1524-
var buildInfoMap bson.M
1525-
err := commandResult.Decode(&buildInfoMap)
1526-
if err != nil {
1527-
return nil, err
1528-
}
1529-
return &buildInfoMap, nil
1530-
}

0 commit comments

Comments
 (0)