Skip to content

Commit 0c7f117

Browse files
committed
always read full batch
1 parent 35bbfb4 commit 0c7f117

File tree

1 file changed

+2
-14
lines changed

1 file changed

+2
-14
lines changed

internal/verifier/change_stream.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -139,23 +139,11 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
139139
func (verifier *Verifier) readAndHandleOneChangeEventBatch(
140140
ctx context.Context,
141141
cs *mongo.ChangeStream,
142-
writesOffTs *primitive.Timestamp,
143142
) error {
144143
eventsRead := 0
145144
var changeEventBatch []bson.Raw
146145

147146
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
148-
// Once the change stream reaches the writesOff timestamp we should stop reading.
149-
if writesOffTs != nil {
150-
csTimestamp, err := extractTimestampFromResumeToken(cs.ResumeToken())
151-
if err != nil {
152-
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
153-
}
154-
if !csTimestamp.Before(*writesOffTs) {
155-
break
156-
}
157-
}
158-
159147
gotEvent := cs.TryNext(ctx)
160148

161149
if cs.Err() != nil {
@@ -252,15 +240,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
252240
break
253241
}
254242

255-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs, &writesOffTs)
243+
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
256244

257245
if err != nil {
258246
break
259247
}
260248
}
261249

262250
default:
263-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs, nil)
251+
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
264252

265253
if err == nil {
266254
err = persistResumeTokenIfNeeded()

0 commit comments

Comments
 (0)