From 27ec13f53fc8940d2cd1934569dc187b9aea5b0c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 05:22:00 -0500 Subject: [PATCH 1/2] save --- internal/verifier/change_stream.go | 24 +++++++++- internal/verifier/migration_verifier.go | 3 ++ internal/verifier/summary.go | 6 +++ msync/typed_atomic.go | 50 +++++++++++++++++++++ msync/typed_atomic_test.go | 59 +++++++++++++++++++++++++ 5 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 msync/typed_atomic.go create mode 100644 msync/typed_atomic_test.go diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 483af491..8005a54a 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -8,6 +8,7 @@ import ( "github.com/10gen/migration-verifier/internal/keystring" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/samber/mo" @@ -166,8 +167,15 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch( eventsRead := 0 var changeEventBatch []ParsedEvent + sess, err := verifier.srcClient.StartSession() + if err != nil { + return errors.Wrap(err, "failed to start session to read change stream") + } + + sctx := mongo.NewSessionContext(ctx, sess) + for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(ctx) + gotEvent := cs.TryNext(sctx) if cs.Err() != nil { return errors.Wrap(cs.Err(), "change stream iteration failed") @@ -194,7 +202,19 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch( return nil } - err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch) + fmt.Printf("\n======== events batch: %+v\n\n", changeEventBatch) + + var curTs primitive.Timestamp + curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + if err == nil { + lagSecs := curTs.T - sctx.OperationTime().T + verifier.changeStreamLag.Store(option.Some(time.Second * time.Duration(lagSecs))) + } else { + // TODO warn + //return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + } + + err = verifier.HandleChangeStreamEvents(ctx, changeEventBatch) if err != nil { return errors.Wrap(err, "failed to handle change events") } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 485b59c4..eb464311 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -24,6 +24,7 @@ import ( "github.com/10gen/migration-verifier/internal/uuidutil" "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" @@ -130,6 +131,7 @@ type Verifier struct { changeStreamWritesOffTsChan chan primitive.Timestamp changeStreamErrChan chan error changeStreamDoneChan chan struct{} + changeStreamLag *msync.TypedAtomic[option.Option[time.Duration]] lastChangeEventTime *primitive.Timestamp writesOffTimestamp *primitive.Timestamp @@ -201,6 +203,7 @@ func NewVerifier(settings VerifierSettings) *Verifier { changeStreamWritesOffTsChan: make(chan primitive.Timestamp), changeStreamErrChan: make(chan error), changeStreamDoneChan: make(chan struct{}), + changeStreamLag: msync.NewTypedAtomic(option.None[time.Duration]()), readConcernSetting: readConcern, // This will get recreated 
once gen0 starts, but we want it
diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go
index 54e45de1..67f33ecf 100644
--- a/internal/verifier/summary.go
+++ b/internal/verifier/summary.go
@@ -424,4 +424,10 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
 
 	builder.WriteString("\nMost frequently-changing namespaces:\n")
 	table.Render()
+
+	if lag, hasLag := verifier.changeStreamLag.Load().Get(); hasLag {
+		builder.WriteString(
+			fmt.Sprintf("\nChange stream lag: %s\n", reportutils.DurationToHMS(lag)),
+		)
+	}
 }
diff --git a/msync/typed_atomic.go b/msync/typed_atomic.go
new file mode 100644
index 00000000..219fa34f
--- /dev/null
+++ b/msync/typed_atomic.go
@@ -0,0 +1,50 @@
+package msync
+
+import "sync/atomic"
+
+// TypedAtomic is a type-safe wrapper around the standard-library atomic.Value.
+// TypedAtomic serves largely the same purpose as atomic.Pointer but stores
+// the value itself rather than a pointer to it. This is often more ergonomic
+// than an atomic.Pointer: it can be used to store constants directly (where
+// taking a pointer is inconvenient), and it defaults to the type's zero value
+// rather than a nil pointer.
+type TypedAtomic[T any] struct {
+	v atomic.Value
+}
+
+// NewTypedAtomic returns a new TypedAtomic, initialized to val.
+func NewTypedAtomic[T any](val T) *TypedAtomic[T] {
+	ta := &TypedAtomic[T]{}
+	ta.v.Store(val) // stored in place: an atomic.Value must not be copied after first use
+	return ta
+}
+
+// Load returns the value set by the most recent Store. It returns the zero
+// value for the type if there has been no call to Store.
+func (ta *TypedAtomic[T]) Load() T {
+	return orZero[T](ta.v.Load())
+}
+
+// Store sets the value of the TypedAtomic to val. Storing a nil interface value panics.
+func (ta *TypedAtomic[T]) Store(val T) {
+	ta.v.Store(val)
+}
+
+// Swap stores newVal into the TypedAtomic and returns the previous value. It
+// returns the zero value for the type if there has been no call to Store.
+func (ta *TypedAtomic[T]) Swap(newVal T) T {
+	return orZero[T](ta.v.Swap(newVal))
+}
+
+// CompareAndSwap executes the compare-and-swap operation for the TypedAtomic.
+func (ta *TypedAtomic[T]) CompareAndSwap(oldVal, newVal T) bool {
+	return ta.v.CompareAndSwap(oldVal, newVal)
+}
+
+func orZero[T any](val any) T {
+	if val == nil {
+		return *new(T)
+	}
+
+	return val.(T)
+}
diff --git a/msync/typed_atomic_test.go b/msync/typed_atomic_test.go
new file mode 100644
index 00000000..d4789137
--- /dev/null
+++ b/msync/typed_atomic_test.go
@@ -0,0 +1,59 @@
+package msync
+
+import (
+	"sync"
+)
+
+func (s *unitTestSuite) TestTypedAtomic() {
+	ta := NewTypedAtomic(42)
+
+	s.Require().Equal(42, ta.Load())
+	s.Require().False(ta.CompareAndSwap(17, 99))
+	s.Require().True(ta.CompareAndSwap(42, 99))
+	s.Require().Equal(99, ta.Load())
+	s.Require().Equal(99, ta.Swap(42))
+	s.Require().Equal(42, ta.Load())
+
+	ta.Store(17)
+	s.Require().Equal(17, ta.Load())
+
+	// This block is for race detection under -race.
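+	//
+	// Illustrative usage from outside this package (a sketch, not part
+	// of this test; assumes the caller imports msync, option, and time,
+	// as the verifier's lag tracking does):
+	//
+	//	lag := msync.NewTypedAtomic(option.None[time.Duration]())
+	//	lag.Store(option.Some(3 * time.Second))
+	//	if d, ok := lag.Load().Get(); ok {
+	//		_ = d // 3s
+	//	}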
+ var wg sync.WaitGroup + for i := range 100 { + wg.Add(1) + go func() { + defer wg.Done() + ta.Load() + ta.Store(i) + }() + } + wg.Wait() +} + +func (s *unitTestSuite) TestAtomicZeroValues() { + s.Run("string", func() { + var ta TypedAtomic[string] + s.Require().Equal("", ta.Load()) + s.Require().Equal("", ta.Swap("foo")) + s.Require().Equal("foo", ta.Load()) + }) + + s.Run("int", func() { + var ta TypedAtomic[int] + s.Require().Equal(0, ta.Load()) + s.Require().Equal(0, ta.Swap(42)) + s.Require().Equal(42, ta.Load()) + }) + + s.Run("arbitrary data", func() { + type data struct { + I int + S string + } + + var ta TypedAtomic[data] + s.Require().Equal(data{}, ta.Load()) + s.Require().Equal(data{}, ta.Swap(data{76, "trombones"})) + s.Require().Equal(data{76, "trombones"}, ta.Load()) + }) +} From 56b7719a12a639458569c27a7cdc40e73ac18a08 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 26 Nov 2024 22:58:49 -0500 Subject: [PATCH 2/2] displaying --- internal/verifier/change_stream.go | 46 +++++++++------------ internal/verifier/summary.go | 66 +++++++++++++++--------------- 2 files changed, 52 insertions(+), 60 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 8005a54a..dcb16c84 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -163,19 +163,13 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch( ctx context.Context, ri *retry.Info, cs *mongo.ChangeStream, + sess mongo.Session, ) error { eventsRead := 0 var changeEventBatch []ParsedEvent - sess, err := verifier.srcClient.StartSession() - if err != nil { - return errors.Wrap(err, "failed to start session to read change stream") - } - - sctx := mongo.NewSessionContext(ctx, sess) - for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(sctx) + gotEvent := cs.TryNext(ctx) if cs.Err() != nil { return errors.Wrap(cs.Err(), "change stream iteration failed") @@ -202,16 +196,15 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch( return nil } - fmt.Printf("\n======== events batch: %+v\n\n", changeEventBatch) - var curTs primitive.Timestamp - curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + curTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) if err == nil { - lagSecs := curTs.T - sctx.OperationTime().T + lagSecs := curTs.T - sess.OperationTime().T verifier.changeStreamLag.Store(option.Some(time.Second * time.Duration(lagSecs))) } else { - // TODO warn - //return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + verifier.logger.Warn(). + Err(err). 
+ Msg("Failed to extract timestamp from change stream’s resume token to compute change stream lag.") } err = verifier.HandleChangeStreamEvents(ctx, changeEventBatch) @@ -226,6 +219,7 @@ func (verifier *Verifier) iterateChangeStream( ctx context.Context, ri *retry.Info, cs *mongo.ChangeStream, + sess mongo.Session, ) error { var lastPersistedTime time.Time @@ -282,7 +276,7 @@ func (verifier *Verifier) iterateChangeStream( break } - err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs) + err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) if err != nil { return err @@ -290,7 +284,7 @@ func (verifier *Verifier) iterateChangeStream( } default: - err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs) + err = verifier.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) if err == nil { err = persistResumeTokenIfNeeded() @@ -329,7 +323,7 @@ func (verifier *Verifier) iterateChangeStream( func (verifier *Verifier) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, primitive.Timestamp, error) { +) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := verifier.GetChangeStreamFilter() opts := options.ChangeStream(). SetMaxAwaitTime(1 * time.Second). @@ -341,7 +335,7 @@ func (verifier *Verifier) createChangeStream( savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := verifier.logger.Info() @@ -368,22 +362,22 @@ func (verifier *Verifier) createChangeStream( sess, err := verifier.srcClient.StartSession() if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session") } sctx := mongo.NewSessionContext(ctx, sess) srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream") } err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream) if err != nil { - return nil, primitive.Timestamp{}, err + return nil, nil, primitive.Timestamp{}, err } startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken()) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } // With sharded clusters the resume token might lead the cluster time @@ -391,14 +385,14 @@ func (verifier *Verifier) createChangeStream( // otherwise we will get errors. clusterTime, err := getClusterTimeFromSession(sess) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } if startTs.After(clusterTime) { startTs = clusterTime } - return srcChangeStream, startTs, nil + return srcChangeStream, sess, startTs, nil } // StartChangeStream starts the change stream. 
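Note on the lag arithmetic in readAndHandleOneChangeEventBatch above: both the
resume token's timestamp and the session's operation time are cluster times
(primitive.Timestamp), whose T field is a uint32 count of seconds since the
Unix epoch. A minimal sketch of the same computation, factored into a
hypothetical helper (computeLag is not in the patch) and with nil/underflow
guards that the patch itself does not have:

	func computeLag(cs *mongo.ChangeStream, sess mongo.Session) option.Option[time.Duration] {
		curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
		opTime := sess.OperationTime()
		if err != nil || opTime == nil || curTs.T < opTime.T {
			// No measurable lag; this also avoids uint32 wraparound when
			// the resume token trails the session's operation time.
			return option.None[time.Duration]()
		}
		return option.Some(time.Duration(curTs.T-opTime.T) * time.Second)
	}

Without such a guard, the unsigned subtraction in the patch wraps around
whenever curTs.T is less than the session's operation time.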
@@ -419,7 +413,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error { ctx, verifier.logger, func(ri *retry.Info) error { - srcChangeStream, startTs, err := verifier.createChangeStream(ctx) + srcChangeStream, csSess, startTs, err := verifier.createChangeStream(ctx) if err != nil { if parentThreadWaiting { initialCreateResultChan <- mo.Err[primitive.Timestamp](err) @@ -437,7 +431,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error { parentThreadWaiting = false } - return verifier.iterateChangeStream(ctx, ri, srcChangeStream) + return verifier.iterateChangeStream(ctx, ri, srcChangeStream, csSess) }, ) diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 67f33ecf..684425aa 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -387,43 +387,41 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) { builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr)) - if totalEvents == 0 { - return - } + if totalEvents > 0 { + reverseSortedNamespaces := maps.Keys(nsTotals) + sort.Slice( + reverseSortedNamespaces, + func(i, j int) bool { + return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]] + }, + ) - reverseSortedNamespaces := maps.Keys(nsTotals) - sort.Slice( - reverseSortedNamespaces, - func(i, j int) bool { - return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]] - }, - ) - - // Only report the busiest namespaces. - if len(reverseSortedNamespaces) > changeEventsTableMaxSize { - reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize] - } + // Only report the busiest namespaces. + if len(reverseSortedNamespaces) > changeEventsTableMaxSize { + reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize] + } - table := tablewriter.NewWriter(builder) - table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"}) - - for _, ns := range reverseSortedNamespaces { - curNsStats := nsStats[ns] - - table.Append( - append( - []string{ns}, - strconv.Itoa(curNsStats.Insert), - strconv.Itoa(curNsStats.Update), - strconv.Itoa(curNsStats.Replace), - strconv.Itoa(curNsStats.Delete), - strconv.Itoa(curNsStats.Total()), - ), - ) - } + table := tablewriter.NewWriter(builder) + table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"}) + + for _, ns := range reverseSortedNamespaces { + curNsStats := nsStats[ns] + + table.Append( + append( + []string{ns}, + strconv.Itoa(curNsStats.Insert), + strconv.Itoa(curNsStats.Update), + strconv.Itoa(curNsStats.Replace), + strconv.Itoa(curNsStats.Delete), + strconv.Itoa(curNsStats.Total()), + ), + ) + } - builder.WriteString("\nMost frequently-changing namespaces:\n") - table.Render() + builder.WriteString("\nMost frequently-changing namespaces:\n") + table.Render() + } if lag, hasLag := verifier.changeStreamLag.Load().Get(); hasLag { builder.WriteString(
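 			// Note: reportutils.DurationToHMS renders the lag in an
 			// hours/minutes/seconds form; the exact layout comes from that
 			// helper, which this patch does not change.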