Skip to content

Commit 258ba2d

Browse files
committed
Merge branch 'main' into rep-5219-recheck-batch
2 parents 86e8fb4 + fa17104 commit 258ba2d

File tree

1 file changed

+50
-30
lines changed

1 file changed

+50
-30
lines changed

internal/verifier/change_stream.go

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
9595

9696
var lastCheckpointTime time.Time
9797

98+
// Do a final flush of the buffered change event rechecks before the function returns.
99+
defer func() {
100+
err := verifier.flushAllBufferedChangeEventRechecks(ctx)
101+
if err != nil {
102+
verifier.changeStreamErrChan <- err
103+
}
104+
}()
105+
98106
doChangeStreamCheckpoint := func() error {
99107
if time.Since(lastCheckpointTime) <= minChangeStreamCheckpointInterval {
100108
return nil
@@ -114,53 +122,66 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
114122
return nil
115123
}
116124

117-
// Do a final flush of the buffered change event rechecks before the function returns.
118-
defer func() {
119-
err := verifier.flushAllBufferedChangeEventRechecks(ctx)
120-
if err != nil {
121-
verifier.changeStreamErrChan <- err
125+
readOneChangeEvent := func() (bool, error) {
126+
gotEvent := cs.TryNext(ctx)
127+
if gotEvent {
128+
if err := cs.Decode(&changeEvent); err != nil {
129+
return false, errors.Wrap(err, "failed to decode change event")
130+
}
131+
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
132+
if err != nil {
133+
return false, errors.Wrap(err, "failed to handle change event")
134+
}
122135
}
123-
}()
136+
137+
return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
138+
}
124139

125140
for {
126141
var err error
142+
var changeStreamEnded bool
127143

128-
for cs.TryNext(ctx) {
129-
if err = cs.Decode(&changeEvent); err != nil {
130-
err = errors.Wrap(err, "failed to decode change event")
131-
break
144+
select {
145+
146+
// If the context is canceled, return immmediately.
147+
case <-ctx.Done():
148+
return
149+
150+
// If the changeStreamEnderChan has a message, the user has indicated that
151+
// source writes are ended. This means we should exit rather than continue
152+
// reading the change stream since there should be no more events.
153+
case <-verifier.changeStreamEnderChan:
154+
var gotEvent bool
155+
156+
changeStreamEnded = true
157+
158+
// Read all change events until the source reports no events.
159+
// (i.e., the `getMore` call returns empty)
160+
for {
161+
gotEvent, err = readOneChangeEvent()
162+
163+
if !gotEvent || err != nil {
164+
break
165+
}
132166
}
133-
err = verifier.HandleChangeStreamEvent(ctx, &changeEvent)
167+
134168
if err != nil {
135-
err = errors.Wrap(err, "failed to handle change event")
136169
break
137170
}
138-
}
139171

140-
if cs.Err() != nil {
141-
err = errors.Wrap(
142-
cs.Err(),
143-
"change stream iteration failed",
144-
)
172+
default:
173+
_, err = readOneChangeEvent()
145174
}
146175

147176
if err == nil {
148177
err = doChangeStreamCheckpoint()
149178
}
150179

151-
if err != nil {
152-
if !errors.Is(err, context.Canceled) {
153-
verifier.changeStreamErrChan <- err
154-
}
155-
156-
return
180+
if err != nil && !errors.Is(err, context.Canceled) {
181+
verifier.changeStreamErrChan <- err
157182
}
158183

159-
select {
160-
// If the changeStreamEnderChan has a message, the user has indicated that
161-
// source writes are ended. This means we should exit rather than continue
162-
// reading the change stream since there should be no more events.
163-
case <-verifier.changeStreamEnderChan:
184+
if changeStreamEnded {
164185
verifier.mux.Lock()
165186
verifier.changeStreamRunning = false
166187
if verifier.lastChangeEventTime != nil {
@@ -173,7 +194,6 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
173194
// since the changeStream is exhausted, we now return
174195
verifier.logger.Debug().Msg("Change stream is done")
175196
return
176-
default:
177197
}
178198
}
179199
}

0 commit comments

Comments
 (0)