Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
90f7502
recreate
FGasper Dec 4, 2024
aea6160
revamp retryer
FGasper Dec 5, 2024
9c0cf51
note success too
FGasper Dec 5, 2024
5dfafab
real clones
FGasper Dec 5, 2024
1652778
a bit more debugging
FGasper Dec 5, 2024
b8ed20d
wording tweaks, and a return that was missing
FGasper Dec 5, 2024
ac170b4
s/failed/discrepancies
FGasper Dec 5, 2024
e424812
Merge branch 'felipe_log_lag_redux' into REP-5358-add-retry-description
FGasper Dec 6, 2024
5f9733c
fix/beef-up
FGasper Dec 6, 2024
90aa035
add more detail to retries
FGasper Dec 6, 2024
6bc2e10
tweak
FGasper Dec 6, 2024
7dda460
Convert change stream err and writesOffTs channels to Eventuals.
FGasper Dec 6, 2024
78c609f
add test for Eventual
FGasper Dec 6, 2024
9c17faf
trace for test that flapped
FGasper Dec 6, 2024
fa63ff7
godoc
FGasper Dec 6, 2024
ab10df3
Merge branch 'main' into REP-5358-retry-and-channels
FGasper Dec 6, 2024
668e194
Merge branch 'main' into REP-5358-add-retry-description
FGasper Dec 7, 2024
50708b7
Merge branch 'REP-5358-add-retry-description' into REP-5358-retry-and…
FGasper Dec 7, 2024
28b8525
fix case where context closes docs reader
FGasper Dec 7, 2024
490b2d6
add test
FGasper Dec 9, 2024
c199009
revert retryer changes
FGasper Dec 9, 2024
b2a8c6d
Merge branch 'main' into REP-5358-retry-and-channels
FGasper Dec 9, 2024
bab60a5
docs tweak
FGasper Dec 9, 2024
250a659
add test
FGasper Dec 9, 2024
be2638e
change stream test
FGasper Dec 9, 2024
41229d1
Fix flapping TestStartAtTimeNoChanges.
FGasper Dec 9, 2024
b4dad36
cluster_time.go
FGasper Dec 9, 2024
e602c9e
check err
FGasper Dec 9, 2024
3b2ff9f
only log once
FGasper Dec 9, 2024
d11cf4a
pruning
FGasper Dec 9, 2024
42376f5
Make Eventual Get() panic if the value is unset.
FGasper Dec 9, 2024
5b5e871
use DBNameForTest
FGasper Dec 9, 2024
f26d63d
beef up test description … why flapping?
FGasper Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions internal/testutil/testutil.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package testutil

import (
"context"
"testing"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
)

// Marshal wraps `bsonMarshal` with a panic on failure.
Expand Down Expand Up @@ -34,3 +40,64 @@ func convertDocsToAnys(docs []bson.D) []any {

return anys
}

func KillApplicationChangeStreams(
ctx context.Context,
t *testing.T,
client *mongo.Client,
appName string,
) error {
// Kill verifier’s change stream.
cursor, err := client.Database(
"admin",
options.Database().SetReadConcern(readconcern.Local()),
).Aggregate(
ctx,
mongo.Pipeline{
{
{"$currentOp", bson.D{
{"idleCursors", true},
}},
},
{
{"$match", bson.D{
{"clientMetadata.application.name", appName},
{"command.collection", "$cmd.aggregate"},
{"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch",
bson.D{{"$type", "object"}},
},
}},
},
},
)
if err != nil {
return errors.Wrapf(err, "failed to list %#q's change streams", appName)
}

ops := []struct {
Opid any
}{}
err = cursor.All(ctx, &ops)
if err != nil {
return errors.Wrapf(err, "failed to decode %#q's change streams", appName)
}

for _, op := range ops {
t.Logf("Killing change stream op %+v", op.Opid)

err :=
client.Database("admin").RunCommand(
ctx,
bson.D{
{"killOp", 1},
{"op", op.Opid},
},
).Err()

if err != nil {
return errors.Wrapf(err, "failed to kill change stream with opId %#q", op.Opid)
}
}

return nil
}
24 changes: 24 additions & 0 deletions internal/util/cluster_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package util

