20 changes: 10 additions & 10 deletions internal/util/eventual.go
@@ -2,8 +2,6 @@ package util
 
 import (
 	"sync"
-
-	"github.com/10gen/migration-verifier/option"
 )
 
 // Eventual solves the “one writer, many readers” problem: a value gets
@@ -14,7 +12,7 @@ import (
 // generalized to any data type.
 type Eventual[T any] struct {
 	ready chan struct{}
-	val   option.Option[T]
+	val   T
 	mux   sync.RWMutex
 }
 
@@ -37,12 +35,12 @@ func (e *Eventual[T]) Get() T {
 	e.mux.RLock()
 	defer e.mux.RUnlock()
 
-	val, has := e.val.Get()
-	if has {
-		return val
+	select {
+	case <-e.ready:
+		return e.val
+	default:
+		panic("Eventual's Get() called before value was ready.")
Comment on lines +40 to +44

Collaborator: It seems that Eventual can only be read once per write. The comment says it's like context.Context’s Done() and Err(), is that changed with this PR?

Collaborator (Author): I added a test that confirms that a 2nd time works.

Collaborator (Author): It’s not changed … I added a test that confirms that a 2nd read works.

 	}
 
-	panic("Eventual's Get() called before value was ready.")
 }
 
 // Set sets the Eventual’s value. It may be called only once;
@@ -51,13 +49,15 @@ func (e *Eventual[T]) Set(val T) {
 	e.mux.Lock()
 	defer e.mux.Unlock()
 
-	if e.val.IsSome() {
+	select {
+	case <-e.ready:
 		panic("Tried to set an eventual twice!")
+	default:
 	}
 
 	// NB: This *must* happen before the close(), or else a fast reader may
 	// not see this value.
-	e.val = option.Some(val)
+	e.val = val
 
 	close(e.ready)
 }
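
Regarding the review thread above: with this change, Ready() closes once Set() runs, and Get() may then be called any number of times. A minimal usage sketch, written as a hypothetical test in the same package (not part of this PR; NewEventual, Ready, Set, and Get are taken from the diff and its tests):

```go
package util

import "testing"

// One Set(), many Get()s. Ready() behaves like context.Context's Done()
// channel: it closes once the value is set and stays closed.
func TestEventualReadMany(t *testing.T) {
	ev := NewEventual[int]()

	done := make(chan struct{})
	go func() {
		<-ev.Ready()
		if ev.Get() != 123 || ev.Get() != 123 { // Get() works repeatedly after Set()
			t.Error("unexpected value from Get()")
		}
		close(done)
	}()

	ev.Set(123) // a second Set() would panic
	<-done
}
```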
24 changes: 22 additions & 2 deletions internal/util/eventual_test.go
@@ -15,14 +15,14 @@ func (s *UnitTestSuite) TestEventual() {
 	select {
 	case <-eventual.Ready():
 		s.Require().Fail("should not be ready")
-	case <-time.NewTimer(time.Second).C:
+	case <-time.NewTimer(time.Millisecond).C:
 	}
 
 	eventual.Set(123)
 
 	select {
 	case <-eventual.Ready():
-	case <-time.NewTimer(time.Second).C:
+	case <-time.NewTimer(time.Millisecond).C:
 		s.Require().Fail("should be ready")
 	}
 
@@ -32,3 +32,23 @@ func (s *UnitTestSuite) TestEventual() {
 		"Get() should return the value",
 	)
 }
+
+func (s *UnitTestSuite) TestEventualNil() {
+	eventual := NewEventual[error]()
+
+	select {
+	case <-eventual.Ready():
+		s.Require().Fail("should not be ready")
+	case <-time.NewTimer(time.Millisecond).C:
+	}
+
+	eventual.Set(nil)
+
+	select {
+	case <-eventual.Ready():
+	case <-time.NewTimer(time.Millisecond).C:
+		s.Require().Fail("should be ready")
+	}
+
+	s.Assert().Nil(eventual.Get())
+}
28 changes: 2 additions & 26 deletions internal/verifier/change_reader.go
@@ -15,6 +15,7 @@ import (
 	"go.mongodb.org/mongo-driver/v2/bson"
 	"go.mongodb.org/mongo-driver/v2/mongo"
 	"go.mongodb.org/mongo-driver/v2/mongo/options"
+	"golang.org/x/sync/errgroup"
 )
 
 type ddlEventHandling string
@@ -33,15 +34,12 @@ const (
 type changeReader interface {
 	getWhichCluster() whichCluster
 	getReadChannel() <-chan changeEventBatch
-	getError() *util.Eventual[error]
 	getStartTimestamp() option.Option[bson.Timestamp]
 	getEventsPerSecond() option.Option[float64]
 	getLag() option.Option[time.Duration]
 	getBufferSaturation() float64
 	setWritesOff(bson.Timestamp)
-	setPersistorError(error)
-	start(context.Context) error
-	done() <-chan struct{}
+	start(context.Context, *errgroup.Group) error
 	persistResumeToken(context.Context, bson.Raw) error
 	isRunning() bool
 	String() string
@@ -63,9 +61,6 @@ type ChangeReaderCommon struct {
 	running              bool
 	changeEventBatchChan chan changeEventBatch
 	writesOffTs          *util.Eventual[bson.Timestamp]
-	readerError          *util.Eventual[error]
-	persistorError       *util.Eventual[error]
-	doneChan             chan struct{}
 
 	startAtTs *bson.Timestamp
 
@@ -79,14 +74,6 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster {
 	return rc.readerType
 }
 
-func (rc *ChangeReaderCommon) setPersistorError(err error) {
-	rc.persistorError.Set(err)
-}
-
-func (rc *ChangeReaderCommon) getError() *util.Eventual[error] {
-	return rc.readerError
-}
-
 func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] {
 	return option.FromPointer(rc.startAtTs)
 }
@@ -103,10 +90,6 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch {
 	return rc.changeEventBatchChan
 }
 
-func (rc *ChangeReaderCommon) done() <-chan struct{} {
-	return rc.doneChan
-}
-
 // getBufferSaturation returns the reader’s internal buffer’s saturation level
 // as a fraction. If saturation rises, that means we’re reading events faster
 // than we can persist them.
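
The comment above describes saturation as a fraction. The method body isn't shown in this diff; a plausible sketch (an assumption, not the repo's actual code) would divide the batch channel's queued length by its capacity:

```go
// Hypothetical sketch only; the real getBufferSaturation body is not in this diff.
func (rc *ChangeReaderCommon) getBufferSaturation() float64 {
	// len() counts batches read but not yet persisted; cap() is the
	// buffer size the channel was created with (batchChanBufferSize).
	return float64(len(rc.changeEventBatchChan)) / float64(cap(rc.changeEventBatchChan))
}
```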
@@ -224,13 +207,6 @@ func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) {
 		Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)")
 }
 
-func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error {
-	return errors.Wrap(
-		rc.persistorError.Get(),
-		"event persistor failed, so no more events can be processed",
-	)
-}
-
 func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event {
 	return event.
 		Any("timestamp", ts).
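
In the new interface, the caller hands each reader an *errgroup.Group via start() instead of polling getError() and done(). A self-contained sketch of that ownership pattern (toy types and error text are hypothetical, not code from this repo):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// toyReader mimics the new start(ctx, *errgroup.Group) shape: the caller's
// errgroup owns the reader goroutine and collects its error.
type toyReader struct{}

func (r *toyReader) start(ctx context.Context, eg *errgroup.Group) error {
	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			return errors.New("simulated reader failure")
		}
	})
	return nil
}

func main() {
	eg, ctx := errgroup.WithContext(context.Background())

	r := &toyReader{}
	if err := r.start(ctx, eg); err != nil {
		panic(err)
	}

	// Wait() returns the first error from any eg.Go() goroutine, replacing the
	// separate readerError/persistorError Eventuals and the done() channel.
	fmt.Println("reader finished:", eg.Wait())
}
```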
149 changes: 53 additions & 96 deletions internal/verifier/change_stream.go
@@ -5,13 +5,10 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/10gen/migration-verifier/history"
 	"github.com/10gen/migration-verifier/internal/keystring"
 	"github.com/10gen/migration-verifier/internal/retry"
 	"github.com/10gen/migration-verifier/internal/util"
 	"github.com/10gen/migration-verifier/mbson"
-	"github.com/10gen/migration-verifier/mslices"
-	"github.com/10gen/migration-verifier/msync"
 	"github.com/10gen/migration-verifier/option"
 	mapset "github.com/deckarep/golang-set/v2"
 	clone "github.com/huandu/go-clone/generic"
@@ -21,6 +18,7 @@ import (
 	"go.mongodb.org/mongo-driver/v2/mongo"
 	"go.mongodb.org/mongo-driver/v2/mongo/options"
 	"golang.org/x/exp/slices"
+	"golang.org/x/sync/errgroup"
 )
 
 var supportedEventOpTypes = mapset.NewSet(
@@ -49,43 +47,6 @@ type ChangeStreamReader struct {
 
 var _ changeReader = &ChangeStreamReader{}
 
-func (verifier *Verifier) initializeChangeReaders() {
-	srcReader := &ChangeStreamReader{
-		ChangeReaderCommon: ChangeReaderCommon{
-			readerType:    src,
-			namespaces:    verifier.srcNamespaces,
-			watcherClient: verifier.srcClient,
-			clusterInfo:   *verifier.srcClusterInfo,
-		},
-	}
-	verifier.srcChangeReader = srcReader
-
-	dstReader := &ChangeStreamReader{
-		ChangeReaderCommon: ChangeReaderCommon{
-			readerType:    dst,
-			namespaces:    verifier.dstNamespaces,
-			watcherClient: verifier.dstClient,
-			clusterInfo:   *verifier.dstClusterInfo,
-			onDDLEvent:    onDDLEventAllow,
-		},
-	}
-	verifier.dstChangeReader = dstReader
-
-	// Common elements in both readers:
-	for _, csr := range mslices.Of(srcReader, dstReader) {
-		csr.logger = verifier.logger
-		csr.metaDB = verifier.metaClient.Database(verifier.metaDBName)
-		csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize)
-		csr.writesOffTs = util.NewEventual[bson.Timestamp]()
-		csr.readerError = util.NewEventual[error]()
-		csr.persistorError = util.NewEventual[error]()
-		csr.doneChan = make(chan struct{})
-		csr.lag = msync.NewTypedAtomic(option.None[time.Duration]())
-		csr.batchSizeHistory = history.New[int](time.Minute)
-		csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken
-	}
-}
-
 // GetChangeStreamFilter returns an aggregation pipeline that filters
 // namespaces as per configuration.
 //
@@ -265,8 +226,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
 	select {
 	case <-ctx.Done():
 		return util.WrapCtxErrWithCause(ctx)
-	case <-csr.persistorError.Ready():
-		return csr.wrapPersistorErrorForReader()
 	case csr.changeEventBatchChan <- changeEventBatch{
 		events: changeEvents,
 
@@ -305,9 +264,6 @@ func (csr *ChangeStreamReader) iterateChangeStream(
 
 			return err
 
-		case <-csr.persistorError.Ready():
-			return csr.wrapPersistorErrorForReader()
-
 		// If the ChangeStreamEnderChan has a message, the user has indicated that
 		// source writes are ended and the migration tool is finished / committed.
 		// This means we should exit rather than continue reading the change stream
@@ -361,9 +317,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
 			if csr.lastChangeEventTime != nil {
 				csr.startAtTs = csr.lastChangeEventTime
 			}
-			// since we have started Recheck, we must signal that we have
-			// finished the change stream changes so that Recheck can continue.
-			close(csr.doneChan)
+
 			break
 		}
 	}
@@ -467,79 +421,82 @@ func (csr *ChangeStreamReader) createChangeStream(
 }
 
 // StartChangeStream starts the change stream.
-func (csr *ChangeStreamReader) start(ctx context.Context) error {
+func (csr *ChangeStreamReader) start(
+	ctx context.Context,
+	eg *errgroup.Group,
+) error {
 	// This channel holds the first change stream creation's result, whether
 	// success or failure. Rather than using a Result we could make separate
 	// Timestamp and error channels, but the single channel is cleaner since
 	// there's no chance of "nonsense" like both channels returning a payload.
 	initialCreateResultChan := make(chan mo.Result[bson.Timestamp])
 
-	go func() {
-		// Closing changeEventBatchChan at the end of change stream goroutine
-		// notifies the verifier's change event handler to exit.
-		defer func() {
-			csr.logger.Debug().
-				Stringer("changeStreamReader", csr).
-				Msg("Closing change event batch channel.")
+	eg.Go(
+		func() error {
+			// Closing changeEventBatchChan at the end of change stream goroutine
+			// notifies the verifier's change event handler to exit.
+			defer func() {
+				csr.logger.Debug().
+					Stringer("changeStreamReader", csr).
+					Msg("Closing change event batch channel.")
 
-			close(csr.changeEventBatchChan)
+				close(csr.changeEventBatchChan)
-		}()
+			}()
 
-		retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode)
+			retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode)
 
-		parentThreadWaiting := true
+			parentThreadWaiting := true
 
-		err := retryer.WithCallback(
-			func(ctx context.Context, ri *retry.FuncInfo) error {
-				changeStream, sess, startTs, err := csr.createChangeStream(ctx)
-				if err != nil {
-					logEvent := csr.logger.Debug().
-						Err(err).
-						Stringer("changeStreamReader", csr)
+			err := retryer.WithCallback(
+				func(ctx context.Context, ri *retry.FuncInfo) error {
+					changeStream, sess, startTs, err := csr.createChangeStream(ctx)
+					if err != nil {
+						logEvent := csr.logger.Debug().
+							Err(err).
+							Stringer("changeStreamReader", csr)
 
-					if parentThreadWaiting {
-						logEvent.Msg("First change stream open failed.")
+						if parentThreadWaiting {
+							logEvent.Msg("First change stream open failed.")
 
-						initialCreateResultChan <- mo.Err[bson.Timestamp](err)
-						return nil
-					}
+							initialCreateResultChan <- mo.Err[bson.Timestamp](err)
+							return nil
+						}
 
-					logEvent.Msg("Retried change stream open failed.")
+						logEvent.Msg("Retried change stream open failed.")
 
-					return err
-				}
+						return err
+					}
 
-				defer changeStream.Close(ctx)
+					defer changeStream.Close(ctx)
 
-				logEvent := csr.logger.Debug().
-					Stringer("changeStreamReader", csr).
-					Any("startTimestamp", startTs)
+					logEvent := csr.logger.Debug().
+						Stringer("changeStreamReader", csr).
+						Any("startTimestamp", startTs)
 
-				if parentThreadWaiting {
-					logEvent.Msg("First change stream open succeeded.")
+					if parentThreadWaiting {
+						logEvent.Msg("First change stream open succeeded.")
 
-					initialCreateResultChan <- mo.Ok(startTs)
-					close(initialCreateResultChan)
-					parentThreadWaiting = false
-				} else {
-					logEvent.Msg("Retried change stream open succeeded.")
-				}
+						initialCreateResultChan <- mo.Ok(startTs)
+						close(initialCreateResultChan)
+						parentThreadWaiting = false
+					} else {
+						logEvent.Msg("Retried change stream open succeeded.")
+					}
 
-				return csr.iterateChangeStream(ctx, ri, changeStream, sess)
-			},
-			"running %s", csr,
-		).Run(ctx, csr.logger)
+					return csr.iterateChangeStream(ctx, ri, changeStream, sess)
+				},
+				"running %s", csr,
+			).Run(ctx, csr.logger)
 
-		if err != nil {
-			csr.readerError.Set(err)
-		}
-	}()
+			return err
+		},
+	)
 
 	result := <-initialCreateResultChan
 
 	startTs, err := result.Get()
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "creating change stream")
 	}
 
 	csr.startAtTs = &startTs
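
The comment at the top of start() explains the single-result-channel design: one channel of mo.Result carries either the first timestamp or the first error. A standalone illustration of that pattern (hypothetical example; it uses github.com/samber/mo just as the diff does):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/samber/mo"
)

func main() {
	// A single channel carries either a value or an error, so the two can
	// never disagree the way separate value and error channels could.
	resultChan := make(chan mo.Result[int], 1)

	succeeded := true
	if succeeded {
		resultChan <- mo.Ok(42)
	} else {
		resultChan <- mo.Err[int](errors.New("change stream creation failed"))
	}

	val, err := (<-resultChan).Get()
	fmt.Println(val, err)
}
```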