Skip to content

Commit 74377be

Browse files
committed
Merge branch 'main' into felipe_retry_change_stream
2 parents 78bc6dd + 365482d commit 74377be

File tree

7 files changed

+135
-53
lines changed

7 files changed

+135
-53
lines changed

internal/partitions/partition.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
137137
// (e.g. use the partitions on the source to read the destination for verification)
138138
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
139139
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
140-
func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A) bson.D {
140+
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
141141
if p == nil {
142142
if len(filterAndPredicates) > 0 {
143143
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
@@ -160,16 +160,9 @@ func (p *Partition) GetFindOptions(buildInfo *bson.M, filterAndPredicates bson.A
160160
allowTypeBracketing := false
161161
if buildInfo != nil {
162162
allowTypeBracketing = true
163-
versionArray, ok := (*buildInfo)["versionArray"].(bson.A)
164-
//bson values are int32 or int64, never int.
165-
if ok {
166-
majorVersion, ok := versionArray[0].(int32)
167-
if ok {
168-
allowTypeBracketing = majorVersion < 5
169-
} else {
170-
majorVersion64, _ := versionArray[0].(int64)
171-
allowTypeBracketing = majorVersion64 < 5
172-
}
163+
164+
if buildInfo.VersionArray != nil {
165+
allowTypeBracketing = buildInfo.VersionArray[0] < 5
173166
}
174167
}
175168
if !allowTypeBracketing {

internal/partitions/partition_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,43 +77,38 @@ func (suite *UnitTestSuite) TestVersioning() {
7777
filter := getFilterFromFindOptions(findOptions)
7878
suite.Require().Equal(expectedFilter, filter)
7979

80-
// 6.0 (int64)
81-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int64(6), int64(0), int64(0), int64(0)}}, nil)
82-
filter = getFilterFromFindOptions(findOptions)
83-
suite.Require().Equal(expectedFilter, filter)
84-
8580
// 6.0
86-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(6), int32(0), int32(0), int32(0)}}, nil)
81+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
8782
filter = getFilterFromFindOptions(findOptions)
8883
suite.Require().Equal(expectedFilter, filter)
8984

9085
// 5.3.0.9
91-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(5), int32(3), int32(0), int32(9)}}, nil)
86+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
9287
filter = getFilterFromFindOptions(findOptions)
9388
suite.Require().Equal(expectedFilter, filter)
9489

9590
// 7.1.3.5
96-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(7), int32(1), int32(3), int32(5)}}, nil)
91+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
9792
filter = getFilterFromFindOptions(findOptions)
9893
suite.Require().Equal(expectedFilter, filter)
9994

10095
// 4.4 (int64)
101-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int64(4), int64(4), int64(0), int64(0)}}, nil)
96+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
10297
filter = getFilterFromFindOptions(findOptions)
10398
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
10499

105100
// 4.4
106-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(4), int32(4), int32(0), int32(0)}}, nil)
101+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
107102
filter = getFilterFromFindOptions(findOptions)
108103
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
109104

110105
// 4.2
111-
findOptions = partition.GetFindOptions(&bson.M{"versionArray": bson.A{int32(4), int32(2), int32(0), int32(0)}}, nil)
106+
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
112107
filter = getFilterFromFindOptions(findOptions)
113108
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
114109

115110
// No version array -- assume old, require type bracketing.
116-
findOptions = partition.GetFindOptions(&bson.M{"notVersionArray": bson.A{6, int32(0), int32(0), int32(0)}}, nil)
111+
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
117112
filter = getFilterFromFindOptions(findOptions)
118113
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
119114
}

internal/util/buildinfo.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/pkg/errors"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
)
11+
12+
type BuildInfo struct {
13+
VersionArray []int
14+
}
15+
16+
func GetBuildInfo(ctx context.Context, client *mongo.Client) (BuildInfo, error) {
17+
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
18+
19+
rawResp, err := commandResult.Raw()
20+
if err != nil {
21+
return BuildInfo{}, errors.Wrap(err, "failed to fetch build info")
22+
}
23+
24+
bi := BuildInfo{}
25+
_, err = mbson.RawLookup(rawResp, &bi.VersionArray, "versionArray")
26+
if err != nil {
27+
return BuildInfo{}, errors.Wrap(err, "failed to decode build info version array")
28+
}
29+
30+
return bi, nil
31+
}

