diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index dfeb53c2..f7ab8e97 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -100,45 +100,66 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha return err } + readOneChangeEvent := func() (bool, error) { + gotEvent := cs.TryNext(ctx) + if gotEvent { + if err := cs.Decode(&changeEvent); err != nil { + return false, errors.Wrap(err, "failed to decode change event") + } + err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) + if err != nil { + return false, errors.Wrap(err, "failed to handle change event") + } + } + + return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed") + } + for { var err error + var changeStreamEnded bool - for cs.TryNext(ctx) { - if err = cs.Decode(&changeEvent); err != nil { - err = errors.Wrap(err, "failed to decode change event") - break + select { + + // If the context is canceled, return immmediately. + case <-ctx.Done(): + return + + // If the changeStreamEnderChan has a message, the user has indicated that + // source writes are ended. This means we should exit rather than continue + // reading the change stream since there should be no more events. + case <-verifier.changeStreamEnderChan: + var gotEvent bool + + changeStreamEnded = true + + // Read all change events until the source reports no events. + // (i.e., the `getMore` call returns empty) + for { + gotEvent, err = readOneChangeEvent() + + if !gotEvent || err != nil { + break + } } - err = verifier.HandleChangeStreamEvent(ctx, &changeEvent) + if err != nil { - err = errors.Wrap(err, "failed to handle change event") break } - } - if cs.Err() != nil { - err = errors.Wrap( - cs.Err(), - "change stream iteration failed", - ) + default: + _, err = readOneChangeEvent() } if err == nil { err = persistResumeTokenIfNeeded() } - if err != nil { - if !errors.Is(err, context.Canceled) { - verifier.changeStreamErrChan <- err - } - - return + if err != nil && !errors.Is(err, context.Canceled) { + verifier.changeStreamErrChan <- err } - select { - // If the changeStreamEnderChan has a message, the user has indicated that - // source writes are ended. This means we should exit rather than continue - // reading the change stream since there should be no more events. - case <-verifier.changeStreamEnderChan: + if changeStreamEnded { verifier.mux.Lock() verifier.changeStreamRunning = false if verifier.lastChangeEventTime != nil { @@ -151,7 +172,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // since the changeStream is exhausted, we now return verifier.logger.Debug().Msg("Change stream is done") return - default: } } }