Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/10gen/migration-verifier/internal/keystring"
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/samber/mo"
Expand Down Expand Up @@ -162,6 +163,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
ctx context.Context,
ri *retry.Info,
cs *mongo.ChangeStream,
sess mongo.Session,
) error {
eventsRead := 0
var changeEventBatch []ParsedEvent
Expand Down Expand Up @@ -194,7 +196,18 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
return nil
}

err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
var curTs primitive.Timestamp
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
if err == nil {
lagSecs := curTs.T - sess.OperationTime().T
verifier.changeStreamLag.Store(option.Some(time.Second * time.Duration(lagSecs)))
} else {
verifier.logger.Warn().
Err(err).
Msg("Failed to extract timestamp from change stream’s resume token to compute change stream lag.")
}

err = verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
if err != nil {
return errors.Wrap(err, "failed to handle change events")
}
Expand All @@ -206,6 +219,7 @@ func (verifier *Verifier) iterateChangeStream(
ctx context.Context,
ri *retry.Info,
cs *mongo.ChangeStream,
sess mongo.Session,
) error {
var lastPersistedTime time.Time

Expand Down Expand Up @@ -262,15 +276,15 @@ func (verifier *Verifier) iterateChangeStream(
break
}

err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs)
err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)

if err != nil {
return err
}
}

default:
err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs)
err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)

