Skip to content

Commit 0c59cbc

Browse files
committed
comment
1 parent 51a823d commit 0c59cbc

File tree

1 file changed

+10
-14
lines changed

1 file changed

+10
-14
lines changed

internal/verifier/change_stream.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -126,26 +126,22 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
126126
return []bson.D{stage}
127127
}
128128

129+
// This function reads a single `getMore` response into a slice.
130+
//
131+
// Note that this doesn’t care about the writesOff timestamp. Thus,
132+
// if writesOff has happened and a `getMore` response’s events straddle
133+
// the writesOff timestamp (i.e., some events precede it & others follow it),
134+
// the verifier will enqueue rechecks from those post-writesOff events. This
135+
// is unideal but shouldn’t impede correctness since post-writesOff events
136+
// shouldn’t really happen anyway by definition.
129137
func (verifier *Verifier) readAndHandleOneChangeEventBatch(
130138
ctx context.Context,
131139
cs *mongo.ChangeStream,
132-
writesOffTs *primitive.Timestamp,
133140
) error {
134141
eventsRead := 0
135142
var changeEventBatch []ParsedEvent
136143

137144
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
138-
// Once the change stream reaches the writesOff timestamp we should stop reading.
139-
if writesOffTs != nil {
140-
csTimestamp, err := extractTimestampFromResumeToken(cs.ResumeToken())
141-
if err != nil {
142-
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
143-
}
144-
if !csTimestamp.Before(*writesOffTs) {
145-
break
146-
}
147-
}
148-
149145
gotEvent := cs.TryNext(ctx)
150146

151147
if cs.Err() != nil {
@@ -239,15 +235,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
239235
break
240236
}
241237

242-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs, &writesOffTs)
238+
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
243239

244240
if err != nil {
245241
break
246242
}
247243
}
248244

249245
default:
250-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs, nil)
246+
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
251247

252248
if err == nil {
253249
err = persistResumeTokenIfNeeded()

0 commit comments

Comments
 (0)