import (
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)

func GetClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
ctStruct := struct {
ClusterTime struct {
ClusterTime primitive.Timestamp `bson:"clusterTime"`
} `bson:"$clusterTime"`
}{}

clusterTimeRaw := sess.ClusterTime()
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
}

return ctStruct.ClusterTime.ClusterTime, nil
}
58 changes: 58 additions & 0 deletions internal/util/eventual.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package util

import (
"sync"

"github.com/10gen/migration-verifier/option"
)

// Eventual solves the “one writer, many readers” problem: a value gets
// written once, then the readers will see that the value is `Ready()` and
// can then `Get()` it.
//
// It’s like how `context.Context`’s `Done()` and `Err()` methods work, but
// generalized to any data type.
type Eventual[T any] struct {
ready chan struct{}
val option.Option[T]
mux sync.RWMutex
}

// NewEventual creates an Eventual and returns a pointer
// to it.
func NewEventual[T any]() *Eventual[T] {
return &Eventual[T]{
ready: make(chan struct{}),
}
}

// Ready returns a channel that closes once the Eventual’s value is ready.
func (e *Eventual[T]) Ready() <-chan struct{} {
return e.ready
}

// Get returns an option that contains the Eventual’s value, or
// empty if the value isn’t ready yet.
func (e *Eventual[T]) Get() option.Option[T] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any case that we want to call Get before it's ready? Looks like we're calling Get().MustGet() everywhere, can we make Get return T or fail if it's not ready?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that. I was/am a bit averse to proliferating panics, but you’re right that there’s little point in calling Get() before the value is ready. (It can be checked anyway by doing a select on Ready() with a default.)

I’ll change it.

e.mux.RLock()
defer e.mux.RUnlock()

return e.val
}

// Set sets the Eventual’s value. It may be called only once;
// if called again it will panic.
func (e *Eventual[T]) Set(val T) {
e.mux.Lock()
defer e.mux.Unlock()

if e.val.IsSome() {
panic("Tried to set an eventual twice!")
}

// NB: This *must* happen before the close(), or else a fast reader may
// not see this value.
e.val = option.Some(val)

close(e.ready)
}
37 changes: 37 additions & 0 deletions internal/util/eventual_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package util

import (
"time"

"github.com/10gen/migration-verifier/option"
)

func (s *UnitTestSuite) TestEventual() {
eventual := NewEventual[int]()

s.Assert().Equal(
option.None[int](),
eventual.Get(),
"Get() should return empty",
)

select {
case <-eventual.Ready():
s.Require().Fail("should not be ready")
case <-time.NewTimer(time.Second).C:
}

eventual.Set(123)

select {
case <-eventual.Ready():
case <-time.NewTimer(time.Second).C:
s.Require().Fail("should be ready")
}

s.Assert().Equal(
option.Some(123),
eventual.Get(),
"Get() should return the value",
)
}
61 changes: 30 additions & 31 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type DocKey struct {

const (
minChangeStreamPersistInterval = time.Second * 10
maxChangeStreamAwaitTime = time.Second
metadataChangeStreamCollectionName = "changeStream"
)

Expand All @@ -68,8 +69,8 @@ type ChangeStreamReader struct {

changeStreamRunning bool
changeEventBatchChan chan []ParsedEvent
writesOffTsChan chan primitive.Timestamp
errChan chan error
writesOffTs *util.Eventual[primitive.Timestamp]
error *util.Eventual[error]
doneChan chan struct{}

startAtTs *primitive.Timestamp
Expand All @@ -87,8 +88,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
clusterInfo: *verifier.srcClusterInfo,
changeStreamRunning: false,
changeEventBatchChan: make(chan []ParsedEvent),
writesOffTsChan: make(chan primitive.Timestamp),
errChan: make(chan error),
writesOffTs: util.NewEventual[primitive.Timestamp](),
error: util.NewEventual[error](),
doneChan: make(chan struct{}),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
}
Expand All @@ -101,8 +102,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
clusterInfo: *verifier.dstClusterInfo,
changeStreamRunning: false,
changeEventBatchChan: make(chan []ParsedEvent),
writesOffTsChan: make(chan primitive.Timestamp),
errChan: make(chan error),
writesOffTs: util.NewEventual[primitive.Timestamp](),
error: util.NewEventual[error](),
doneChan: make(chan struct{}),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
}
Expand All @@ -123,7 +124,6 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch)
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
if err != nil {
reader.errChan <- err
return err
}
}
Expand Down Expand Up @@ -268,6 +268,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
eventsRead := 0
var changeEventBatch []ParsedEvent

