Skip to content

Commit 7dda460

Browse files
committed
Convert change stream err and writesOffTs channels to Eventuals.
1 parent 4302d79 commit 7dda460

File tree

5 files changed

+82
-22
lines changed

5 files changed

+82
-22
lines changed

internal/util/eventual.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package util
2+
3+
import (
4+
"sync"
5+
6+
"github.com/10gen/migration-verifier/option"
7+
)
8+
9+
// Eventual represents a value that isn’t available when this struct is created
10+
// but can be awaited via a channel.
11+
//
12+
// This is much like how context.Context’s Done() and Err() methods work.
13+
// It’s useful to await a value’s readiness via channel but then read it
14+
// multiple times.
15+
type Eventual[T any] struct {
16+
ready chan struct{}
17+
val option.Option[T]
18+
mux sync.RWMutex
19+
}
20+
21+
// NewEventual creates an Eventual and returns a pointer
22+
// to it.
23+
func NewEventual[T any]() *Eventual[T] {
24+
return &Eventual[T]{
25+
ready: make(chan struct{}),
26+
}
27+
}
28+
29+
// Ready returns a channel that closes once the Eventual’s value is ready.
30+
func (e *Eventual[T]) Ready() <-chan struct{} {
31+
return e.ready
32+
}
33+
34+
// Get returns an option that contains the Eventual’s value, or
35+
// empty if the value isn’t ready yet.
36+
func (e *Eventual[T]) Get() option.Option[T] {
37+
e.mux.RLock()
38+
defer e.mux.RUnlock()
39+
40+
return e.val
41+
}
42+
43+
// Set
44+
func (e *Eventual[T]) Set(val T) {
45+
e.mux.Lock()
46+
defer e.mux.Unlock()
47+
48+
if e.val.IsSome() {
49+
panic("Double set on eventual!")
50+
}
51+
52+
e.val = option.Some(val)
53+
54+
close(e.ready)
55+
}

internal/verifier/change_stream.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ type ChangeStreamReader struct {
6666

6767
changeStreamRunning bool
6868
changeEventBatchChan chan []ParsedEvent
69-
writesOffTsChan chan primitive.Timestamp
70-
errChan chan error
69+
writesOffTs *util.Eventual[primitive.Timestamp]
70+
error *util.Eventual[error]
7171
doneChan chan struct{}
7272

7373
startAtTs *primitive.Timestamp
@@ -83,8 +83,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8383
clusterInfo: *verifier.srcClusterInfo,
8484
changeStreamRunning: false,
8585
changeEventBatchChan: make(chan []ParsedEvent),
86-
writesOffTsChan: make(chan primitive.Timestamp),
87-
errChan: make(chan error),
86+
writesOffTs: util.NewEventual[primitive.Timestamp](),
87+
error: util.NewEventual[error](),
8888
doneChan: make(chan struct{}),
8989
}
9090
verifier.dstChangeStreamReader = &ChangeStreamReader{
@@ -96,8 +96,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
9696
clusterInfo: *verifier.dstClusterInfo,
9797
changeStreamRunning: false,
9898
changeEventBatchChan: make(chan []ParsedEvent),
99-
writesOffTsChan: make(chan primitive.Timestamp),
100-
errChan: make(chan error),
99+
writesOffTs: util.NewEventual[primitive.Timestamp](),
100+
error: util.NewEventual[error](),
101101
doneChan: make(chan struct{}),
102102
}
103103
}
@@ -117,7 +117,6 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
117117
verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch)
118118
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
119119
if err != nil {
120-
reader.errChan <- err
121120
return err
122121
}
123122
}
@@ -336,7 +335,9 @@ func (csr *ChangeStreamReader) iterateChangeStream(
336335
// source writes are ended and the migration tool is finished / committed.
337336
// This means we should exit rather than continue reading the change stream
338337
// since there should be no more events.
339-
case writesOffTs := <-csr.writesOffTsChan:
338+
case <-csr.writesOffTs.Ready():
339+
writesOffTs := csr.writesOffTs.Get().MustGet()
340+
340341
csr.logger.Debug().
341342
Interface("writesOffTimestamp", writesOffTs).
342343
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)
@@ -389,7 +390,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
389390
}
390391
// since we have started Recheck, we must signal that we have
391392
// finished the change stream changes so that Recheck can continue.
392-
csr.doneChan <- struct{}{}
393+
close(csr.doneChan)
393394
break
394395
}
395396
}
@@ -525,10 +526,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
525526
)
526527

527528
if err != nil {
528-
// NB: This failure always happens after the initial change stream
529-
// creation.
530-
csr.errChan <- err
531-
close(csr.errChan)
529+
csr.error.Set(err)
532530
}
533531
}()
534532

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
319319
suite.Require().NotNil(origStartTs)
320320
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
321321
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
322-
verifier.srcChangeStreamReader.writesOffTsChan <- *origStartTs
322+
verifier.srcChangeStreamReader.writesOffTs.Set(*origStartTs)
323323
<-verifier.srcChangeStreamReader.doneChan
324324
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
325325
}
@@ -365,7 +365,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
365365
"session time after events should exceed the original",
366366
)
367367

368-
verifier.srcChangeStreamReader.writesOffTsChan <- *postEventsSessionTime
368+
verifier.srcChangeStreamReader.writesOffTs.Set(*postEventsSessionTime)
369369
<-verifier.srcChangeStreamReader.doneChan
370370

371371
suite.Assert().Equal(

internal/verifier/check.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt
4949
select {
5050
case <-ctx.Done():
5151
return ctx.Err()
52-
case err := <-csr.errChan:
52+
case <-csr.error.Ready():
53+
err := csr.error.Get().MustGet()
5354
verifier.logger.Warn().Err(err).
5455
Msgf("Received error from %s.", csr)
5556
return err
@@ -87,9 +88,11 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
8788
// If the change stream fails, everything should stop.
8889
eg.Go(func() error {
8990
select {
90-
case err := <-verifier.srcChangeStreamReader.errChan:
91+
case <-verifier.srcChangeStreamReader.error.Ready():
92+
err := verifier.srcChangeStreamReader.error.Get().MustGet()
9193
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
92-
case err := <-verifier.dstChangeStreamReader.errChan:
94+
case <-verifier.dstChangeStreamReader.error.Ready():
95+
err := verifier.dstChangeStreamReader.error.Get().MustGet()
9396
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
9497
case <-ctx.Done():
9598
return nil

internal/verifier/migration_verifier.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,15 +279,19 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
279279
// might be inserting docs into the recheck queue, which happens
280280
// under the lock.
281281
select {
282-
case verifier.srcChangeStreamReader.writesOffTsChan <- srcFinalTs:
283-
case err := <-verifier.srcChangeStreamReader.errChan:
282+
case <-verifier.srcChangeStreamReader.error.Ready():
283+
err := verifier.srcChangeStreamReader.error.Get().MustGet()
284284
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
285+
default:
286+
verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs)
285287
}
286288

287289
select {
288-
case verifier.dstChangeStreamReader.writesOffTsChan <- dstFinalTs:
289-
case err := <-verifier.dstChangeStreamReader.errChan:
290+
case <-verifier.dstChangeStreamReader.error.Ready():
291+
err := verifier.dstChangeStreamReader.error.Get().MustGet()
290292
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
293+
default:
294+
verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs)
291295
}
292296

293297
return nil

0 commit comments

Comments
 (0)