Skip to content

Commit 281afc0

Browse files
committed
wait for change event handler
1 parent 9eed454 commit 281afc0

File tree

3 files changed

+31
-17
lines changed

3 files changed

+31
-17
lines changed

internal/verifier/change_stream.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package verifier
33
import (
44
"context"
55
"fmt"
6+
"golang.org/x/sync/errgroup"
67
"time"
78

89
"github.com/10gen/migration-verifier/internal/keystring"
@@ -82,7 +83,6 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8283
watcherClient: verifier.srcClient,
8384
buildInfo: *verifier.srcBuildInfo,
8485
changeStreamRunning: false,
85-
ChangeEventBatchChan: make(chan []ParsedEvent),
8686
ChangeStreamWritesOffTsChan: make(chan primitive.Timestamp),
8787
ChangeStreamErrChan: make(chan error),
8888
ChangeStreamDoneChan: make(chan struct{}),
@@ -95,28 +95,31 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
9595
watcherClient: verifier.dstClient,
9696
buildInfo: *verifier.dstBuildInfo,
9797
changeStreamRunning: false,
98-
ChangeEventBatchChan: make(chan []ParsedEvent),
9998
ChangeStreamWritesOffTsChan: make(chan primitive.Timestamp),
10099
ChangeStreamErrChan: make(chan error),
101100
ChangeStreamDoneChan: make(chan struct{}),
102101
}
103102
}
104103

105104
// StartChangeEventHandler starts a goroutine that handles change event batches from the reader.
106-
func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *ChangeStreamReader) {
107-
go func() {
105+
func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *ChangeStreamReader, errGroup *errgroup.Group) {
106+
errGroup.Go(func() error {
108107
for {
109108
select {
110109
case <-ctx.Done():
111-
return
112-
case batch := <-reader.ChangeEventBatchChan:
110+
return ctx.Err()
111+
case batch, more := <-reader.ChangeEventBatchChan:
112+
if !more {
113+
// Change stream reader has closed the event batch channel because it has finished.
114+
return nil
115+
}
113116
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
114117
if err != nil {
115-
reader.ChangeStreamErrChan <- err
118+
return err
116119
}
117120
}
118121
}
119-
}()
122+
})
120123
}
121124

122125
// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
@@ -298,6 +301,8 @@ func (csr *ChangeStreamReader) iterateChangeStream(
298301
) error {
299302
var lastPersistedTime time.Time
300303

304+
defer close(csr.ChangeEventBatchChan)
305+
301306
persistResumeTokenIfNeeded := func() error {
302307
if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval {
303308
return nil
@@ -328,7 +333,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
328333
case writesOffTs := <-csr.ChangeStreamWritesOffTsChan:
329334
csr.logger.Debug().
330335
Interface("writesOffTimestamp", writesOffTs).
331-
Msg("Change stream thread received writesOff timestamp. Finalizing change stream.")
336+
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)
332337

333338
gotwritesOffTimestamp = true
334339

@@ -477,6 +482,11 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
477482
// there's no chance of "nonsense" like both channels returning a payload.
478483
initialCreateResultChan := make(chan mo.Result[primitive.Timestamp])
479484

485+
// Change stream reader closes ChangeEventBatchChan each time after
486+
// finish reading change events.
487+
// It needs to be re-initialized when a change stream starts.
488+
csr.ChangeEventBatchChan = make(chan []ParsedEvent)
489+
480490
go func() {
481491
retryer := retry.New(retry.DefaultDurationLimit)
482492
retryer = retryer.WithErrorCodes(util.CursorKilled)

internal/verifier/change_stream_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package verifier
22

33
import (
44
"context"
5+
"golang.org/x/sync/errgroup"
56
"strings"
67
"time"
78

@@ -54,7 +55,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
5455
verifier1 := suite.BuildVerifier()
5556
ctx, cancel := context.WithCancel(suite.Context())
5657
defer cancel()
57-
verifier1.StartChangeEventHandler(ctx, verifier1.srcChangeStreamReader)
58+
verifier1.StartChangeEventHandler(ctx, verifier1.srcChangeStreamReader, &errgroup.Group{})
5859
err := verifier1.srcChangeStreamReader.StartChangeStream(ctx)
5960
suite.Require().NoError(err)
6061
}()
@@ -80,7 +81,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
8081

8182
newTime := suite.getClusterTime(ctx, suite.srcMongoClient)
8283

83-
verifier2.StartChangeEventHandler(ctx, verifier2.srcChangeStreamReader)
84+
verifier2.StartChangeEventHandler(ctx, verifier2.srcChangeStreamReader, &errgroup.Group{})
8485
err = verifier2.srcChangeStreamReader.StartChangeStream(ctx)
8586
suite.Require().NoError(err)
8687

@@ -155,7 +156,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
155156
suite.Require().NoError(err)
156157
origStartTs := sess.OperationTime()
157158
suite.Require().NotNil(origStartTs)
158-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
159+
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
159160
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
160161
suite.Require().NoError(err)
161162
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
@@ -176,7 +177,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
176177

177178
origSessionTime := sess.OperationTime()
178179
suite.Require().NotNil(origSessionTime)
179-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
180+
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
180181
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
181182
suite.Require().NoError(err)
182183

@@ -228,7 +229,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
228229
suite.Require().NoError(err)
229230
origStartTs := sess.OperationTime()
230231
suite.Require().NotNil(origStartTs)
231-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
232+
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
232233
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
233234
suite.Require().NoError(err)
234235
suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs)
@@ -248,7 +249,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
248249

249250
verifier := suite.BuildVerifier()
250251

251-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
252+
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, &errgroup.Group{})
252253
suite.Require().NoError(verifier.srcChangeStreamReader.StartChangeStream(ctx))
253254

254255
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
@@ -389,6 +390,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
389390
lo.ToAnySlice(docs),
390391
)
391392
suite.Require().NoError(err)
393+
//fmt.Println(fmt.Sprintf("src cluster time %v", suite.getClusterTime(ctx, suite.srcMongoClient)))
392394

393395
suite.Require().NoError(verifier.WritesOff(ctx))
394396

@@ -472,7 +474,7 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
472474
verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"})
473475
verifier.SetNamespaceMap()
474476

475-
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader)
477+
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader, &errgroup.Group{})
476478
suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx))
477479

478480
_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})

internal/verifier/check.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
207207
verifier.phase = Idle
208208
}()
209209

210+
ceHandlerGroup := &errgroup.Group{}
210211
for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} {
211212
if csReader.changeStreamRunning {
212213
verifier.logger.Debug().Msgf("Check: %s already running.", csReader)
@@ -217,7 +218,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
217218
if err != nil {
218219
return errors.Wrapf(err, "failed to start %s", csReader)
219220
}
220-
verifier.StartChangeEventHandler(ctx, csReader)
221+
verifier.StartChangeEventHandler(ctx, csReader, ceHandlerGroup)
221222
}
222223
}
223224

@@ -293,6 +294,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
293294
if err = verifier.waitForChangeStream(ctx, verifier.dstChangeStreamReader); err != nil {
294295
return err
295296
}
297+
ceHandlerGroup.Wait()
296298
verifier.mux.Lock()
297299
verifier.lastGeneration = true
298300
}

0 commit comments

Comments
 (0)