Skip to content

Commit 0e200cf

Browse files
committed
eventual feels better
1 parent 6d7bd6b commit 0e200cf

File tree

6 files changed

+61
-106
lines changed

6 files changed

+61
-106
lines changed

internal/replay/replay.go

Lines changed: 0 additions & 42 deletions
This file was deleted.

internal/replay/replay_test.go

Lines changed: 0 additions & 46 deletions
This file was deleted.

internal/util/eventual.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package util
2+
3+
import (
4+
"sync"
5+
6+
"github.com/10gen/migration-verifier/option"
7+
)
8+
9+
// Eventual represents a value that isn’t available when
10+
// this struct is created but can be awaited via a channel.
11+
type Eventual[T any] struct {
12+
ready chan struct{}
13+
val option.Option[T]
14+
mux sync.RWMutex
15+
}
16+
17+
// NewEventual creates an Eventual and returns a pointer
18+
// to it.
19+
func NewEventual[T any]() *Eventual[T] {
20+
return &Eventual[T]{
21+
ready: make(chan struct{}),
22+
}
23+
}
24+
25+
// Ready returns a channel that closes once the Eventual’s value is ready.
26+
func (e *Eventual[T]) Ready() <-chan struct{} {
27+
return e.ready
28+
}
29+
30+
// Get returns an option that contains the Eventual’s value, or
31+
// empty if the value isn’t ready yet.
32+
func (e *Eventual[T]) Get() option.Option[T] {
33+
e.mux.RLock()
34+
defer e.mux.RUnlock()
35+
36+
return e.val
37+
}
38+
39+
// Set
40+
func (e *Eventual[T]) Set(val T) {
41+
e.mux.Lock()
42+
defer e.mux.Unlock()
43+
44+
if e.val.IsSome() {
45+
panic("Double set on eventual!")
46+
}
47+
48+
e.val = option.Some(val)
49+
50+
close(e.ready)
51+
}

internal/verifier/change_stream.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/10gen/migration-verifier/internal/keystring"
99
"github.com/10gen/migration-verifier/internal/logger"
10-
"github.com/10gen/migration-verifier/internal/replay"
1110
"github.com/10gen/migration-verifier/internal/retry"
1211
"github.com/10gen/migration-verifier/internal/util"
1312
"github.com/10gen/migration-verifier/msync"
@@ -69,8 +68,7 @@ type ChangeStreamReader struct {
6968

7069
changeStreamRunning bool
7170
changeEventBatchChan chan []ParsedEvent
72-
writesOffTsChan <-chan primitive.Timestamp
73-
writesOffTsWriteFunc func(primitive.Timestamp)
71+
writesOffTs *util.Eventual[primitive.Timestamp]
7472
errChan chan error
7573
doneChan chan struct{}
7674

@@ -80,7 +78,6 @@ type ChangeStreamReader struct {
8078
}
8179

8280
func (verifier *Verifier) initializeChangeStreamReaders(ctx context.Context) {
83-
srcWritesOffReaderChan, srcWritesOffWriter := replay.CreateChannel[primitive.Timestamp](ctx)
8481

8582
verifier.srcChangeStreamReader = &ChangeStreamReader{
8683
readerType: src,
@@ -91,15 +88,12 @@ func (verifier *Verifier) initializeChangeStreamReaders(ctx context.Context) {
9188
clusterInfo: *verifier.srcClusterInfo,
9289
changeStreamRunning: false,
9390
changeEventBatchChan: make(chan []ParsedEvent),
94-
writesOffTsChan: srcWritesOffReaderChan,
95-
writesOffTsWriteFunc: srcWritesOffWriter,
91+
writesOffTs: util.NewEventual[primitive.Timestamp](),
9692
errChan: make(chan error),
9793
doneChan: make(chan struct{}),
9894
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
9995
}
10096

101-
dstWritesOffReaderChan, dstWritesOffWriter := replay.CreateChannel[primitive.Timestamp](ctx)
102-
10397
verifier.dstChangeStreamReader = &ChangeStreamReader{
10498
readerType: dst,
10599
logger: verifier.logger,
@@ -109,8 +103,7 @@ func (verifier *Verifier) initializeChangeStreamReaders(ctx context.Context) {
109103
clusterInfo: *verifier.dstClusterInfo,
110104
changeStreamRunning: false,
111105
changeEventBatchChan: make(chan []ParsedEvent),
112-
writesOffTsChan: dstWritesOffReaderChan,
113-
writesOffTsWriteFunc: dstWritesOffWriter,
106+
writesOffTs: util.NewEventual[primitive.Timestamp](),
114107
errChan: make(chan error),
115108
doneChan: make(chan struct{}),
116109
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
@@ -363,10 +356,9 @@ func (csr *ChangeStreamReader) iterateChangeStream(
363356
// source writes are ended and the migration tool is finished / committed.
364357
// This means we should exit rather than continue reading the change stream
365358
// since there should be no more events.
366-
case writesOffTs, more := <-csr.writesOffTsChan:
367-
if !more {
368-
panic(csr.String() + ": attempted read from already-closed writesOffTsChan")
369-
}
359+
case <-csr.writesOffTs.Ready():
360+
writesOffTs := csr.writesOffTs.Get().MustGet()
361+
370362
csr.logger.Debug().
371363
Interface("writesOffTimestamp", writesOffTs).
372364
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
318318
suite.Require().NotNil(origStartTs)
319319
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
320320
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
321-
verifier.srcChangeStreamReader.writesOffTsWriteFunc(*origStartTs)
321+
verifier.srcChangeStreamReader.writesOffTs.Set(*origStartTs)
322322
<-verifier.srcChangeStreamReader.doneChan
323323
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
324324
}
@@ -364,7 +364,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
364364
"session time after events should exceed the original",
365365
)
366366

367-
verifier.srcChangeStreamReader.writesOffTsWriteFunc(*postEventsSessionTime)
367+
verifier.srcChangeStreamReader.writesOffTs.Set(*postEventsSessionTime)
368368
<-verifier.srcChangeStreamReader.doneChan
369369

370370
suite.Assert().Equal(

internal/verifier/migration_verifier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,14 +284,14 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
284284
case err := <-verifier.srcChangeStreamReader.errChan:
285285
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
286286
default:
287-
verifier.srcChangeStreamReader.writesOffTsWriteFunc(srcFinalTs)
287+
verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs)
288288
}
289289

290290
select {
291291
case err := <-verifier.dstChangeStreamReader.errChan:
292292
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
293293
default:
294-
verifier.dstChangeStreamReader.writesOffTsWriteFunc(dstFinalTs)
294+
verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs)
295295
}
296296

297297
return nil

0 commit comments

Comments
 (0)