Skip to content

Commit 3f45788

Browse files
committed
eventual for change stream error too
1 parent 0e200cf commit 3f45788

File tree

4 files changed

+22
-13
lines changed

4 files changed

+22
-13
lines changed

internal/util/eventual.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@ import (
66
"github.com/10gen/migration-verifier/option"
77
)
88

9-
// Eventual represents a value that isn’t available when
10-
// this struct is created but can be awaited via a channel.
9+
// Eventual represents a value that isn’t available when this struct is created
10+
// but can be awaited via a channel.
11+
//
12+
// This is much like how context.Context’s Done() and Err() methods work.
13+
// It’s useful to await a value’s readiness via channel but then read it
14+
// multiple times.
1115
type Eventual[T any] struct {
1216
ready chan struct{}
1317
val option.Option[T]

internal/verifier/change_stream.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type ChangeStreamReader struct {
6969
changeStreamRunning bool
7070
changeEventBatchChan chan []ParsedEvent
7171
writesOffTs *util.Eventual[primitive.Timestamp]
72-
errChan chan error
72+
error *util.Eventual[error]
7373
doneChan chan struct{}
7474

7575
startAtTs *primitive.Timestamp
@@ -89,7 +89,7 @@ func (verifier *Verifier) initializeChangeStreamReaders(ctx context.Context) {
8989
changeStreamRunning: false,
9090
changeEventBatchChan: make(chan []ParsedEvent),
9191
writesOffTs: util.NewEventual[primitive.Timestamp](),
92-
errChan: make(chan error),
92+
error: util.NewEventual[error](),
9393
doneChan: make(chan struct{}),
9494
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
9595
}
@@ -104,7 +104,7 @@ func (verifier *Verifier) initializeChangeStreamReaders(ctx context.Context) {
104104
changeStreamRunning: false,
105105
changeEventBatchChan: make(chan []ParsedEvent),
106106
writesOffTs: util.NewEventual[primitive.Timestamp](),
107-
errChan: make(chan error),
107+
error: util.NewEventual[error](),
108108
doneChan: make(chan struct{}),
109109
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
110110
}
@@ -548,8 +548,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
548548
if err != nil {
549549
// NB: This failure always happens after the initial change stream
550550
// creation.
551-
csr.errChan <- err
552-
close(csr.errChan)
551+
csr.error.Set(err)
553552
}
554553
}()
555554

internal/verifier/check.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt
4949
select {
5050
case <-ctx.Done():
5151
return ctx.Err()
52-
case err := <-csr.errChan:
52+
case <-csr.error.Ready():
53+
err := csr.error.Get().MustGet()
54+
5355
verifier.logger.Warn().Err(err).
5456
Msgf("Received error from %s.", csr)
5557
return err
@@ -84,12 +86,14 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
8486
cancelableCtx, canceler := context.WithCancelCause(ctxIn)
8587
eg, ctx := errgroup.WithContext(cancelableCtx)
8688

87-
// If the change stream fails, everything should stop.
89+
// If a change stream fails, everything should stop.
8890
eg.Go(func() error {
8991
select {
90-
case err := <-verifier.srcChangeStreamReader.errChan:
92+
case <-verifier.srcChangeStreamReader.error.Ready():
93+
err := verifier.srcChangeStreamReader.error.Get().MustGet()
9194
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
92-
case err := <-verifier.dstChangeStreamReader.errChan:
95+
case <-verifier.dstChangeStreamReader.error.Ready():
96+
err := verifier.dstChangeStreamReader.error.Get().MustGet()
9397
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
9498
case <-ctx.Done():
9599
return nil

internal/verifier/migration_verifier.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,16 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
281281
// might be inserting docs into the recheck queue, which happens
282282
// under the lock.
283283
select {
284-
case err := <-verifier.srcChangeStreamReader.errChan:
284+
case <-verifier.srcChangeStreamReader.error.Ready():
285+
err := verifier.srcChangeStreamReader.error.Get().MustGet()
285286
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
286287
default:
287288
verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs)
288289
}
289290

290291
select {
291-
case err := <-verifier.dstChangeStreamReader.errChan:
292+
case <-verifier.dstChangeStreamReader.error.Ready():
293+
err := verifier.dstChangeStreamReader.error.Get().MustGet()
292294
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
293295
default:
294296
verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs)

0 commit comments

Comments
 (0)