Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 44 additions & 24 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,45 +100,66 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
return err
}

readOneChangeEvent := func() (bool, error) {
gotEvent := cs.TryNext(ctx)
if gotEvent {
if err := cs.Decode(&changeEvent); err != nil {
return false, errors.Wrap(err, "failed to decode change event")
}
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
if err != nil {
return false, errors.Wrap(err, "failed to handle change event")
}
}

return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
}

for {
var err error
var changeStreamEnded bool

for cs.TryNext(ctx) {
if err = cs.Decode(&changeEvent); err != nil {
err = errors.Wrap(err, "failed to decode change event")
break
select {

// If the context is canceled, return immmediately.
case <-ctx.Done():
return

// If the changeStreamEnderChan has a message, the user has indicated that
// source writes are ended. This means we should exit rather than continue
// reading the change stream since there should be no more events.
case <-verifier.changeStreamEnderChan:
var gotEvent bool

changeStreamEnded = true

// Read all change events until the source reports no events.
// (i.e., the `getMore` call returns empty)
for {
gotEvent, err = readOneChangeEvent()

if !gotEvent || err != nil {
break
}
}
err = verifier.HandleChangeStreamEvent(ctx, &changeEvent)

if err != nil {
err = errors.Wrap(err, "failed to handle change event")
break
}
}

if cs.Err() != nil {
err = errors.Wrap(
cs.Err(),
"change stream iteration failed",
)
default:
_, err = readOneChangeEvent()
}

if err == nil {
err = persistResumeTokenIfNeeded()
}

if err != nil {
if !errors.Is(err, context.Canceled) {
verifier.changeStreamErrChan <- err
}

return
if err != nil && !errors.Is(err, context.Canceled) {
verifier.changeStreamErrChan <- err
}

select {
// If the changeStreamEnderChan has a message, the user has indicated that
// source writes are ended. This means we should exit rather than continue
// reading the change stream since there should be no more events.
case <-verifier.changeStreamEnderChan:
if changeStreamEnded {
verifier.mux.Lock()
verifier.changeStreamRunning = false
if verifier.lastChangeEventTime != nil {
Expand All @@ -151,7 +172,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
// since the changeStream is exhausted, we now return
verifier.logger.Debug().Msg("Change stream is done")
return
default:
}
}
}
Expand Down
Loading