Skip to content

Commit ab154c4

Browse files
committed
call IterationSuccess()
1 parent 74377be commit ab154c4

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

internal/verifier/change_stream.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
140140
// shouldn’t really happen anyway by definition.
141141
func (verifier *Verifier) readAndHandleOneChangeEventBatch(
142142
sctx mongo.SessionContext,
143+
ri *retry.Info,
143144
cs *mongo.ChangeStream,
144145
) error {
145146
eventsRead := 0
@@ -167,6 +168,8 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
167168
eventsRead++
168169
}
169170

171+
ri.IterationSuccess()
172+
170173
if eventsRead == 0 {
171174
return nil
172175
}
@@ -187,7 +190,11 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
187190
return nil
188191
}
189192

190-
func (verifier *Verifier) iterateChangeStream(sctx mongo.SessionContext, cs *mongo.ChangeStream) error {
193+
func (verifier *Verifier) iterateChangeStream(
194+
sctx mongo.SessionContext,
195+
ri *retry.Info,
196+
cs *mongo.ChangeStream,
197+
) error {
191198
var lastPersistedTime time.Time
192199

193200
persistResumeTokenIfNeeded := func() error {
@@ -243,15 +250,15 @@ func (verifier *Verifier) iterateChangeStream(sctx mongo.SessionContext, cs *mon
243250
break
244251
}
245252

246-
err = verifier.readAndHandleOneChangeEventBatch(sctx, cs)
253+
err = verifier.readAndHandleOneChangeEventBatch(sctx, ri, cs)
247254

248255
if err != nil {
249256
return err
250257
}
251258
}
252259

253260
default:
254-
err = verifier.readAndHandleOneChangeEventBatch(sctx, cs)
261+
err = verifier.readAndHandleOneChangeEventBatch(sctx, ri, cs)
255262

256263
if err == nil {
257264
err = persistResumeTokenIfNeeded()
@@ -372,7 +379,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
372379
RunForTransientErrorsOnly(
373380
ctx,
374381
verifier.logger,
375-
func(i *retry.Info) error {
382+
func(ri *retry.Info) error {
376383
sess, err := verifier.srcClient.StartSession()
377384
if err != nil {
378385
return errors.Wrap(err, "failed to start change stream session")
@@ -393,7 +400,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
393400
parentThreadWaiting = false
394401
}
395402

396-
return verifier.iterateChangeStream(sctx, srcChangeStream)
403+
return verifier.iterateChangeStream(sctx, ri, srcChangeStream)
397404
},
398405
)
399406

0 commit comments

Comments
 (0)