Skip to content

Commit 2325945

Browse files
authored
REP-6804 Simplify change handling logic (#173)
Previously the code maintained: - an Eventual for the error in persisting rechecks - an Eventual for the error in reading changes - an empty channel to indicate when the change reader is done This is overly complex. This changeset simplifies the above. Now there is a single Eventual that holds the final status of both readers & both persistors. Together the reader & persistor threads are called “change handling”. This also fixes a small bug in Eventual: previously it was impossible to store a nil value because Eventual was implemented via Option. Now it’s possible to store nil.
1 parent 434b11b commit 2325945

File tree

9 files changed

+205
-211
lines changed

9 files changed

+205
-211
lines changed

internal/util/eventual.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package util
22

33
import (
44
"sync"
5-
6-
"github.com/10gen/migration-verifier/option"
75
)
86

97
// Eventual solves the “one writer, many readers” problem: a value gets
@@ -14,7 +12,7 @@ import (
1412
// generalized to any data type.
1513
type Eventual[T any] struct {
1614
ready chan struct{}
17-
val option.Option[T]
15+
val T
1816
mux sync.RWMutex
1917
}
2018

@@ -37,12 +35,14 @@ func (e *Eventual[T]) Get() T {
3735
e.mux.RLock()
3836
defer e.mux.RUnlock()
3937

40-
val, has := e.val.Get()
41-
if has {
42-
return val
38+
// If the ready channel is still open then there’s no value yet,
39+
// which means this method should not have been called.
40+
select {
41+
case <-e.ready:
42+
return e.val
43+
default:
44+
panic("Eventual's Get() called before value was ready.")
4345
}
44-
45-
panic("Eventual's Get() called before value was ready.")
4646
}
4747

4848
// Set sets the Eventual’s value. It may be called only once;
@@ -51,13 +51,16 @@ func (e *Eventual[T]) Set(val T) {
5151
e.mux.Lock()
5252
defer e.mux.Unlock()
5353

54-
if e.val.IsSome() {
54+
select {
55+
case <-e.ready:
5556
panic("Tried to set an eventual twice!")
57+
default:
5658
}
5759

5860
// NB: This *must* happen before the close(), or else a fast reader may
5961
// not see this value.
60-
e.val = option.Some(val)
62+
e.val = val
6163

64+
// This allows Get() to work:
6265
close(e.ready)
6366
}

internal/util/eventual_test.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ func (s *UnitTestSuite) TestEventual() {
1515
select {
1616
case <-eventual.Ready():
1717
s.Require().Fail("should not be ready")
18-
case <-time.NewTimer(time.Second).C:
18+
case <-time.NewTimer(time.Millisecond).C:
1919
}
2020

2121
eventual.Set(123)
2222

2323
select {
2424
case <-eventual.Ready():
25-
case <-time.NewTimer(time.Second).C:
25+
case <-time.NewTimer(time.Millisecond).C:
2626
s.Require().Fail("should be ready")
2727
}
2828

@@ -31,4 +31,30 @@ func (s *UnitTestSuite) TestEventual() {
3131
eventual.Get(),
3232
"Get() should return the value",
3333
)
34+
35+
s.Assert().Equal(
36+
123,
37+
eventual.Get(),
38+
"Get() should return the value a 2nd time",
39+
)
40+
}
41+
42+
func (s *UnitTestSuite) TestEventualNil() {
43+
eventual := NewEventual[error]()
44+
45+
select {
46+
case <-eventual.Ready():
47+
s.Require().Fail("should not be ready")
48+
case <-time.NewTimer(time.Millisecond).C:
49+
}
50+
51+
eventual.Set(nil)
52+
53+
select {
54+
case <-eventual.Ready():
55+
case <-time.NewTimer(time.Millisecond).C:
56+
s.Require().Fail("should be ready")
57+
}
58+
59+
s.Assert().Nil(eventual.Get())
3460
}

internal/verifier/change_reader.go

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.mongodb.org/mongo-driver/v2/bson"
1616
"go.mongodb.org/mongo-driver/v2/mongo"
1717
"go.mongodb.org/mongo-driver/v2/mongo/options"
18+
"golang.org/x/sync/errgroup"
1819
)
1920

2021
type ddlEventHandling string
@@ -33,15 +34,12 @@ const (
3334
type changeReader interface {
3435
getWhichCluster() whichCluster
3536
getReadChannel() <-chan changeEventBatch
36-
getError() *util.Eventual[error]
3737
getStartTimestamp() option.Option[bson.Timestamp]
3838
getEventsPerSecond() option.Option[float64]
3939
getLag() option.Option[time.Duration]
4040
getBufferSaturation() float64
4141
setWritesOff(bson.Timestamp)
42-
setPersistorError(error)
43-
start(context.Context) error
44-
done() <-chan struct{}
42+
start(context.Context, *errgroup.Group) error
4543
persistResumeToken(context.Context, bson.Raw) error
4644
isRunning() bool
4745
String() string
@@ -63,9 +61,6 @@ type ChangeReaderCommon struct {
6361
running bool
6462
changeEventBatchChan chan changeEventBatch
6563
writesOffTs *util.Eventual[bson.Timestamp]
66-
readerError *util.Eventual[error]
67-
persistorError *util.Eventual[error]
68-
doneChan chan struct{}
6964

7065
startAtTs *bson.Timestamp
7166

@@ -79,14 +74,6 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster {
7974
return rc.readerType
8075
}
8176

82-
func (rc *ChangeReaderCommon) setPersistorError(err error) {
83-
rc.persistorError.Set(err)
84-
}
85-
86-
func (rc *ChangeReaderCommon) getError() *util.Eventual[error] {
87-
return rc.readerError
88-
}
89-
9077
func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] {
9178
return option.FromPointer(rc.startAtTs)
9279
}
@@ -103,10 +90,6 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch {
10390
return rc.changeEventBatchChan
10491
}
10592

106-
func (rc *ChangeReaderCommon) done() <-chan struct{} {
107-
return rc.doneChan
108-
}
109-
11093
// getBufferSaturation returns the reader’s internal buffer’s saturation level
11194
// as a fraction. If saturation rises, that means we’re reading events faster
11295
// than we can persist them.
@@ -224,13 +207,6 @@ func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) {
224207
Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)")
225208
}
226209

227-
func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error {
228-
return errors.Wrap(
229-
rc.persistorError.Get(),
230-
"event persistor failed, so no more events can be processed",
231-
)
232-
}
233-
234210
func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event {
235211
return event.
236212
Any("timestamp", ts).

0 commit comments

Comments
 (0)