Skip to content

Commit d951623

Browse files
committed
catch expanded events
1 parent f241d1e commit d951623

File tree

1 file changed

+20
-5
lines changed

1 file changed

+20
-5
lines changed

internal/verifier/change_stream_test.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/10gen/migration-verifier/mslices"
89
"github.com/pkg/errors"
910
"github.com/samber/lo"
1011
"github.com/stretchr/testify/require"
@@ -35,6 +36,12 @@ func TestChangeStreamFilter(t *testing.T) {
3536
// terminates that verifier, updates the source cluster, starts a new
3637
// verifier with change stream, and confirms that things look as they should.
3738
func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
39+
suite.Require().NoError(
40+
suite.srcMongoClient.
41+
Database(suite.DBNameForTest()).
42+
CreateCollection(suite.Context(), "testColl"),
43+
)
44+
3845
func() {
3946
verifier1 := suite.BuildVerifier()
4047
ctx, cancel := context.WithCancel(context.Background())
@@ -43,7 +50,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
4350
suite.Require().NoError(err)
4451
}()
4552

46-
ctx, cancel := context.WithCancel(context.Background())
53+
ctx, cancel := context.WithCancel(suite.Context())
4754
defer cancel()
4855

4956
_, err := suite.srcMongoClient.
@@ -219,14 +226,21 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
219226
}
220227

221228
func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
222-
verifier := suite.BuildVerifier()
229+
ctx := suite.Context()
223230

224-
ctx, cancel := context.WithCancel(context.Background())
225-
defer cancel()
231+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
232+
coll1 := db.Collection("testColl1")
233+
coll2 := db.Collection("testColl2")
234+
235+
for _, coll := range mslices.Of(coll1, coll2) {
236+
suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
237+
}
238+
239+
verifier := suite.BuildVerifier()
226240

227241
suite.Require().NoError(verifier.StartChangeStream(ctx))
228242

229-
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
243+
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
230244
suite.Require().NoError(err)
231245
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
232246
suite.Require().NoError(err)
@@ -245,6 +259,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
245259
500*time.Millisecond,
246260
"the verifier should flush a recheck doc after a batch",
247261
)
262+
248263
}
249264

250265
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {

0 commit comments

Comments
 (0)