Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ParsedEvent struct {
}

// String renders the event as a short, human-readable summary string.
// NOTE(review): two consecutive return statements appear in this span, which
// looks like the removed and added lines of a diff rendered without +/-
// markers — confirm against the real file. Read literally, only the first
// return executes.
func (pe *ParsedEvent) String() string {
// Summary of OpType, namespace, and document ID.
return fmt.Sprintf("{ OpType: %s, namespace: %s, docID: %v}", pe.OpType, pe.Ns, pe.DocKey.ID)
// Same summary with pe.ClusterTime appended.
return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime)
}

// DocKey is a deserialized form for the ChangeEvent documentKey field. We currently only care about
Expand All @@ -44,7 +44,7 @@ type UnknownEventError struct {
}

// Error implements the error interface for UnknownEventError.
// NOTE(review): two consecutive return statements appear in this span, which
// looks like the removed and added lines of a diff rendered without +/-
// markers — confirm against the real file. Read literally, only the first
// return executes.
func (uee UnknownEventError) Error() string {
// Message built from only the event's OpType, formatted with %#q.
return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType)
// Message built from the whole event value, formatted with %+v.
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
}

// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
Expand Down Expand Up @@ -83,8 +83,6 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
}

func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
var changeEvent ParsedEvent

var lastPersistedTime time.Time

persistResumeTokenIfNeeded := func() error {
Expand All @@ -103,6 +101,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
readOneChangeEvent := func() (bool, error) {
gotEvent := cs.TryNext(ctx)
if gotEvent {
var changeEvent ParsedEvent
if err := cs.Decode(&changeEvent); err != nil {
return false, errors.Wrap(err, "failed to decode change event")
}
Expand All @@ -129,24 +128,22 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
// source writes are ended. This means we should exit rather than continue
// reading the change stream since there should be no more events.
case <-verifier.changeStreamEnderChan:
var gotEvent bool
verifier.logger.Debug().
Msg("Change stream thread received shutdown request.")

changeStreamEnded = true

// Read all change events until the source reports no events.
// (i.e., the `getMore` call returns empty)
for {
var gotEvent bool
gotEvent, err = readOneChangeEvent()

if !gotEvent || err != nil {
break
}
}

if err != nil {
break
}

default:
_, err = readOneChangeEvent()
}
Expand All @@ -156,7 +153,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
}

if err != nil && !errors.Is(err, context.Canceled) {
verifier.logger.Debug().
Err(err).
Msg("Sending change stream error.")

verifier.changeStreamErrChan <- err

if !changeStreamEnded {
break
}
}

if changeStreamEnded {
Expand All @@ -169,11 +174,18 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
// since we have started Recheck, we must signal that we have
// finished the change stream changes so that Recheck can continue.
verifier.changeStreamDoneChan <- struct{}{}
// since the changeStream is exhausted, we now return
verifier.logger.Debug().Msg("Change stream is done")
return
break
}
}

infoLog := verifier.logger.Info()
if verifier.lastChangeEventTime == nil {
infoLog = infoLog.Str("changeStreamStopTime", "none")
} else {
infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime)
}

infoLog.Msg("Change stream is done.")
}

// StartChangeStream starts the change stream.
Expand Down
23 changes: 21 additions & 2 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ func (verifier *Verifier) waitForChangeStream() error {
verifier.changeStreamEnderChan <- struct{}{}
select {
case err := <-verifier.changeStreamErrChan:
verifier.logger.Warn().Err(err).
Msg("Received error from change stream.")
return err
case <-verifier.changeStreamDoneChan:
verifier.logger.Debug().
Msg("Received completion signal from change stream.")
break
}
}
Expand Down Expand Up @@ -188,16 +192,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
return err
}
// we will only coordinate when the number of channels is exactly 2.
// * Channel 0 signals a generation is done
// * Channel 1 signals to check to continue the next generation
// * Channel 0 informs the test of a generation boundary.
// * Block until the test (via channel 1) tells us to do the
// next generation.
if len(testChan) == 2 {

verifier.logger.Debug().
Msg("Telling test about generation boundary.")
testChan[0] <- struct{}{}

verifier.logger.Debug().
Msg("Awaiting test's signal to continue.")
<-testChan[1]

verifier.logger.Debug().
Msg("Received test's signal. Continuing.")
}
time.Sleep(verifier.generationPauseDelayMillis * time.Millisecond)
verifier.mux.Lock()
if verifier.lastGeneration {
verifier.mux.Unlock()

verifier.logger.Debug().
Int("generation", verifier.generation).
Msg("Final generation done.")

return nil
}
// TODO: wait here until writesOff is hit or enough time has passed, so we don't spin
Expand Down
4 changes: 4 additions & 0 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,10 @@ func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {

// Turn writes off.
verifier.WritesOff(ctx)

// Tell CheckDriver to do one more pass. This should terminate the change stream.
checkContinueChan <- struct{}{}
<-checkDoneChan
}

func (suite *MultiDataVersionTestSuite) TestPartitionWithFilter() {
Expand Down
Loading