Skip to content

Commit f3a9b91

Browse files
committed
Prevent WritesOff from hanging when the change stream has failed.
1 parent cd1fc57 commit f3a9b91

File tree

3 files changed

+14
-3
lines changed

3 files changed

+14
-3
lines changed

internal/verifier/change_stream_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,13 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
331331
db.CreateCollection(ctx, coll.Name()),
332332
)
333333

334-
suite.Require().NoError(verifier.WritesOff(ctx))
334+
// The error from the create event will come either at WritesOff
335+
// or when we finalize the change stream.
336+
err = verifier.WritesOff(ctx)
337+
if err == nil {
338+
err = verifierRunner.Await()
339+
}
335340

336-
err = verifierRunner.Await()
337341
suite.Require().Error(err, "should detect forbidden create event")
338342

339343
eventErr := UnknownEventError{}

internal/verifier/check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
236236
// paying attention. Also, this should not matter too much because any failures will be
237237
// caught again on the next iteration.
238238
if verifier.writesOff {
239+
verifier.logger.Debug().
240+
Msg("Waiting for change stream to end.")
241+
239242
// It's necessary to wait for the change stream to finish before incrementing the
240243
// generation number, or the last changes will not be checked.
241244
verifier.mux.Unlock()

internal/verifier/migration_verifier.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,11 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
259259
// This has to happen under the lock because the change stream
260260
// might be inserting docs into the recheck queue, which happens
261261
// under the lock.
262-
verifier.changeStreamWritesOffTsChan <- finalTs
262+
select {
263+
case verifier.changeStreamWritesOffTsChan <- finalTs:
264+
case err := <-verifier.changeStreamErrChan:
265+
return errors.Wrap(err, "tried to send writes-off timestamp to change stream, but change stream already failed")
266+
}
263267
} else {
264268
verifier.mux.Unlock()
265269
}

0 commit comments

Comments
 (0)