if err == nil {
err = persistResumeTokenIfNeeded()
Expand Down Expand Up @@ -309,7 +323,7 @@ func (verifier *Verifier) iterateChangeStream(

func (verifier *Verifier) createChangeStream(
ctx context.Context,
) (*mongo.ChangeStream, primitive.Timestamp, error) {
) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
pipeline := verifier.GetChangeStreamFilter()
opts := options.ChangeStream().
SetMaxAwaitTime(1 * time.Second).
Expand All @@ -321,7 +335,7 @@ func (verifier *Verifier) createChangeStream(

savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
if err != nil {
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
}

csStartLogEvent := verifier.logger.Info()
Expand All @@ -348,37 +362,37 @@ func (verifier *Verifier) createChangeStream(

sess, err := verifier.srcClient.StartSession()
if err != nil {
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
}
sctx := mongo.NewSessionContext(ctx, sess)
srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
if err != nil {
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
}

err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
if err != nil {
return nil, primitive.Timestamp{}, err
return nil, nil, primitive.Timestamp{}, err
}

startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
if err != nil {
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
}

// With sharded clusters the resume token might lead the cluster time
// by 1 increment. In that case we need the actual cluster time;
// otherwise we will get errors.
clusterTime, err := getClusterTimeFromSession(sess)
if err != nil {
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
}

if startTs.After(clusterTime) {
startTs = clusterTime
}

return srcChangeStream, startTs, nil
return srcChangeStream, sess, startTs, nil
}

// StartChangeStream starts the change stream.
Expand All @@ -399,7 +413,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
ctx,
verifier.logger,
func(ri *retry.Info) error {
srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
srcChangeStream, csSess, startTs, err := verifier.createChangeStream(ctx)
if err != nil {
if parentThreadWaiting {
initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
Expand All @@ -417,7 +431,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
parentThreadWaiting = false
}

return verifier.iterateChangeStream(ctx, ri, srcChangeStream)
return verifier.iterateChangeStream(ctx, ri, srcChangeStream, csSess)
},
)

Expand Down
3 changes: 3 additions & 0 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/10gen/migration-verifier/internal/uuidutil"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/msync"
"github.com/10gen/migration-verifier/option"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
Expand Down Expand Up @@ -130,6 +131,7 @@ type Verifier struct {
changeStreamWritesOffTsChan chan primitive.Timestamp
changeStreamErrChan chan error
changeStreamDoneChan chan struct{}
changeStreamLag *msync.TypedAtomic[option.Option[time.Duration]]
lastChangeEventTime *primitive.Timestamp
writesOffTimestamp *primitive.Timestamp

Expand Down Expand Up @@ -202,6 +204,7 @@ func NewVerifier(settings VerifierSettings) *Verifier {
changeStreamWritesOffTsChan: make(chan primitive.Timestamp),
changeStreamErrChan: make(chan error),
changeStreamDoneChan: make(chan struct{}),
changeStreamLag: msync.NewTypedAtomic(option.None[time.Duration]()),
readConcernSetting: readConcern,

// This will get recreated once gen0 starts, but we want it
Expand Down
66 changes: 35 additions & 31 deletions internal/verifier/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,43 +388,47 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {

builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr))

if totalEvents == 0 {
return
}
if totalEvents > 0 {
reverseSortedNamespaces := maps.Keys(nsTotals)
sort.Slice(
reverseSortedNamespaces,
func(i, j int) bool {
return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
},
)

reverseSortedNamespaces := maps.Keys(nsTotals)
sort.Slice(
reverseSortedNamespaces,
func(i, j int) bool {
return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
},
)

// Only report the busiest namespaces.
if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
}
// Only report the busiest namespaces.
if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
}

table := tablewriter.NewWriter(builder)
table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
table := tablewriter.NewWriter(builder)
table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})

for _, ns := range reverseSortedNamespaces {
curNsStats := nsStats[ns]

table.Append(
append(
[]string{ns},
strconv.Itoa(curNsStats.Insert),
strconv.Itoa(curNsStats.Update),
strconv.Itoa(curNsStats.Replace),
strconv.Itoa(curNsStats.Delete),
strconv.Itoa(curNsStats.Total()),
),
)
}

for _, ns := range reverseSortedNamespaces {
curNsStats := nsStats[ns]
builder.WriteString("\nMost frequently-changing namespaces:\n")
table.Render()
}

table.Append(
append(
[]string{ns},
strconv.Itoa(curNsStats.Insert),
strconv.Itoa(curNsStats.Update),
strconv.Itoa(curNsStats.Replace),
strconv.Itoa(curNsStats.Delete),
strconv.Itoa(curNsStats.Total()),
),
if lag, hasLag := verifier.changeStreamLag.Load().Get(); hasLag {
builder.WriteString(
fmt.Sprintf("\nChange stream lag: %s\n", reportutils.DurationToHMS(lag)),
)
}

builder.WriteString("\nMost frequently-changing namespaces:\n")
table.Render()
}

func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {
Expand Down
50 changes: 50 additions & 0 deletions msync/typed_atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package msync

import "sync/atomic"

// TypedAtomic is a type-safe wrapper around the standard-library atomic.Value.
// TypedAtomic serves largely the same purpose as atomic.Pointer but stores
// the value itself rather than a pointer to it. This is often more ergonomic
// than an atomic.Pointer: it can be used to store constants directly (where
// taking a pointer is inconvenient), and it defaults to the type's zero value
// rather than a nil pointer.
type TypedAtomic[T any] struct {
v atomic.Value
}

// NewTypedAtomic returns a new TypedAtomic, initialized to val.
func NewTypedAtomic[T any](val T) *TypedAtomic[T] {
var v atomic.Value
v.Store(val)
return &TypedAtomic[T]{v}
}

// Load returns the value set by the most recent Store. It returns the zero
// value for the type if there has been no call to Store.
func (ta *TypedAtomic[T]) Load() T {
return orZero[T](ta.v.Load())
}

// Store sets the value TypedAtomic to val. Store(nil) panics.
func (ta *TypedAtomic[T]) Store(val T) {
ta.v.Store(val)
}

// Swap stores newVal into the TypedAtomic and returns the previous value. It
// returns the zero value for the type if the value is empty.
func (ta *TypedAtomic[T]) Swap(newVal T) T {
return orZero[T](ta.v.Swap(newVal))
}

// CompareAndSwap executes the compare-and-swap operation for the TypedAtomic.
func (ta *TypedAtomic[T]) CompareAndSwap(oldVal, newVal T) bool {
return ta.v.CompareAndSwap(oldVal, newVal)
}

func orZero[T any](val any) T {
if val == nil {
return *new(T)
}

return val.(T)
}
59 changes: 59 additions & 0 deletions msync/typed_atomic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package msync

import (
"sync"
)

func (s *unitTestSuite) TestTypedAtomic() {
ta := NewTypedAtomic(42)

s.Require().Equal(42, ta.Load())
s.Require().False(ta.CompareAndSwap(17, 99))
s.Require().True(ta.CompareAndSwap(42, 99))
s.Require().Equal(99, ta.Load())
s.Require().Equal(99, ta.Swap(42))
s.Require().Equal(42, ta.Load())

ta.Store(17)
s.Require().Equal(17, ta.Load())

// This block is for race detection under -race.
var wg sync.WaitGroup
for i := range 100 {
wg.Add(1)
go func() {
defer wg.Done()
ta.Load()
ta.Store(i)
}()
}
wg.Wait()
}

func (s *unitTestSuite) TestAtomicZeroValues() {
s.Run("string", func() {
var ta TypedAtomic[string]
s.Require().Equal("", ta.Load())
s.Require().Equal("", ta.Swap("foo"))
s.Require().Equal("foo", ta.Load())
})

s.Run("int", func() {
var ta TypedAtomic[int]
s.Require().Equal(0, ta.Load())
s.Require().Equal(0, ta.Swap(42))
s.Require().Equal(42, ta.Load())
})

s.Run("arbitrary data", func() {
type data struct {
I int
S string
}

var ta TypedAtomic[data]
s.Require().Equal(data{}, ta.Load())
s.Require().Equal(data{}, ta.Swap(data{76, "trombones"}))
s.Require().Equal(data{76, "trombones"}, ta.Load())
})
}