diff --git a/internal/util/eventual.go b/internal/util/eventual.go index ad2c6dd7..901d2a1f 100644 --- a/internal/util/eventual.go +++ b/internal/util/eventual.go @@ -2,8 +2,6 @@ package util import ( "sync" - - "github.com/10gen/migration-verifier/option" ) // Eventual solves the “one writer, many readers” problem: a value gets @@ -14,7 +12,7 @@ import ( // generalized to any data type. type Eventual[T any] struct { ready chan struct{} - val option.Option[T] + val T mux sync.RWMutex } @@ -37,12 +35,14 @@ func (e *Eventual[T]) Get() T { e.mux.RLock() defer e.mux.RUnlock() - val, has := e.val.Get() - if has { - return val + // If the ready channel is still open then there’s no value yet, + // which means this method should not have been called. + select { + case <-e.ready: + return e.val + default: + panic("Eventual's Get() called before value was ready.") } - - panic("Eventual's Get() called before value was ready.") } // Set sets the Eventual’s value. It may be called only once; @@ -51,13 +51,16 @@ func (e *Eventual[T]) Set(val T) { e.mux.Lock() defer e.mux.Unlock() - if e.val.IsSome() { + select { + case <-e.ready: panic("Tried to set an eventual twice!") + default: } // NB: This *must* happen before the close(), or else a fast reader may // not see this value. - e.val = option.Some(val) + e.val = val + // This allows Get() to work: close(e.ready) } diff --git a/internal/util/eventual_test.go b/internal/util/eventual_test.go index 17f4c4b7..4bcc209e 100644 --- a/internal/util/eventual_test.go +++ b/internal/util/eventual_test.go @@ -15,14 +15,14 @@ func (s *UnitTestSuite) TestEventual() { select { case <-eventual.Ready(): s.Require().Fail("should not be ready") - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(time.Millisecond).C: } eventual.Set(123) select { case <-eventual.Ready(): - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(time.Millisecond).C: s.Require().Fail("should be ready") } @@ -31,4 +31,30 @@ func (s *UnitTestSuite) TestEventual() { eventual.Get(), "Get() should return the value", ) + + s.Assert().Equal( + 123, + eventual.Get(), + "Get() should return the value a 2nd time", + ) +} + +func (s *UnitTestSuite) TestEventualNil() { + eventual := NewEventual[error]() + + select { + case <-eventual.Ready(): + s.Require().Fail("should not be ready") + case <-time.NewTimer(time.Millisecond).C: + } + + eventual.Set(nil) + + select { + case <-eventual.Ready(): + case <-time.NewTimer(time.Millisecond).C: + s.Require().Fail("should be ready") + } + + s.Assert().Nil(eventual.Get()) } diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 01579434..143f5f04 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -15,6 +15,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "golang.org/x/sync/errgroup" ) type ddlEventHandling string @@ -33,15 +34,12 @@ const ( type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch - getError() *util.Eventual[error] getStartTimestamp() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] getBufferSaturation() float64 setWritesOff(bson.Timestamp) - setPersistorError(error) - start(context.Context) error - done() <-chan struct{} + start(context.Context, *errgroup.Group) error persistResumeToken(context.Context, bson.Raw) error isRunning() bool String() string @@ -63,9 
+61,6 @@ type ChangeReaderCommon struct { running bool changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] - readerError *util.Eventual[error] - persistorError *util.Eventual[error] - doneChan chan struct{} startAtTs *bson.Timestamp @@ -79,14 +74,6 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } -func (rc *ChangeReaderCommon) setPersistorError(err error) { - rc.persistorError.Set(err) -} - -func (rc *ChangeReaderCommon) getError() *util.Eventual[error] { - return rc.readerError -} - func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { return option.FromPointer(rc.startAtTs) } @@ -103,10 +90,6 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } -func (rc *ChangeReaderCommon) done() <-chan struct{} { - return rc.doneChan -} - // getBufferSaturation returns the reader’s internal buffer’s saturation level // as a fraction. If saturation rises, that means we’re reading events faster // than we can persist them. @@ -224,13 +207,6 @@ func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) { Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") } -func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error { - return errors.Wrap( - rc.persistorError.Get(), - "event persistor failed, so no more events can be processed", - ) -} - func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { return event. Any("timestamp", ts). diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index c3841a86..4a91a3c3 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -5,13 +5,10 @@ import ( "fmt" "time" - "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/keystring" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" - "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" @@ -21,6 +18,7 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" ) var supportedEventOpTypes = mapset.NewSet( @@ -49,43 +47,6 @@ type ChangeStreamReader struct { var _ changeReader = &ChangeStreamReader{} -func (verifier *Verifier) initializeChangeReaders() { - srcReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - }, - } - verifier.srcChangeReader = srcReader - - dstReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, - }, - } - verifier.dstChangeReader = dstReader - - // Common elements in both readers: - for _, csr := range mslices.Of(srcReader, dstReader) { - csr.logger = verifier.logger - csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) - csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) - csr.writesOffTs = util.NewEventual[bson.Timestamp]() - 
csr.readerError = util.NewEventual[error]() - csr.persistorError = util.NewEventual[error]() - csr.doneChan = make(chan struct{}) - csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) - csr.batchSizeHistory = history.New[int](time.Minute) - csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken - } -} - // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // @@ -265,8 +226,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) - case <-csr.persistorError.Ready(): - return csr.wrapPersistorErrorForReader() case csr.changeEventBatchChan <- changeEventBatch{ events: changeEvents, @@ -305,9 +264,6 @@ func (csr *ChangeStreamReader) iterateChangeStream( return err - case <-csr.persistorError.Ready(): - return csr.wrapPersistorErrorForReader() - // If the ChangeStreamEnderChan has a message, the user has indicated that // source writes are ended and the migration tool is finished / committed. // This means we should exit rather than continue reading the change stream @@ -361,9 +317,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( if csr.lastChangeEventTime != nil { csr.startAtTs = csr.lastChangeEventTime } - // since we have started Recheck, we must signal that we have - // finished the change stream changes so that Recheck can continue. - close(csr.doneChan) + break } } @@ -467,79 +421,82 @@ func (csr *ChangeStreamReader) createChangeStream( } // StartChangeStream starts the change stream. -func (csr *ChangeStreamReader) start(ctx context.Context) error { +func (csr *ChangeStreamReader) start( + ctx context.Context, + eg *errgroup.Group, +) error { // This channel holds the first change stream creation's result, whether // success or failure. Rather than using a Result we could make separate // Timestamp and error channels, but the single channel is cleaner since // there's no chance of "nonsense" like both channels returning a payload. initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) - go func() { - // Closing changeEventBatchChan at the end of change stream goroutine - // notifies the verifier's change event handler to exit. - defer func() { - csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Msg("Closing change event batch channel.") + eg.Go( + func() error { + // Closing changeEventBatchChan at the end of change stream goroutine + // notifies the verifier's change event handler to exit. + defer func() { + csr.logger.Debug(). + Stringer("changeStreamReader", csr). + Msg("Closing change event batch channel.") - close(csr.changeEventBatchChan) - }() + close(csr.changeEventBatchChan) + }() - retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) + retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) - parentThreadWaiting := true + parentThreadWaiting := true - err := retryer.WithCallback( - func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) - if err != nil { - logEvent := csr.logger.Debug(). - Err(err). - Stringer("changeStreamReader", csr) + err := retryer.WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + changeStream, sess, startTs, err := csr.createChangeStream(ctx) + if err != nil { + logEvent := csr.logger.Debug(). + Err(err). 
+ Stringer("changeStreamReader", csr) - if parentThreadWaiting { - logEvent.Msg("First change stream open failed.") + if parentThreadWaiting { + logEvent.Msg("First change stream open failed.") - initialCreateResultChan <- mo.Err[bson.Timestamp](err) - return nil - } + initialCreateResultChan <- mo.Err[bson.Timestamp](err) + return nil + } - logEvent.Msg("Retried change stream open failed.") + logEvent.Msg("Retried change stream open failed.") - return err - } + return err + } - defer changeStream.Close(ctx) + defer changeStream.Close(ctx) - logEvent := csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Any("startTimestamp", startTs) + logEvent := csr.logger.Debug(). + Stringer("changeStreamReader", csr). + Any("startTimestamp", startTs) - if parentThreadWaiting { - logEvent.Msg("First change stream open succeeded.") + if parentThreadWaiting { + logEvent.Msg("First change stream open succeeded.") - initialCreateResultChan <- mo.Ok(startTs) - close(initialCreateResultChan) - parentThreadWaiting = false - } else { - logEvent.Msg("Retried change stream open succeeded.") - } + initialCreateResultChan <- mo.Ok(startTs) + close(initialCreateResultChan) + parentThreadWaiting = false + } else { + logEvent.Msg("Retried change stream open succeeded.") + } - return csr.iterateChangeStream(ctx, ri, changeStream, sess) - }, - "running %s", csr, - ).Run(ctx, csr.logger) + return csr.iterateChangeStream(ctx, ri, changeStream, sess) + }, + "running %s", csr, + ).Run(ctx, csr.logger) - if err != nil { - csr.readerError.Set(err) - } - }() + return err + }, + ) result := <-initialCreateResultChan startTs, err := result.Get() if err != nil { - return err + return errors.Wrapf(err, "creating change stream") } csr.startAtTs = &startTs diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 0c12a64f..5f746f14 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -25,6 +25,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "golang.org/x/sync/errgroup" ) func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { @@ -253,16 +254,21 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { ) } -func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) { - err := verifier.srcChangeReader.start(ctx) +func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler( + ctx context.Context, + verifier *Verifier, +) *errgroup.Group { + eg, egCtx := contextplus.ErrGroup(ctx) + + err := verifier.srcChangeReader.start(egCtx, eg) suite.Require().NoError(err) - go func() { - err := verifier.RunChangeEventPersistor(ctx, verifier.srcChangeReader) - if errors.Is(err, context.Canceled) { - return - } - suite.Require().NoError(err) - }() + eg.Go( + func() error { + return verifier.RunChangeEventPersistor(egCtx, verifier.srcChangeReader) + }, + ) + + return eg } func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { @@ -623,14 +629,14 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { insertTs, err := util.GetClusterTimeFromSession(sess) suite.Require().NoError(err, "should get cluster time") - suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() 
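+	// start() records the reader's start timestamp once its first change
+	// stream open succeeds, so the timestamp should be present by now.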
suite.Require().True(hasStartAtTs, "startAtTs should be set") verifier.srcChangeReader.setWritesOff(insertTs) - <-verifier.srcChangeReader.done() + suite.Require().NoError(eg.Wait()) startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() @@ -655,7 +661,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { origSessionTime := sess.OperationTime() suite.Require().NotNil(origSessionTime) - suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") @@ -688,7 +694,8 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { ) verifier.srcChangeReader.setWritesOff(*postEventsSessionTime) - <-verifier.srcChangeReader.done() + + suite.Require().NoError(eg.Wait()) startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") @@ -1063,7 +1070,9 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() { verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"}) verifier.SetNamespaceMap() - suite.Require().NoError(verifier.dstChangeReader.start(ctx)) + eg, egCtx := contextplus.ErrGroup(ctx) + + suite.Require().NoError(verifier.dstChangeReader.start(egCtx, eg)) go func() { err := verifier.RunChangeEventPersistor(ctx, verifier.dstChangeReader) if errors.Is(err, context.Canceled) { diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 794a7dbf..529c3e3f 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -6,10 +6,13 @@ import ( "time" "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" "github.com/goaux/timer" "github.com/pkg/errors" @@ -27,9 +30,11 @@ const ( findTaskTimeWarnThreshold = 5 * time.Second ) -var failedStatuses = mapset.NewSet( - verificationTaskFailed, - verificationTaskMetadataMismatch, +var ( + failedStatuses = mapset.NewSet( + verificationTaskFailed, + verificationTaskMetadataMismatch, + ) ) // Check is the asynchronous entry point to Check, should only be called by the web server. Use @@ -50,24 +55,6 @@ func (verifier *Verifier) Check(ctx context.Context, filter bson.D) { verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } -func (verifier *Verifier) waitForChangeReader(ctx context.Context, csr changeReader) error { - select { - case <-ctx.Done(): - return util.WrapCtxErrWithCause(ctx) - case <-csr.getError().Ready(): - err := csr.getError().Get() - verifier.logger.Warn().Err(err). - Msgf("Received error from %s.", csr) - return err - case <-csr.done(): - verifier.logger.Debug(). - Msgf("Received completion signal from %s.", csr) - break - } - - return nil -} - func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { generation := verifier.generation @@ -93,12 +80,11 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { // If the change reader fails, everything should stop. 
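+	// changeHandlingErr is set exactly once, when the errgroup that runs
+	// the change readers and their event persistors finishes; a nil value
+	// means a clean shutdown, and any other value aborts this check.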
 	eg.Go(func() error {
 		select {
-		case <-verifier.srcChangeReader.getError().Ready():
-			err := verifier.srcChangeReader.getError().Get()
-			return errors.Wrapf(err, "%s failed", verifier.srcChangeReader)
-		case <-verifier.dstChangeReader.getError().Ready():
-			err := verifier.dstChangeReader.getError().Get()
-			return errors.Wrapf(err, "%s failed", verifier.dstChangeReader)
+		case <-verifier.changeHandlingErr.Ready():
+			return errors.Wrap(
+				verifier.changeHandlingErr.Get(),
+				"change event handling failed",
+			)
 		case <-ctx.Done():
 			return nil
 		}
@@ -269,23 +255,28 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
 		verifier.phase = Idle
 	}()
 
-	ceHandlerGroup, groupCtx := contextplus.ErrGroup(ctx)
+	changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx)
 
 	for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) {
 		if changeReader.isRunning() {
 			verifier.logger.Debug().Msgf("Check: %s already running.", changeReader)
 		} else {
 			verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader)
-			err = changeReader.start(ctx)
+			err = changeReader.start(groupCtx, changeReaderGroup)
 			if err != nil {
 				return errors.Wrapf(err, "failed to start %s", changeReader)
 			}
-			ceHandlerGroup.Go(func() error {
+			changeReaderGroup.Go(func() error {
 				return verifier.RunChangeEventPersistor(groupCtx, changeReader)
 			})
 		}
 	}
 
+	changeHandlingErr := verifier.changeHandlingErr
+	go func() {
+		changeHandlingErr.Set(changeReaderGroup.Wait())
+	}()
+
 	// Log the verification status when initially booting up so it's easy to see the current state
 	verificationStatus, err := verifier.GetVerificationStatus(ctx)
 	if err != nil {
@@ -370,23 +361,19 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
 	// generation number, or the last changes will not be checked.
 	verifier.mux.Unlock()
 
-	for _, csr := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) {
-		if err = verifier.waitForChangeReader(ctx, csr); err != nil {
-			return errors.Wrapf(
-				err,
-				"an error interrupted the wait for closure of %s",
-				csr,
-			)
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-verifier.changeHandlingErr.Ready():
+		err := verifier.changeHandlingErr.Get()
+		if err != nil {
+			return errors.Wrap(err, "handling change events")
 		}
 		verifier.logger.Debug().
-			Stringer("changeReader", csr).
- Msg("Change reader finished.") + Msg("Change handling finished.") } - if err = ceHandlerGroup.Wait(); err != nil { - return err - } verifier.mux.Lock() verifier.lastGeneration = true } @@ -612,3 +599,37 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } } } + +func (verifier *Verifier) initializeChangeReaders() { + srcReader := &ChangeStreamReader{ + ChangeReaderCommon: ChangeReaderCommon{ + readerType: src, + namespaces: verifier.srcNamespaces, + watcherClient: verifier.srcClient, + clusterInfo: *verifier.srcClusterInfo, + }, + } + verifier.srcChangeReader = srcReader + + dstReader := &ChangeStreamReader{ + ChangeReaderCommon: ChangeReaderCommon{ + readerType: dst, + namespaces: verifier.dstNamespaces, + watcherClient: verifier.dstClient, + clusterInfo: *verifier.dstClusterInfo, + onDDLEvent: onDDLEventAllow, + }, + } + verifier.dstChangeReader = dstReader + + // Common elements in both readers: + for _, csr := range mslices.Of(srcReader, dstReader) { + csr.logger = verifier.logger + csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) + csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) + csr.writesOffTs = util.NewEventual[bson.Timestamp]() + csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) + csr.batchSizeHistory = history.New[int](time.Minute) + csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken + } +} diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 9e75bf18..e23445cb 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -103,6 +103,8 @@ type Verifier struct { srcEventRecorder *EventRecorder dstEventRecorder *EventRecorder + changeHandlingErr *util.Eventual[error] + // Used only with generation 0 to defer the first // progress report until after we’ve finished partitioning // every collection. @@ -196,6 +198,8 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { verificationStatusCheckInterval: 2 * time.Second, nsMap: NewNSMap(), + + changeHandlingErr: util.NewEventual[error](), } } @@ -272,20 +276,23 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { // This has to happen outside the lock because the change readers // might be inserting docs into the recheck queue, which happens // under the lock. 
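+	//
+	// If change handling has already failed, report that error instead of
+	// handing a reader a writes-off timestamp that nothing will consume.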
- select { - case <-verifier.srcChangeReader.getError().Ready(): - err := verifier.srcChangeReader.getError().Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.srcChangeReader) - default: - verifier.srcChangeReader.setWritesOff(srcFinalTs) - } - - select { - case <-verifier.dstChangeReader.getError().Ready(): - err := verifier.dstChangeReader.getError().Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.dstChangeReader) - default: - verifier.dstChangeReader.setWritesOff(dstFinalTs) + for _, readerAndTS := range []struct { + reader changeReader + ts bson.Timestamp + }{ + {verifier.srcChangeReader, srcFinalTs}, + {verifier.dstChangeReader, dstFinalTs}, + } { + select { + case <-ctx.Done(): + return ctx.Err() + case <-verifier.changeHandlingErr.Ready(): + return errors.Wrapf( + verifier.changeHandlingErr.Get(), + "tried to send writes-off timestamp to %s, but change handling already failed", readerAndTS.reader) + default: + readerAndTS.reader.setWritesOff(readerAndTS.ts) + } } return nil diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 799040be..564e33a2 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -80,12 +80,6 @@ HandlerLoop: } } - // This will prevent the reader from hanging because the reader checks - // this along with checks for context expiry. - if err != nil { - reader.setPersistorError(err) - } - return err } diff --git a/internal/verifier/timeseries_test.go b/internal/verifier/timeseries_test.go index ae8548d9..9497b0a9 100644 --- a/internal/verifier/timeseries_test.go +++ b/internal/verifier/timeseries_test.go @@ -298,7 +298,8 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { suite.Assert().Equal( 0, verificationStatus.FailedTasks, - "should be no failed tasks", + "should be no failed tasks (status: %+v)", + verificationStatus, ) suite.Assert().Equal( 3,