internal/verifier/change_stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ func (verifier *Verifier) createChangeStream(
296296
SetMaxAwaitTime(1 * time.Second).
297297
SetFullDocument(options.UpdateLookup)
298298

299+
if verifier.srcBuildInfo.VersionArray[0] >= 6 {
300+
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
301+
}
302+
299303
savedResumeToken, err := verifier.loadChangeStreamResumeToken(sctx)
300304
if err != nil {
301305
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")

internal/verifier/change_stream_test.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/10gen/migration-verifier/internal/util"
9+
"github.com/10gen/migration-verifier/mslices"
810
"github.com/pkg/errors"
911
"github.com/samber/lo"
1012
"github.com/stretchr/testify/require"
@@ -37,6 +39,12 @@ func TestChangeStreamFilter(t *testing.T) {
3739
// terminates that verifier, updates the source cluster, starts a new
3840
// verifier with change stream, and confirms that things look as they should.
3941
func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
42+
suite.Require().NoError(
43+
suite.srcMongoClient.
44+
Database(suite.DBNameForTest()).
45+
CreateCollection(suite.Context(), "testColl"),
46+
)
47+
4048
func() {
4149
verifier1 := suite.BuildVerifier()
4250
ctx, cancel := context.WithCancel(context.Background())
@@ -45,7 +53,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
4553
suite.Require().NoError(err)
4654
}()
4755

48-
ctx, cancel := context.WithCancel(context.Background())
56+
ctx, cancel := context.WithCancel(suite.Context())
4957
defer cancel()
5058

5159
_, err := suite.srcMongoClient.
@@ -221,19 +229,26 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
221229
}
222230

223231
func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
224-
verifier := suite.BuildVerifier()
232+
ctx := suite.Context()
225233

226-
ctx, cancel := context.WithCancel(context.Background())
227-
defer cancel()
234+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
235+
coll1 := db.Collection("testColl1")
236+
coll2 := db.Collection("testColl2")
237+
238+
for _, coll := range mslices.Of(coll1, coll2) {
239+
suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
240+
}
241+
242+
verifier := suite.BuildVerifier()
228243

229244
suite.Require().NoError(verifier.StartChangeStream(ctx))
230245

231-
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
246+
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
232247
suite.Require().NoError(err)
233-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
248+
_, err = coll1.InsertOne(ctx, bson.D{{"_id", 2}})
234249
suite.Require().NoError(err)
235250

236-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
251+
_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
237252
suite.Require().NoError(err)
238253

239254
var rechecks []bson.M
@@ -247,6 +262,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
247262
500*time.Millisecond,
248263
"the verifier should flush a recheck doc after a batch",
249264
)
265+
250266
}
251267

252268
func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
@@ -392,3 +408,40 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
392408

393409
suite.Assert().Equal(docsCount, totalFailed, "all source docs should be missing")
394410
}
411+
412+
func (suite *IntegrationTestSuite) TestCreateForbidden() {
413+
ctx := suite.Context()
414+
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
415+
suite.Require().NoError(err)
416+
417+
if buildInfo.VersionArray[0] < 6 {
418+
suite.T().Skipf("This test requires server v6+. (Found: %v)", buildInfo.VersionArray)
419+
}
420+
421+
verifier := suite.BuildVerifier()
422+
423+
// start verifier
424+
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
425+
426+
// wait for generation 0 to end
427+
verifierRunner.AwaitGenerationEnd()
428+
429+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
430+
coll := db.Collection("mycoll")
431+
suite.Require().NoError(
432+
db.CreateCollection(ctx, coll.Name()),
433+
)
434+
435+
// The error from the create event will come either at WritesOff
436+
// or when we finalize the change stream.
437+
err = verifier.WritesOff(ctx)
438+
if err == nil {
439+
err = verifierRunner.Await()
440+
}
441+
442+
suite.Require().Error(err, "should detect forbidden create event")
443+
444+
eventErr := UnknownEventError{}
445+
suite.Require().ErrorAs(err, &eventErr)
446+
suite.Assert().Equal("create", eventErr.Event.OpType)
447+
}

