@@ -126,11 +126,26 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
126126 return []bson.D {stage }
127127}
128128
129- func (verifier * Verifier ) readAndHandleOneChangeEventBatch (ctx context.Context , cs * mongo.ChangeStream ) error {
129+ func (verifier * Verifier ) readAndHandleOneChangeEventBatch (
130+ ctx context.Context ,
131+ cs * mongo.ChangeStream ,
132+ finalTs * primitive.Timestamp ,
133+ ) error {
130134 eventsRead := 0
131135 var changeEventBatch []ParsedEvent
132136
133137 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
138+ // Once the change stream reaches the final timestamp we should stop reading.
139+ if finalTs != nil {
140+ csTimestamp , err := extractTimestampFromResumeToken (cs .ResumeToken ())
141+ if err != nil {
142+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
143+ }
144+ if ! csTimestamp .Before (* finalTs ) {
145+ break
146+ }
147+ }
148+
134149 gotEvent := cs .TryNext (ctx )
135150
136151 if cs .Err () != nil {
@@ -184,7 +199,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
184199
185200 for {
186201 var err error
187- var changeStreamEnded bool
202+ var gotFinalTimestamp bool
188203
189204 select {
190205
@@ -203,7 +218,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
203218 Interface ("finalTimestamp" , finalTs ).
204219 Msg ("Change stream thread received final timestamp. Finalizing change stream." )
205220
206- changeStreamEnded = true
221+ gotFinalTimestamp = true
207222
208223 // Read all change events until the source reports no events.
209224 // (i.e., the `getMore` call returns empty)
@@ -224,15 +239,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
224239 break
225240 }
226241
227- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
242+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs , & finalTs )
228243
229244 if err != nil {
230245 break
231246 }
232247 }
233248
234249 default :
235- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
250+ err = verifier .readAndHandleOneChangeEventBatch (ctx , cs , nil )
236251
237252 if err == nil {
238253 err = persistResumeTokenIfNeeded ()
@@ -246,12 +261,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
246261
247262 verifier .changeStreamErrChan <- err
248263
249- if ! changeStreamEnded {
264+ if ! gotFinalTimestamp {
250265 break
251266 }
252267 }
253268
254- if changeStreamEnded {
269+ if gotFinalTimestamp {
255270 verifier .mux .Lock ()
256271 verifier .changeStreamRunning = false
257272 if verifier .lastChangeEventTime != nil {
@@ -260,7 +275,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
260275 verifier .mux .Unlock ()
261276 // since we have started Recheck, we must signal that we have
262277 // finished the change stream changes so that Recheck can continue.
263- verifier .changeStreamDoneChan <- struct {}{}
278+ close ( verifier .changeStreamDoneChan )
264279 break
265280 }
266281 }
0 commit comments