Skip to content

Commit 0f43765

Browse files
committed
Prevent WritesOff from hanging when the change stream has failed.
1 parent cd1fc57 commit 0f43765

File tree

4 files changed

+17
-3
lines changed

4 files changed

+17
-3
lines changed

internal/verifier/change_stream_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,13 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
331331
db.CreateCollection(ctx, coll.Name()),
332332
)
333333

334-
suite.Require().NoError(verifier.WritesOff(ctx))
334+
// The error from the create event will come either at WritesOff
335+
// or when we finalize the change stream.
336+
err = verifier.WritesOff(ctx)
337+
if err == nil {
338+
err = verifierRunner.Await()
339+
}
335340

336-
err = verifierRunner.Await()
337341
suite.Require().Error(err, "should detect forbidden create event")
338342

339343
eventErr := UnknownEventError{}

internal/verifier/check.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
236236
// paying attention. Also, this should not matter too much because any failures will be
237237
// caught again on the next iteration.
238238
if verifier.writesOff {
239+
verifier.logger.Debug().
240+
Msg("Waiting for change stream to end.")
241+
239242
// It's necessary to wait for the change stream to finish before incrementing the
240243
// generation number, or the last changes will not be checked.
241244
verifier.mux.Unlock()

internal/verifier/check_runner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package verifier
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67
)
78

@@ -49,7 +50,9 @@ func (cr *CheckRunner) Await() error {
4950
return err
5051

5152
case <-cr.generationDoneChan:
53+
fmt.Printf("===== verifier finished a generation\n")
5254
case cr.doNextGenerationChan <- struct{}{}:
55+
fmt.Printf("===== starting next generation\n")
5356
}
5457
}
5558
}

internal/verifier/migration_verifier.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,11 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
259259
// This has to happen under the lock because the change stream
260260
// might be inserting docs into the recheck queue, which happens
261261
// under the lock.
262-
verifier.changeStreamWritesOffTsChan <- finalTs
262+
select {
263+
case verifier.changeStreamWritesOffTsChan <- finalTs:
264+
case err := <-verifier.changeStreamErrChan:
265+
return errors.Wrap(err, "tried to send writes-off timestamp to change stream, but change stream already failed")
266+
}
263267
} else {
264268
verifier.mux.Unlock()
265269
}

0 commit comments

Comments
 (0)