Skip to content

Commit d1dff2b

Browse files
committed
Prevent WritesOff from hanging when the change stream has failed.
1 parent af00bf0 commit d1dff2b

File tree

3 files changed

+14
-3
lines changed

3 files changed

+14
-3
lines changed

internal/verifier/change_stream_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,13 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
344344
db.CreateCollection(ctx, coll.Name()),
345345
)
346346

347-
suite.Require().NoError(verifier.WritesOff(ctx))
347+
// The error from the create event will come either at WritesOff
348+
// or when we finalize the change stream.
349+
err = verifier.WritesOff(ctx)
350+
if err == nil {
351+
err = verifierRunner.Await()
352+
}
348353

349-
err = verifierRunner.Await()
350354
suite.Require().Error(err, "should detect forbidden create event")
351355

352356
eventErr := UnknownEventError{}

internal/verifier/check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
236236
// paying attention. Also, this should not matter too much because any failures will be
237237
// caught again on the next iteration.
238238
if verifier.writesOff {
239+
verifier.logger.Debug().
240+
Msg("Waiting for change stream to end.")
241+
239242
// It's necessary to wait for the change stream to finish before incrementing the
240243
// generation number, or the last changes will not be checked.
241244
verifier.mux.Unlock()

internal/verifier/migration_verifier.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,11 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
261261
// This has to happen under the lock because the change stream
262262
// might be inserting docs into the recheck queue, which happens
263263
// under the lock.
264-
verifier.changeStreamWritesOffTsChan <- finalTs
264+
select {
265+
case verifier.changeStreamWritesOffTsChan <- finalTs:
266+
case err := <-verifier.changeStreamErrChan:
267+
return errors.Wrap(err, "tried to send writes-off timestamp to change stream, but change stream already failed")
268+
}
265269
} else {
266270
verifier.mux.Unlock()
267271
}

0 commit comments

Comments
 (0)