Skip to content

Commit 58ca577

Browse files
committed
Merge branch 'main' into felipe_expanded_events
2 parents f3a9b91 + dcfb296 commit 58ca577

File tree

9 files changed

+860
-149
lines changed

9 files changed

+860
-149
lines changed

internal/util/collections.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/option"
7+
"github.com/pkg/errors"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/bson/primitive"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
)
12+
13+
// CollectionSpec is like mongo.CollectionSpecification except:
14+
// - IDIndex is a bson.Raw rather than mongo.IndexSpecification.
15+
// - It can detect unexpected fields.
16+
type CollectionSpec struct {
17+
Name string
18+
Type string
19+
Options bson.Raw
20+
Info struct {
21+
ReadOnly bool `bson:"readOnly"`
22+
UUID primitive.Binary
23+
24+
Extra map[string]any
25+
}
26+
IDIndex bson.Raw `bson:"idIndex"`
27+
28+
Extra map[string]any
29+
}
30+
31+
// FullName returns the collection's full namespace.
32+
func FullName(collection *mongo.Collection) string {
33+
return collection.Database().Name() + "." + collection.Name()
34+
}
35+
36+
// GetCollectionSpecIfExists returns the given collection’s specification,
37+
// or empty if the collection doesn’t exist. If any unexpected properties
38+
// exist in the collection specification then an error is returned.
39+
func GetCollectionSpecIfExists(
40+
ctx context.Context,
41+
coll *mongo.Collection,
42+
) (option.Option[CollectionSpec], error) {
43+
cursor, err := coll.Database().ListCollections(ctx, bson.M{"name": coll.Name()})
44+
if err != nil {
45+
return option.None[CollectionSpec](), errors.Wrapf(
46+
err,
47+
"failed to fetch %#q's specification",
48+
FullName(coll),
49+
)
50+
}
51+
52+
var specs []CollectionSpec
53+
err = cursor.All(ctx, &specs)
54+
if err != nil {
55+
return option.None[CollectionSpec](), errors.Wrapf(
56+
err,
57+
"failed to parse %#q's specification",
58+
FullName(coll),
59+
)
60+
}
61+
62+
switch len(specs) {
63+
case 0:
64+
return option.None[CollectionSpec](), nil
65+
case 1:
66+
if len(specs[0].Extra) > 0 || len(specs[0].Info.Extra) > 0 {
67+
return option.None[CollectionSpec](), errors.Wrapf(
68+
err,
69+
"%#q's specification (%v) contains unrecognized fields",
70+
FullName(coll),
71+
specs[0],
72+
)
73+
}
74+
75+
return option.Some(specs[0]), nil
76+
}
77+
78+
return option.None[CollectionSpec](), errors.Wrapf(
79+
err,
80+
"received multiple results (%v) when fetching %#q's specification",
81+
specs,
82+
FullName(coll),
83+
)
84+
}

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
226226
break
227227
}
228228

229-
if curTs == writesOffTs || curTs.After(writesOffTs) {
229+
if curTs.After(writesOffTs) {
230230
verifier.logger.Debug().
231231
Interface("currentTimestamp", curTs).
232232
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/change_stream_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
170170
suite.Require().NotNil(origSessionTime)
171171
err = verifier.StartChangeStream(ctx)
172172
suite.Require().NoError(err)
173-
suite.Require().Equal(verifier.srcStartAtTs, origSessionTime)
173+
174+
// srcStartAtTs derives from the change stream’s resume token, which can
175+
// postdate our session time but should not precede it.
176+
suite.Require().False(
177+
verifier.srcStartAtTs.Before(*origSessionTime),
178+
"srcStartAtTs should be >= the insert’s optime",
179+
)
174180

175181
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
176182
sctx, bson.D{{"_id", 1}})
@@ -257,7 +263,15 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
257263

258264
}
259265

260-
func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
266+
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
267+
suite.testInsertsBeforeWritesOff(10_000)
268+
}
269+
270+
func (suite *IntegrationTestSuite) TestOneInsertBeforeWritesOff() {
271+
suite.testInsertsBeforeWritesOff(1)
272+
}
273+
274+
func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
261275
ctx := suite.Context()
262276

263277
verifier := suite.BuildVerifier()
@@ -274,7 +288,6 @@ func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
274288
// wait for generation 0 to end
275289
verifierRunner.AwaitGenerationEnd()
276290

277-
docsCount := 10_000
278291
docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} })
279292
_, err := coll.InsertMany(
280293
ctx,

0 commit comments

Comments
 (0)