Skip to content

Commit c96a495

Browse files
committed
fix race condition in tests
1 parent 281afc0 commit c96a495

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

internal/verifier/change_stream.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
102102
}
103103

104104
// StartChangeEventHandler starts a goroutine that handles change event batches from the reader.
105+
// It needs to be started after the reader starts.
105106
func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *ChangeStreamReader, errGroup *errgroup.Group) {
106107
errGroup.Go(func() error {
107108
for {

internal/verifier/change_stream_test.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter() {
4141
)
4242
}
4343

44+
func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) {
45+
err := verifier.srcChangeStreamReader.StartChangeStream(ctx)
46+
suite.Require().NoError(err)
47+
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
48+
}
49+
4450
// TestChangeStreamResumability creates a verifier, starts its change stream,
4551
// terminates that verifier, updates the source cluster, starts a new
4652
// verifier with change stream, and confirms that things look as they should.
@@ -55,9 +61,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
5561
verifier1 := suite.BuildVerifier()
5662
ctx, cancel := context.WithCancel(suite.Context())
5763
defer cancel()
58-
verifier1.StartChangeEventHandler(ctx, verifier1.srcChangeStreamReader, &errgroup.Group{})
59-
err := verifier1.srcChangeStreamReader.StartChangeStream(ctx)
60-
suite.Require().NoError(err)
64+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier1)
6165
}()
6266

6367
ctx, cancel := context.WithCancel(suite.Context())
@@ -81,9 +85,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
8185

8286
newTime := suite.getClusterTime(ctx, suite.srcMongoClient)
8387

84-
verifier2.StartChangeEventHandler(ctx, verifier2.srcChangeStreamReader, &errgroup.Group{})
85-
err = verifier2.srcChangeStreamReader.StartChangeStream(ctx)
86-
suite.Require().NoError(err)
88+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)
8789

8890
suite.Require().NotNil(verifier2.srcChangeStreamReader.startAtTs)
8991

@@ -156,9 +158,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
156158
suite.Require().NoError(err)
157159
origStartTs := sess.OperationTime()
158160
suite.Require().NotNil(origStartTs)
159-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
160-
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
161-
suite.Require().NoError(err)
161+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
162162
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
163163
verifier.srcChangeStreamReader.ChangeStreamWritesOffTsChan <- *origStartTs
164164
<-verifier.srcChangeStreamReader.ChangeStreamDoneChan
@@ -177,9 +177,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
177177

178178
origSessionTime := sess.OperationTime()
179179
suite.Require().NotNil(origSessionTime)
180-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
181-
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
182-
suite.Require().NoError(err)
180+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
183181

184182
// srcStartAtTs derives from the change stream’s resume token, which can
185183
// postdate our session time but should not precede it.
@@ -229,9 +227,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
229227
suite.Require().NoError(err)
230228
origStartTs := sess.OperationTime()
231229
suite.Require().NotNil(origStartTs)
232-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
233-
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
234-
suite.Require().NoError(err)
230+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
235231
suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs)
236232
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcChangeStreamReader.startAtTs), 0)
237233
}
@@ -249,8 +245,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
249245

250246
verifier := suite.BuildVerifier()
251247

252-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
253-
suite.Require().NoError(verifier.srcChangeStreamReader.StartChangeStream(ctx))
248+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
254249

255250
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
256251
suite.Require().NoError(err)
@@ -474,8 +469,8 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
474469
verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"})
475470
verifier.SetNamespaceMap()
476471

477-
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader, &errgroup.Group{})
478472
suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx))
473+
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader, &errgroup.Group{})
479474

480475
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
481476
suite.Require().NoError(err)

0 commit comments

Comments
 (0)