Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions internal/util/eventual.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package util

import (
"sync"

"github.com/10gen/migration-verifier/option"
)

// Eventual solves the “one writer, many readers” problem: a value gets
Expand All @@ -14,7 +12,7 @@ import (
// generalized to any data type.
type Eventual[T any] struct {
ready chan struct{}
val option.Option[T]
val T
mux sync.RWMutex
}

Expand All @@ -37,12 +35,14 @@ func (e *Eventual[T]) Get() T {
e.mux.RLock()
defer e.mux.RUnlock()

val, has := e.val.Get()
if has {
return val
// If the ready channel is still open then there’s no value yet,
// which means this method should not have been called.
select {
case <-e.ready:
return e.val
default:
panic("Eventual's Get() called before value was ready.")
Comment on lines +40 to +44
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that Eventual can only be read once per write. The comment says it's like context.Context’s Done() and Err(), is that changed with this PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test that confirms that a 2nd time works.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s not changed … I added a test that confirms that a 2nd read works.

}

panic("Eventual's Get() called before value was ready.")
}

// Set sets the Eventual’s value. It may be called only once;
Expand All @@ -51,13 +51,16 @@ func (e *Eventual[T]) Set(val T) {
e.mux.Lock()
defer e.mux.Unlock()

if e.val.IsSome() {
select {
case <-e.ready:
panic("Tried to set an eventual twice!")
default:
}

// NB: This *must* happen before the close(), or else a fast reader may
// not see this value.
e.val = option.Some(val)
e.val = val

// This allows Get() to work:
close(e.ready)
}
30 changes: 28 additions & 2 deletions internal/util/eventual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ func (s *UnitTestSuite) TestEventual() {
select {
case <-eventual.Ready():
s.Require().Fail("should not be ready")
case <-time.NewTimer(time.Second).C:
case <-time.NewTimer(time.Millisecond).C:
}

eventual.Set(123)

select {
case <-eventual.Ready():
case <-time.NewTimer(time.Second).C:
case <-time.NewTimer(time.Millisecond).C:
s.Require().Fail("should be ready")
}

Expand All @@ -31,4 +31,30 @@ func (s *UnitTestSuite) TestEventual() {
eventual.Get(),
"Get() should return the value",
)

s.Assert().Equal(
123,
eventual.Get(),
"Get() should return the value a 2nd time",
)
}

func (s *UnitTestSuite) TestEventualNil() {
eventual := NewEventual[error]()

select {
case <-eventual.Ready():
s.Require().Fail("should not be ready")
case <-time.NewTimer(time.Millisecond).C:
}

eventual.Set(nil)

select {
case <-eventual.Ready():
case <-time.NewTimer(time.Millisecond).C:
s.Require().Fail("should be ready")
}

s.Assert().Nil(eventual.Get())
}
28 changes: 2 additions & 26 deletions internal/verifier/change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"golang.org/x/sync/errgroup"
)

type ddlEventHandling string
Expand All @@ -33,15 +34,12 @@ const (
type changeReader interface {
getWhichCluster() whichCluster
getReadChannel() <-chan changeEventBatch
getError() *util.Eventual[error]
getStartTimestamp() option.Option[bson.Timestamp]
getEventsPerSecond() option.Option[float64]
getLag() option.Option[time.Duration]
getBufferSaturation() float64
setWritesOff(bson.Timestamp)
setPersistorError(error)
start(context.Context) error
done() <-chan struct{}
start(context.Context, *errgroup.Group) error
persistResumeToken(context.Context, bson.Raw) error
isRunning() bool
String() string
Expand All @@ -63,9 +61,6 @@ type ChangeReaderCommon struct {
running bool
changeEventBatchChan chan changeEventBatch
writesOffTs *util.Eventual[bson.Timestamp]
readerError *util.Eventual[error]
persistorError *util.Eventual[error]
doneChan chan struct{}

startAtTs *bson.Timestamp

Expand All @@ -79,14 +74,6 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster {
return rc.readerType
}

func (rc *ChangeReaderCommon) setPersistorError(err error) {
rc.persistorError.Set(err)
}

func (rc *ChangeReaderCommon) getError() *util.Eventual[error] {
return rc.readerError
}

func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] {
return option.FromPointer(rc.startAtTs)
}
Expand All @@ -103,10 +90,6 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch {
return rc.changeEventBatchChan
}

func (rc *ChangeReaderCommon) done() <-chan struct{} {
return rc.doneChan
}

// getBufferSaturation returns the reader’s internal buffer’s saturation level
// as a fraction. If saturation rises, that means we’re reading events faster
// than we can persist them.
Expand Down Expand Up @@ -224,13 +207,6 @@ func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) {
Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)")
}

func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error {
return errors.Wrap(
rc.persistorError.Get(),
"event persistor failed, so no more events can be processed",
)
}

func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event {
return event.
Any("timestamp", ts).
Expand Down
Loading