Skip to content

Commit 9a081cc

Browse files
committed
Merge branch 'main' into REP-5317-add-dst-change-stream
2 parents d66f25e + c0c8966 commit 9a081cc

File tree

12 files changed

+1110
-274
lines changed

12 files changed

+1110
-274
lines changed

internal/util/collections.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/option"
7+
"github.com/pkg/errors"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/bson/primitive"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
)
12+
13+
// CollectionSpec is like mongo.CollectionSpecification except:
14+
// - IDIndex is a bson.Raw rather than mongo.IndexSpecification.
15+
// - It can detect unexpected fields.
16+
type CollectionSpec struct {
17+
Name string
18+
Type string
19+
Options bson.Raw
20+
Info struct {
21+
ReadOnly bool `bson:"readOnly"`
22+
UUID primitive.Binary
23+
24+
Extra map[string]any
25+
}
26+
IDIndex bson.Raw `bson:"idIndex"`
27+
28+
Extra map[string]any
29+
}
30+
31+
// FullName returns the collection's full namespace.
32+
func FullName(collection *mongo.Collection) string {
33+
return collection.Database().Name() + "." + collection.Name()
34+
}
35+
36+
// GetCollectionSpecIfExists returns the given collection’s specification,
37+
// or empty if the collection doesn’t exist. If any unexpected properties
38+
// exist in the collection specification then an error is returned.
39+
func GetCollectionSpecIfExists(
40+
ctx context.Context,
41+
coll *mongo.Collection,
42+
) (option.Option[CollectionSpec], error) {
43+
cursor, err := coll.Database().ListCollections(ctx, bson.M{"name": coll.Name()})
44+
if err != nil {
45+
return option.None[CollectionSpec](), errors.Wrapf(
46+
err,
47+
"failed to fetch %#q's specification",
48+
FullName(coll),
49+
)
50+
}
51+
52+
var specs []CollectionSpec
53+
err = cursor.All(ctx, &specs)
54+
if err != nil {
55+
return option.None[CollectionSpec](), errors.Wrapf(
56+
err,
57+
"failed to parse %#q's specification",
58+
FullName(coll),
59+
)
60+
}
61+
62+
switch len(specs) {
63+
case 0:
64+
return option.None[CollectionSpec](), nil
65+
case 1:
66+
if len(specs[0].Extra) > 0 || len(specs[0].Info.Extra) > 0 {
67+
return option.None[CollectionSpec](), errors.Wrapf(
68+
err,
69+
"%#q's specification (%v) contains unrecognized fields",
70+
FullName(coll),
71+
specs[0],
72+
)
73+
}
74+
75+
return option.Some(specs[0]), nil
76+
}
77+
78+
return option.None[CollectionSpec](), errors.Wrapf(
79+
err,
80+
"received multiple results (%v) when fetching %#q's specification",
81+
specs,
82+
FullName(coll),
83+
)
84+
}

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
319319
break
320320
}
321321

322-
if curTs == writesOffTs || curTs.After(writesOffTs) {
322+
if curTs.After(writesOffTs) {
323323
csr.logger.Debug().
324324
Interface("currentTimestamp", curTs).
325325
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/change_stream_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
163163
suite.Require().NotNil(origSessionTime)
164164
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
165165
suite.Require().NoError(err)
166-
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origSessionTime)
166+
167+
// srcStartAtTs derives from the change stream’s resume token, which can
168+
// postdate our session time but should not precede it.
169+
suite.Require().False(
170+
verifier.srcChangeStreamReader.startAtTs.Before(*origSessionTime),
171+
"srcStartAtTs should be >= the insert’s optime",
172+
)
167173

168174
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
169175
sctx, bson.D{{"_id", 1}})
@@ -242,7 +248,15 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
242248
)
243249
}
244250

245-
func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
251+
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
252+
suite.testInsertsBeforeWritesOff(10_000)
253+
}
254+
255+
func (suite *IntegrationTestSuite) TestOneInsertBeforeWritesOff() {
256+
suite.testInsertsBeforeWritesOff(1)
257+
}
258+
259+
func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
246260
ctx := suite.Context()
247261

248262
verifier := suite.BuildVerifier()
@@ -259,7 +273,6 @@ func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
259273
// wait for generation 0 to end
260274
verifierRunner.AwaitGenerationEnd()
261275

262-
docsCount := 10_000
263276
docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} })
264277
_, err := coll.InsertMany(
265278
ctx,

0 commit comments

Comments
 (0)