latestEvent := option.None[ParsedEvent]()

for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
gotEvent := cs.TryNext(ctx)

Expand All @@ -293,7 +295,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
if changeEventBatch[eventsRead].ClusterTime != nil &&
(csr.lastChangeEventTime == nil ||
csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) {

csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime
latestEvent = option.Some(changeEventBatch[eventsRead])
}

eventsRead++
Expand All @@ -305,6 +309,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
return nil
}

if event, has := latestEvent.Get(); has {
csr.logger.Trace().
Interface("event", event).
Msg("Updated lastChangeEventTime.")
}

var curTs primitive.Timestamp
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
if err == nil {
Expand Down Expand Up @@ -355,7 +365,9 @@ func (csr *ChangeStreamReader) iterateChangeStream(
// source writes are ended and the migration tool is finished / committed.
// This means we should exit rather than continue reading the change stream
// since there should be no more events.
case writesOffTs := <-csr.writesOffTsChan:
case <-csr.writesOffTs.Ready():
writesOffTs := csr.writesOffTs.Get().MustGet()

csr.logger.Debug().
Interface("writesOffTimestamp", writesOffTs).
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)
Expand Down Expand Up @@ -408,7 +420,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
}
// since we have started Recheck, we must signal that we have
// finished the change stream changes so that Recheck can continue.
csr.doneChan <- struct{}{}
close(csr.doneChan)
break
}
}
Expand All @@ -430,7 +442,7 @@ func (csr *ChangeStreamReader) createChangeStream(
) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
pipeline := csr.GetChangeStreamFilter()
opts := options.ChangeStream().
SetMaxAwaitTime(1 * time.Second).
SetMaxAwaitTime(maxChangeStreamAwaitTime).
SetFullDocument(options.UpdateLookup)

if csr.clusterInfo.VersionArray[0] >= 6 {
Expand Down Expand Up @@ -487,11 +499,17 @@ func (csr *ChangeStreamReader) createChangeStream(
// With sharded clusters the resume token might lead the cluster time
// by 1 increment. In that case we need the actual cluster time;
// otherwise we will get errors.
clusterTime, err := getClusterTimeFromSession(sess)
clusterTime, err := util.GetClusterTimeFromSession(sess)
if err != nil {
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
}

csr.logger.Debug().
Interface("resumeTokenTimestamp", startTs).
Interface("clusterTime", clusterTime).
Stringer("changeStreamReader", csr).
Msg("Using earlier time as start timestamp.")

if startTs.After(clusterTime) {
startTs = clusterTime
}
Expand Down Expand Up @@ -542,10 +560,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
).Run(ctx, csr.logger)

if err != nil {
// NB: This failure always happens after the initial change stream
// creation.
csr.errChan <- err
close(csr.errChan)
csr.error.Set(err)
}
}()

Expand Down Expand Up @@ -661,19 +676,3 @@ func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp,

return resumeTokenTime, nil
}

func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
ctStruct := struct {
ClusterTime struct {
ClusterTime primitive.Timestamp `bson:"clusterTime"`
} `bson:"$clusterTime"`
}{}

clusterTimeRaw := sess.ClusterTime()
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
}

return ctStruct.ClusterTime.ClusterTime, nil
}
Loading
Loading