Skip to content

Commit 365482d

Browse files
authored
Set showExpandedEvents in change streams for server v6+. (#54)
Since migration-verifier forbids DDL events during verification, it should set `showExpandedEvents` so that it can fail on events like `create` or `shardCollection`. This changeset sets that flag. This also redoes version detection to avoid typing weirdness. For convenience, this copies mslices.Of() from mongosync.
1 parent 9eacb63 commit 365482d

File tree

7 files changed

+130
-52
lines changed

7 files changed

+130
-52
lines changed

internal/partitions/partition.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
137137
// (e.g. use the partitions on the source to read the destination for verification)
138138
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
139139
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
140-
func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A) bson.D {
140+
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
141141
if p == nil {
142142
if len(filterAndPredicates) > 0 {
143143
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
@@ -160,16 +160,9 @@ func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A
160160
allowTypeBracketing := false
161161
if buildInfo != nil {
162162
allowTypeBracketing = true
163-
versionArray, ok := (*buildInfo)["versionArray"].(bson.A)
164-
//bson values are int32 or int64, never int.
165-
if ok {
166-
majorVersion, ok := versionArray[0].(int32)
167-
if ok {
168-
allowTypeBracketing = majorVersion < 5
169-
} else {
170-
majorVersion64, _ := versionArray[0].(int64)
171-
allowTypeBracketing = majorVersion64 < 5
172-
}
163+
164+
if buildInfo.VersionArray != nil {
165+
allowTypeBracketing = buildInfo.VersionArray[0] < 5
173166
}
174167
}
175168
if !allowTypeBracketing {

internal/partitions/partition_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,43 +77,38 @@ func (suite *UnitTestSuite) TestVersioning() {
7777
filter := getFilterFromFindOptions(findOptions)
7878
suite.Require().Equal(expectedFilter, filter)
7979

80-
// 6.0 (int64)
81-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int64(6), int64(0), int64(0), int64(0)}}, nil)
82-
filter = getFilterFromFindOptions(findOptions)
83-
suite.Require().Equal(expectedFilter, filter)
84-
8580
// 6.0
86-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(6), int32(0), int32(0), int32(0)}}, nil)
81+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
8782
filter = getFilterFromFindOptions(findOptions)
8883
suite.Require().Equal(expectedFilter, filter)
8984

9085
// 5.3.0.9
91-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(5), int32(3), int32(0), int32(9)}}, nil)
86+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
9287
filter = getFilterFromFindOptions(findOptions)
9388
suite.Require().Equal(expectedFilter, filter)
9489

9590
// 7.1.3.5
96-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(7), int32(1), int32(3), int32(5)}}, nil)
91+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
9792
filter = getFilterFromFindOptions(findOptions)
9893
suite.Require().Equal(expectedFilter, filter)
9994

10095
// 4.4 (int64)
101-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int64(4), int64(4), int64(0), int64(0)}}, nil)
96+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
10297
filter = getFilterFromFindOptions(findOptions)
10398
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
10499

105100
// 4.4
106-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(4), int32(4), int32(0), int32(0)}}, nil)
101+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
107102
filter = getFilterFromFindOptions(findOptions)
108103
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
109104

110105
// 4.2
111-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(4), int32(2), int32(0), int32(0)}}, nil)
106+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
112107
filter = getFilterFromFindOptions(findOptions)
113108
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
114109

115110
// No version array -- assume old, require type bracketing.
116-
findOptions = partition.GetFindOptions(&bson.M{"notVersionArray": bson.A{6, int32(0), int32(0), int32(0)}}, nil)
111+
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
117112
filter = getFilterFromFindOptions(findOptions)
118113
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
119114
}

internal/util/buildinfo.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/pkg/errors"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
)
11+
12+
type BuildInfo struct {
13+
VersionArray []int
14+
}
15+
16+
func GetBuildInfo(ctx context.Context, client *mongo.Client) (BuildInfo, error) {
17+
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
18+
19+
rawResp, err := commandResult.Raw()
20+
if err != nil {
21+
return BuildInfo{}, errors.Wrap(err, "failed to fetch build info")
22+
}
23+
24+
bi := BuildInfo{}
25+
_, err = mbson.RawLookup(rawResp, &bi.VersionArray, "versionArray")
26+
if err != nil {
27+
return BuildInfo{}, errors.Wrap(err, "failed to decode build info version array")
28+
}
29+
30+
return bi, nil
31+
}

internal/verifier/change_stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
293293
SetMaxAwaitTime(1 * time.Second).
294294
SetFullDocument(options.UpdateLookup)
295295