internal/verifier/check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
236236
// paying attention. Also, this should not matter too much because any failures will be
237237
// caught again on the next iteration.
238238
if verifier.writesOff {
239+
verifier.logger.Debug().
240+
Msg("Waiting for change stream to end.")
241+
239242
// It's necessary to wait for the change stream to finish before incrementing the
240243
// generation number, or the last changes will not be checked.
241244
verifier.mux.Unlock()

internal/verifier/migration_verifier.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ type Verifier struct {
9090
metaClient *mongo.Client
9191
srcClient *mongo.Client
9292
dstClient *mongo.Client
93-
srcBuildInfo *bson.M
94-
dstBuildInfo *bson.M
93+
srcBuildInfo *util.BuildInfo
94+
dstBuildInfo *util.BuildInfo
9595
numWorkers int
9696
failureDisplaySize int64
9797

@@ -264,7 +264,11 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
264264
// This has to happen outside the lock because the change stream
265265
// might be inserting docs into the recheck queue, which happens
266266
// under the lock.
267-
verifier.changeStreamWritesOffTsChan <- finalTs
267+
select {
268+
case verifier.changeStreamWritesOffTsChan <- finalTs:
269+
case err := <-verifier.changeStreamErrChan:
270+
return errors.Wrap(err, "tried to send writes-off timestamp to change stream, but change stream already failed")
271+
}
268272
} else {
269273
verifier.mux.Unlock()
270274
}
@@ -311,21 +315,33 @@ func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
311315
var err error
312316
verifier.srcClient, err = mongo.Connect(ctx, opts)
313317
if err != nil {
314-
return err
318+
return errors.Wrapf(err, "failed to connect to source %#q", uri)
315319
}
316-
verifier.srcBuildInfo, err = getBuildInfo(ctx, verifier.srcClient)
317-
return err
320+
321+
buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
322+
if err != nil {
323+
return errors.Wrap(err, "failed to read source build info")
324+
}
325+
326+
verifier.srcBuildInfo = &buildInfo
327+
return nil
318328
}
319329

320330
func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
321331
opts := verifier.getClientOpts(uri)
322332
var err error
323333
verifier.dstClient, err = mongo.Connect(ctx, opts)
324334
if err != nil {
325-
return err
335+
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
326336
}
327-
verifier.dstBuildInfo, err = getBuildInfo(ctx, verifier.dstClient)
328-
return err
337+
338+
buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
339+
if err != nil {
340+
return errors.Wrap(err, "failed to read destination build info")
341+
}
342+
343+
verifier.dstBuildInfo = &buildInfo
344+
return nil
329345
}
330346

331347
func (verifier *Verifier) SetServerPort(port int) {
@@ -456,7 +472,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
456472
return append(predicates, verifier.globalFilter)
457473
}
458474

459-
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *bson.M,
475+
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo,
460476
startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
461477
var findOptions bson.D
462478
runCommandOptions := options.RunCmd()
@@ -1509,16 +1525,3 @@ func (verifier *Verifier) getNamespaces(ctx context.Context, fieldName string) (
15091525
}
15101526
return namespaces, nil
15111527
}
1512-
1513-
func getBuildInfo(ctx context.Context, client *mongo.Client) (*bson.M, error) {
1514-
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
1515-
if commandResult.Err() != nil {
1516-
return nil, commandResult.Err()
1517-
}
1518-
var buildInfoMap bson.M
1519-
err := commandResult.Decode(&buildInfoMap)
1520-
if err != nil {
1521-
return nil, err
1522-
}
1523-
return &buildInfoMap, nil
1524-
}

0 commit comments

Comments
 (0)