diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index e7c367c9..9a8c5897 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -1,8 +1,14 @@ package testutil import ( + "context" + "testing" + + "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" ) // Marshal wraps `bsonMarshal` with a panic on failure. @@ -34,3 +40,64 @@ func convertDocsToAnys(docs []bson.D) []any { return anys } + +func KillApplicationChangeStreams( + ctx context.Context, + t *testing.T, + client *mongo.Client, + appName string, +) error { + // Kill verifier’s change stream. + cursor, err := client.Database( + "admin", + options.Database().SetReadConcern(readconcern.Local()), + ).Aggregate( + ctx, + mongo.Pipeline{ + { + {"$currentOp", bson.D{ + {"idleCursors", true}, + }}, + }, + { + {"$match", bson.D{ + {"clientMetadata.application.name", appName}, + {"command.collection", "$cmd.aggregate"}, + {"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch", + bson.D{{"$type", "object"}}, + }, + }}, + }, + }, + ) + if err != nil { + return errors.Wrapf(err, "failed to list %#q's change streams", appName) + } + + ops := []struct { + Opid any + }{} + err = cursor.All(ctx, &ops) + if err != nil { + return errors.Wrapf(err, "failed to decode %#q's change streams", appName) + } + + for _, op := range ops { + t.Logf("Killing change stream op %+v", op.Opid) + + err := + client.Database("admin").RunCommand( + ctx, + bson.D{ + {"killOp", 1}, + {"op", op.Opid}, + }, + ).Err() + + if err != nil { + return errors.Wrapf(err, "failed to kill change stream with opId %#q", op.Opid) + } + } + + return nil +} diff --git a/internal/util/cluster_time.go b/internal/util/cluster_time.go new file mode 100644 index 00000000..761cdc38 --- /dev/null +++ b/internal/util/cluster_time.go @@ -0,0 +1,24 @@ +package util + +import ( + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" +) + +func GetClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) { + ctStruct := struct { + ClusterTime struct { + ClusterTime primitive.Timestamp `bson:"clusterTime"` + } `bson:"$clusterTime"` + }{} + + clusterTimeRaw := sess.ClusterTime() + err := bson.Unmarshal(sess.ClusterTime(), &ctStruct) + if err != nil { + return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw) + } + + return ctStruct.ClusterTime.ClusterTime, nil +} diff --git a/internal/util/eventual.go b/internal/util/eventual.go new file mode 100644 index 00000000..ad2c6dd7 --- /dev/null +++ b/internal/util/eventual.go @@ -0,0 +1,63 @@ +package util + +import ( + "sync" + + "github.com/10gen/migration-verifier/option" +) + +// Eventual solves the “one writer, many readers” problem: a value gets +// written once, then the readers will see that the value is `Ready()` and +// can then `Get()` it. +// +// It’s like how `context.Context`’s `Done()` and `Err()` methods work, but +// generalized to any data type. +type Eventual[T any] struct { + ready chan struct{} + val option.Option[T] + mux sync.RWMutex +} + +// NewEventual creates an Eventual and returns a pointer +// to it. 
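+//
+// A minimal usage sketch (illustrative only; the names and the value 42
+// are arbitrary, not taken from this change):
+//
+//	e := NewEventual[int]()
+//	go func() { e.Set(42) }()
+//	<-e.Ready()
+//	value := e.Get() // 42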
+func NewEventual[T any]() *Eventual[T] { + return &Eventual[T]{ + ready: make(chan struct{}), + } +} + +// Ready returns a channel that closes once the Eventual’s value is ready. +func (e *Eventual[T]) Ready() <-chan struct{} { + return e.ready +} + +// Get returns the Eventual’s value if it’s ready. +// It panics otherwise. +func (e *Eventual[T]) Get() T { + e.mux.RLock() + defer e.mux.RUnlock() + + val, has := e.val.Get() + if has { + return val + } + + panic("Eventual's Get() called before value was ready.") +} + +// Set sets the Eventual’s value. It may be called only once; +// if called again it will panic. +func (e *Eventual[T]) Set(val T) { + e.mux.Lock() + defer e.mux.Unlock() + + if e.val.IsSome() { + panic("Tried to set an eventual twice!") + } + + // NB: This *must* happen before the close(), or else a fast reader may + // not see this value. + e.val = option.Some(val) + + close(e.ready) +} diff --git a/internal/util/eventual_test.go b/internal/util/eventual_test.go new file mode 100644 index 00000000..17f4c4b7 --- /dev/null +++ b/internal/util/eventual_test.go @@ -0,0 +1,34 @@ +package util + +import ( + "time" +) + +func (s *UnitTestSuite) TestEventual() { + eventual := NewEventual[int]() + + s.Assert().Panics( + func() { eventual.Get() }, + "Get() should panic before the value is set", + ) + + select { + case <-eventual.Ready(): + s.Require().Fail("should not be ready") + case <-time.NewTimer(time.Second).C: + } + + eventual.Set(123) + + select { + case <-eventual.Ready(): + case <-time.NewTimer(time.Second).C: + s.Require().Fail("should be ready") + } + + s.Assert().Equal( + 123, + eventual.Get(), + "Get() should return the value", + ) +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 4c5bc1d9..04f1a2c9 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -44,6 +44,7 @@ type DocKey struct { const ( minChangeStreamPersistInterval = time.Second * 10 + maxChangeStreamAwaitTime = time.Second metadataChangeStreamCollectionName = "changeStream" ) @@ -68,8 +69,8 @@ type ChangeStreamReader struct { changeStreamRunning bool changeEventBatchChan chan []ParsedEvent - writesOffTsChan chan primitive.Timestamp - errChan chan error + writesOffTs *util.Eventual[primitive.Timestamp] + error *util.Eventual[error] doneChan chan struct{} startAtTs *primitive.Timestamp @@ -87,8 +88,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() { clusterInfo: *verifier.srcClusterInfo, changeStreamRunning: false, changeEventBatchChan: make(chan []ParsedEvent), - writesOffTsChan: make(chan primitive.Timestamp), - errChan: make(chan error), + writesOffTs: util.NewEventual[primitive.Timestamp](), + error: util.NewEventual[error](), doneChan: make(chan struct{}), lag: msync.NewTypedAtomic(option.None[time.Duration]()), } @@ -101,8 +102,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() { clusterInfo: *verifier.dstClusterInfo, changeStreamRunning: false, changeEventBatchChan: make(chan []ParsedEvent), - writesOffTsChan: make(chan primitive.Timestamp), - errChan: make(chan error), + writesOffTs: util.NewEventual[primitive.Timestamp](), + error: util.NewEventual[error](), doneChan: make(chan struct{}), lag: msync.NewTypedAtomic(option.None[time.Duration]()), } @@ -123,7 +124,6 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch) err := verifier.HandleChangeStreamEvents(ctx, batch, 
reader.readerType) if err != nil { - reader.errChan <- err return err } } @@ -268,6 +268,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead := 0 var changeEventBatch []ParsedEvent + latestEvent := option.None[ParsedEvent]() + for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { gotEvent := cs.TryNext(ctx) @@ -293,7 +295,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( if changeEventBatch[eventsRead].ClusterTime != nil && (csr.lastChangeEventTime == nil || csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) { + csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime + latestEvent = option.Some(changeEventBatch[eventsRead]) } eventsRead++ @@ -305,6 +309,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return nil } + if event, has := latestEvent.Get(); has { + csr.logger.Trace(). + Interface("event", event). + Msg("Updated lastChangeEventTime.") + } + var curTs primitive.Timestamp curTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) if err == nil { @@ -355,7 +365,9 @@ func (csr *ChangeStreamReader) iterateChangeStream( // source writes are ended and the migration tool is finished / committed. // This means we should exit rather than continue reading the change stream // since there should be no more events. - case writesOffTs := <-csr.writesOffTsChan: + case <-csr.writesOffTs.Ready(): + writesOffTs := csr.writesOffTs.Get() + csr.logger.Debug(). Interface("writesOffTimestamp", writesOffTs). Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr) @@ -408,7 +420,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } // since we have started Recheck, we must signal that we have // finished the change stream changes so that Recheck can continue. - csr.doneChan <- struct{}{} + close(csr.doneChan) break } } @@ -430,7 +442,7 @@ func (csr *ChangeStreamReader) createChangeStream( ) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). - SetMaxAwaitTime(1 * time.Second). + SetMaxAwaitTime(maxChangeStreamAwaitTime). SetFullDocument(options.UpdateLookup) if csr.clusterInfo.VersionArray[0] >= 6 { @@ -487,11 +499,17 @@ func (csr *ChangeStreamReader) createChangeStream( // With sharded clusters the resume token might lead the cluster time // by 1 increment. In that case we need the actual cluster time; // otherwise we will get errors. - clusterTime, err := getClusterTimeFromSession(sess) + clusterTime, err := util.GetClusterTimeFromSession(sess) if err != nil { return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } + csr.logger.Debug(). + Interface("resumeTokenTimestamp", startTs). + Interface("clusterTime", clusterTime). + Stringer("changeStreamReader", csr). + Msg("Using earlier time as start timestamp.") + if startTs.After(clusterTime) { startTs = clusterTime } @@ -542,10 +560,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { ).Run(ctx, csr.logger) if err != nil { - // NB: This failure always happens after the initial change stream - // creation. 
- csr.errChan <- err - close(csr.errChan) + csr.error.Set(err) } }() @@ -661,19 +676,3 @@ func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, return resumeTokenTime, nil } - -func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) { - ctStruct := struct { - ClusterTime struct { - ClusterTime primitive.Timestamp `bson:"clusterTime"` - } `bson:"$clusterTime"` - }{} - - clusterTimeRaw := sess.ClusterTime() - err := bson.Unmarshal(sess.ClusterTime(), &ctStruct) - if err != nil { - return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw) - } - - return ctStruct.ClusterTime.ClusterTime, nil -} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 6da20876..6c1ded7e 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -15,8 +15,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readconcern" ) func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { @@ -172,7 +170,6 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { events = append(events, newEvent) } - suite.T().Logf("Change stream op time (got event? %v): %v", gotEvent, csOpTime) if csOpTime.After(*changeStreamStopTime) { break } @@ -286,7 +283,7 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m sctx := mongo.NewSessionContext(ctx, sess) suite.Require().NoError(sess.Client().Ping(sctx, nil)) - newTime, err := getClusterTimeFromSession(sess) + newTime, err := util.GetClusterTimeFromSession(sess) suite.Require().NoError(err, "should fetch cluster time") return newTime @@ -307,21 +304,43 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve } func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { - verifier := suite.BuildVerifier() - ctx := suite.Context() - sess, err := suite.srcMongoClient.StartSession() - suite.Require().NoError(err) - sctx := mongo.NewSessionContext(ctx, sess) - _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( - sctx, bson.D{{"_id", 0}}) - suite.Require().NoError(err) - origStartTs := sess.OperationTime() - suite.Require().NotNil(origStartTs) - suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs) - verifier.srcChangeStreamReader.writesOffTsChan <- *origStartTs - <-verifier.srcChangeStreamReader.doneChan - suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs) + zerolog.SetGlobalLevel(zerolog.TraceLevel) + + // Each of these takes ~1s, so don’t do too many of them. + for range 5 { + verifier := suite.BuildVerifier() + ctx := suite.Context() + sess, err := suite.srcMongoClient.StartSession() + suite.Require().NoError(err) + sctx := mongo.NewSessionContext(ctx, sess) + _, err = suite.srcMongoClient. + Database(suite.DBNameForTest()). + Collection("testColl"). 
+ InsertOne(sctx, bson.D{}) + suite.Require().NoError(err, "should insert doc") + + insertTs, err := util.GetClusterTimeFromSession(sess) + suite.Require().NoError(err, "should get cluster time") + + suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + + startAtTs := verifier.srcChangeStreamReader.startAtTs + suite.Require().NotNil(startAtTs) + + suite.Require().False( + startAtTs.After(insertTs), + "change stream should start no later than the last operation", + ) + + verifier.srcChangeStreamReader.writesOffTs.Set(insertTs) + + <-verifier.srcChangeStreamReader.doneChan + + suite.Require().False( + verifier.srcChangeStreamReader.startAtTs.Before(*startAtTs), + "new startAtTs should be no earlier than last one", + ) + } } func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { @@ -365,7 +384,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { "session time after events should exceed the original", ) - verifier.srcChangeStreamReader.writesOffTsChan <- *postEventsSessionTime + verifier.srcChangeStreamReader.writesOffTs.Set(*postEventsSessionTime) <-verifier.srcChangeStreamReader.doneChan suite.Assert().Equal( @@ -427,6 +446,52 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() { ) } +func (suite *IntegrationTestSuite) TestWritesOffCursorKilledResilience() { + ctx := suite.Context() + + coll := suite.srcMongoClient. + Database(suite.DBNameForTest()). + Collection("mycoll") + + suite.Require().NoError( + coll.Database().CreateCollection( + ctx, + coll.Name(), + ), + ) + + suite.Require().NoError( + suite.dstMongoClient. + Database(coll.Database().Name()). + CreateCollection( + ctx, + coll.Name(), + ), + ) + + for range 100 { + verifier := suite.BuildVerifier() + + docs := lo.RepeatBy(1_000, func(_ int) bson.D { return bson.D{} }) + _, err := coll.InsertMany( + ctx, + lo.ToAnySlice(docs), + ) + suite.Require().NoError(err) + + suite.Require().NoError(verifier.WritesOff(ctx)) + + suite.Require().NoError( + testutil.KillApplicationChangeStreams( + suite.Context(), + suite.T(), + suite.srcMongoClient, + clientAppName, + ), + ) + } +} + func (suite *IntegrationTestSuite) TestCursorKilledResilience() { ctx := suite.Context() @@ -444,54 +509,16 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() { // wait for generation 0 to end suite.Require().NoError(verifierRunner.AwaitGenerationEnd()) - const mvName = "Migration Verifier" - - // Kill verifier’s change stream. 
- cursor, err := suite.srcMongoClient.Database( - "admin", - options.Database().SetReadConcern(readconcern.Local()), - ).Aggregate( - ctx, - mongo.Pipeline{ - { - {"$currentOp", bson.D{ - {"idleCursors", true}, - }}, - }, - { - {"$match", bson.D{ - {"clientMetadata.application.name", mvName}, - {"command.collection", "$cmd.aggregate"}, - {"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch", - bson.D{{"$type", "object"}}, - }, - }}, - }, - }, + suite.Require().NoError( + testutil.KillApplicationChangeStreams( + suite.Context(), + suite.T(), + suite.srcMongoClient, + clientAppName, + ), ) - suite.Require().NoError(err) - - var ops []bson.Raw - suite.Require().NoError(cursor.All(ctx, &ops)) - for _, cursorRaw := range ops { - opId, err := cursorRaw.LookupErr("opid") - suite.Require().NoError(err, "should get opid from op") - - suite.T().Logf("Killing change stream op %+v", opId) - - suite.Require().NoError( - suite.srcMongoClient.Database("admin").RunCommand( - ctx, - bson.D{ - {"killOp", 1}, - {"op", opId}, - }, - ).Err(), - ) - } - - _, err = coll.InsertOne( + _, err := coll.InsertOne( ctx, bson.D{{"_id", "after kill"}}, ) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 365cd987..910d2796 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -49,7 +49,8 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt select { case <-ctx.Done(): return ctx.Err() - case err := <-csr.errChan: + case <-csr.error.Ready(): + err := csr.error.Get() verifier.logger.Warn().Err(err). Msgf("Received error from %s.", csr) return err @@ -87,9 +88,11 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { // If the change stream fails, everything should stop. eg.Go(func() error { select { - case err := <-verifier.srcChangeStreamReader.errChan: + case <-verifier.srcChangeStreamReader.error.Ready(): + err := verifier.srcChangeStreamReader.error.Get() return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader) - case err := <-verifier.dstChangeStreamReader.errChan: + case <-verifier.dstChangeStreamReader.error.Ready(): + err := verifier.dstChangeStreamReader.error.Get() return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader) case <-ctx.Done(): return nil diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 3fdcb12e..d999f047 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -375,12 +375,17 @@ func iterateCursorToChannel( cursor *mongo.Cursor, writer chan<- bson.Raw, ) error { + defer close(writer) + for cursor.Next(ctx) { state.NoteSuccess() - writer <- slices.Clone(cursor.Current) - } - close(writer) + select { + case <-ctx.Done(): + return ctx.Err() + case writer <- slices.Clone(cursor.Current): + } + } return errors.Wrap(cursor.Err(), "failed to iterate cursor") } diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go new file mode 100644 index 00000000..fcc59e59 --- /dev/null +++ b/internal/verifier/compare_test.go @@ -0,0 +1,85 @@ +package verifier + +import ( + "context" + "math/rand" + "sync/atomic" + "time" + + "github.com/10gen/migration-verifier/internal/partitions" + "github.com/10gen/migration-verifier/mslices" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +// TestFetchAndCompareDocuments_ContextCancellation ensures that nothing hangs +// when a context is canceled during 
FetchAndCompareDocuments(). +func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { + ctx := s.Context() + + for _, client := range mslices.Of(s.srcMongoClient, s.dstMongoClient) { + docs := lo.RepeatBy( + 10_000, + func(i int) bson.D { + return bson.D{} + }, + ) + + _, err := client.Database(s.DBNameForTest()).Collection("stuff"). + InsertMany(ctx, lo.ToAnySlice(docs)) + + s.Require().NoError(err) + } + + task := VerificationTask{ + PrimaryKey: primitive.NewObjectID(), + Type: verificationTaskVerifyDocuments, + Status: verificationTaskProcessing, + QueryFilter: QueryFilter{ + Namespace: s.DBNameForTest() + ".stuff", + Partition: &partitions.Partition{ + Key: partitions.PartitionKey{ + Lower: primitive.MinKey{}, + }, + Upper: primitive.MaxKey{}, + }, + }, + } + + verifier := s.BuildVerifier() + + for range 100 { + cancelableCtx, cancel := context.WithCancelCause(ctx) + + var done atomic.Bool + go func() { + _, _, _, err := verifier.FetchAndCompareDocuments( + cancelableCtx, + &task, + ) + if err != nil { + s.Assert().ErrorIs( + err, + context.Canceled, + "only failure should be context cancellation", + ) + } + done.Store(true) + }() + + delay := time.Duration(100 * float64(time.Millisecond) * rand.Float64()) + time.Sleep(delay) + cancel(errors.Errorf("canceled after %s", delay)) + + s.Assert().Eventually( + func() bool { + return done.Load() + }, + time.Minute, + 10*time.Millisecond, + "cancellation after %s should not cause hang", + ) + } +} diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 0ba77ff2..b87e874b 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -79,6 +79,8 @@ func (suite *IntegrationTestSuite) SetupSuite() { func (suite *IntegrationTestSuite) SetupTest() { ctx, canceller := context.WithCancelCause(context.Background()) + suite.testContext, suite.contextCanceller = ctx, canceller + suite.zerologGlobalLogLevel = zerolog.GlobalLevel() dbname := suite.DBNameForTest() @@ -112,9 +114,6 @@ func (suite *IntegrationTestSuite) SetupTest() { suite.initialDbNames.Add(dbName) } } - - suite.testContext, suite.contextCanceller = ctx, canceller - suite.zerologGlobalLogLevel = zerolog.GlobalLevel() } func (suite *IntegrationTestSuite) TearDownTest() { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 78b9e7d3..eaab6511 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -68,6 +68,8 @@ const ( okSymbol = "\u2705" // white heavy check mark infoSymbol = "\u24d8" // circled Latin small letter I notOkSymbol = "\u2757" // heavy exclamation mark symbol + + clientAppName = "Migration Verifier" ) type whichCluster string @@ -221,7 +223,7 @@ func (verifier *Verifier) ConfigureReadConcern(setting ReadConcernSetting) { } func (verifier *Verifier) getClientOpts(uri string) *options.ClientOptions { - appName := "Migration Verifier" + appName := clientAppName opts := &options.ClientOptions{ AppName: &appName, } @@ -279,15 +281,19 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { // might be inserting docs into the recheck queue, which happens // under the lock. 
select { - case verifier.srcChangeStreamReader.writesOffTsChan <- srcFinalTs: - case err := <-verifier.srcChangeStreamReader.errChan: + case <-verifier.srcChangeStreamReader.error.Ready(): + err := verifier.srcChangeStreamReader.error.Get() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader) + default: + verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs) } select { - case verifier.dstChangeStreamReader.writesOffTsChan <- dstFinalTs: - case err := <-verifier.dstChangeStreamReader.errChan: + case <-verifier.dstChangeStreamReader.error.Ready(): + err := verifier.dstChangeStreamReader.error.Get() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader) + default: + verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs) } return nil diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 2dd5569a..a889ac28 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1622,6 +1622,8 @@ func (suite *IntegrationTestSuite) TestChangesOnDstBeforeSrc() { suite.Assert().Equal( 1, status.FailedTasks, + "failed tasks as expected (status=%+v)", + status, ) // Patch up the other mismatched document in generation 4.
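
For reference, a minimal sketch (not part of the diff above) of the "one writer, many readers" behavior that the new Eventual type is meant to provide. It is written as a plain `testing`-style test against the `util` package added above; the test name, reader count, and values are illustrative assumptions, and the real tests in this change use the `UnitTestSuite` type instead.

package util

import (
	"sync"
	"testing"
)

// Many readers block on Ready() until a single writer calls Set();
// afterward they all observe the same value via Get().
func TestEventualManyReaders(t *testing.T) {
	e := NewEventual[string]()

	var wg sync.WaitGroup
	results := make(chan string, 10)

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-e.Ready()        // blocks until Set() closes the ready channel
			results <- e.Get() // safe: the value is guaranteed to be set by now
		}()
	}

	e.Set("hello")
	wg.Wait()
	close(results)

	for got := range results {
		if got != "hello" {
			t.Fatalf("got %q, want %q", got, "hello")
		}
	}
}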