296+
if verifier.srcBuildInfo.VersionArray[0] >= 6 {
297+
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
298+
}
299+
296300
savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
297301
if err != nil {
298302
return errors.Wrap(err, "failed to load persisted change stream resume token")

internal/verifier/change_stream_test.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/10gen/migration-verifier/internal/util"
9+
"github.com/10gen/migration-verifier/mslices"
810
"github.com/pkg/errors"
911
"github.com/samber/lo"
1012
"github.com/stretchr/testify/require"
@@ -35,6 +37,12 @@ func TestChangeStreamFilter(t *testing.T) {
3537
// terminates that verifier, updates the source cluster, starts a new
3638
// verifier with change stream, and confirms that things look as they should.
3739
func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
40+
suite.Require().NoError(
41+
suite.srcMongoClient.
42+
Database(suite.DBNameForTest()).
43+
CreateCollection(suite.Context(), "testColl"),
44+
)
45+
3846
func() {
3947
verifier1 := suite.BuildVerifier()
4048
ctx, cancel := context.WithCancel(context.Background())
@@ -43,7 +51,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
4351
suite.Require().NoError(err)
4452
}()
4553

46-
ctx, cancel := context.WithCancel(context.Background())
54+
ctx, cancel := context.WithCancel(suite.Context())
4755
defer cancel()
4856

4957
_, err := suite.srcMongoClient.
@@ -219,19 +227,26 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
219227
}
220228

221229
func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
222-
verifier := suite.BuildVerifier()
230+
ctx := suite.Context()
223231

224-
ctx, cancel := context.WithCancel(context.Background())
225-
defer cancel()
232+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
233+
coll1 := db.Collection("testColl1")
234+
coll2 := db.Collection("testColl2")
235+
236+
for _, coll := range mslices.Of(coll1, coll2) {
237+
suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
238+
}
239+
240+
verifier := suite.BuildVerifier()
226241

227242
suite.Require().NoError(verifier.StartChangeStream(ctx))
228243

229-
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
244+
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
230245
suite.Require().NoError(err)
231-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
246+
_, err = coll1.InsertOne(ctx, bson.D{{"_id", 2}})
232247
suite.Require().NoError(err)
233248

234-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
249+
_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
235250
suite.Require().NoError(err)
236251

237252
var rechecks []bson.M
@@ -245,6 +260,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
245260
500*time.Millisecond,
246261
"the verifier should flush a recheck doc after a batch",
247262
)
263+
248264
}
249265

250266
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
@@ -304,3 +320,40 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
304320

305321
suite.Assert().Equal(docsCount, totalFailed, "all source docs should be missing")
306322
}
323+
324+
func (suite *IntegrationTestSuite) TestCreateForbidden() {
325+
ctx := suite.Context()
326+
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
327+
suite.Require().NoError(err)
328+
329+
if buildInfo.VersionArray[0] < 6 {
330+
suite.T().Skipf("This test requires server v6+. (Found: %v)", buildInfo.VersionArray)
331+
}
332+
333+
verifier := suite.BuildVerifier()
334+
335+
// start verifier
336+
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
337+
338+
// wait for generation 0 to end
339+
verifierRunner.AwaitGenerationEnd()
340+
341+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
342+
coll := db.Collection("mycoll")
343+
suite.Require().NoError(
344+
db.CreateCollection(ctx, coll.Name()),
345+
)
346+
347+
// The error from the create event will come either at WritesOff
348+
// or when we finalize the change stream.
349+
err = verifier.WritesOff(ctx)
350+
if err == nil {
351+
err = verifierRunner.Await()
352+
}
353+
354+
suite.Require().Error(err, "should detect forbidden create event")
355+
356+
eventErr := UnknownEventError{}
357+
suite.Require().ErrorAs(err, &eventErr)
358+
suite.Assert().Equal("create", eventErr.Event.OpType)
359+
}

internal/verifier/check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
236236
// paying attention. Also, this should not matter too much because any failures will be
237237
// caught again on the next iteration.
238238
if verifier.writesOff {
239+
verifier.logger.Debug().
240+
Msg("Waiting for change stream to end.")
241+
239242
// It's necessary to wait for the change stream to finish before incrementing the
240243
// generation number, or the last changes will not be checked.
241244
verifier.mux.Unlock()

internal/verifier/migration_verifier.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ type Verifier struct {
8989
metaClient *mongo.Client
9090
srcClient *mongo.Client
9191
dstClient *mongo.Client
92-
srcBuildInfo *bson.M
93-
dstBuildInfo *bson.M
92+
srcBuildInfo *util.BuildInfo
93+
dstBuildInfo *util.BuildInfo
9494
numWorkers int
9595
failureDisplaySize int64
9696

@@ -312,21 +312,33 @@ func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
312312
var err error
313313
verifier.srcClient, err = mongo.Connect(ctx, opts)
314314
if err != nil {
315-
return err
315+
return errors.Wrapf(err, "failed to connect to source %#q", uri)
316316
}
317-
verifier.srcBuildInfo, err = getBuildInfo(ctx, verifier.srcClient)
318-
return err
317+
318+
buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
319+
if err != nil {
320+
return errors.Wrap(err, "failed to read source build info")
321+
}
322+
323+
verifier.srcBuildInfo = &buildInfo
324+
return nil
319325
}
320326

321327
func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
322328
opts := verifier.getClientOpts(uri)
323329
var err error
324330
verifier.dstClient, err = mongo.Connect(ctx, opts)
325331
if err != nil {
326-
return err
332+
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
327333
}
328-
verifier.dstBuildInfo, err = getBuildInfo(ctx, verifier.dstClient)
329-
return err
334+
335+
buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
336+
if err != nil {
337+
return errors.Wrap(err, "failed to read destination build info")
338+
}
339+
340+
verifier.dstBuildInfo = &buildInfo
341+
return nil
330342
}
331343

332344
func (verifier *Verifier) SetServerPort(port int) {
@@ -457,7 +469,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
457469
return append(predicates, verifier.globalFilter)
458470
}
459471

460-
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *bson.M,
472+
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo,
461473
startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
462474
var findOptions bson.D
463475
runCommandOptions := options.RunCmd()
@@ -1510,16 +1522,3 @@ func (verifier *Verifier) getNamespaces(ctx context.Context, fieldName string) (
15101522
}
15111523
return namespaces, nil
15121524
}
1513-
1514-
func getBuildInfo(ctx context.Context, client *mongo.Client) (*bson.M, error) {
1515-
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
1516-
if commandResult.Err() != nil {
1517-
return nil, commandResult.Err()
1518-
}
1519-
var buildInfoMap bson.M
1520-
err := commandResult.Decode(&buildInfoMap)
1521-
if err != nil {
1522-
return nil, err
1523-
}
1524-
return &buildInfoMap, nil
1525-
}

0 commit comments

Comments
 (0)