@@ -163,19 +163,13 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
163163 ctx context.Context ,
164164 ri * retry.Info ,
165165 cs * mongo.ChangeStream ,
166+ sess mongo.Session ,
166167) error {
167168 eventsRead := 0
168169 var changeEventBatch []ParsedEvent
169170
170- sess , err := verifier .srcClient .StartSession ()
171- if err != nil {
172- return errors .Wrap (err , "failed to start session to read change stream" )
173- }
174-
175- sctx := mongo .NewSessionContext (ctx , sess )
176-
177171 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
178- gotEvent := cs .TryNext (sctx )
172+ gotEvent := cs .TryNext (ctx )
179173
180174 if cs .Err () != nil {
181175 return errors .Wrap (cs .Err (), "change stream iteration failed" )
@@ -202,16 +196,15 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
202196 return nil
203197 }
204198
205- fmt .Printf ("\n ======== events batch: %+v\n \n " , changeEventBatch )
206-
207199 var curTs primitive.Timestamp
208- curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
200+ curTs , err : = extractTimestampFromResumeToken (cs .ResumeToken ())
209201 if err == nil {
210- lagSecs := curTs .T - sctx .OperationTime ().T
202+ lagSecs := curTs .T - sess .OperationTime ().T
211203 verifier .changeStreamLag .Store (option .Some (time .Second * time .Duration (lagSecs )))
212204 } else {
213- // TODO warn
214- //return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
205+ verifier .logger .Warn ().
206+ Err (err ).
207+ Msg ("Failed to extract timestamp from change stream’s resume token to compute change stream lag." )
215208 }
216209
217210 err = verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
@@ -226,6 +219,7 @@ func (verifier *Verifier) iterateChangeStream(
226219 ctx context.Context ,
227220 ri * retry.Info ,
228221 cs * mongo.ChangeStream ,
222+ sess mongo.Session ,
229223) error {
230224 var lastPersistedTime time.Time
231225
@@ -282,15 +276,15 @@ func (verifier *Verifier) iterateChangeStream(
282276 break
283277 }
284278
285- err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs )
279+ err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs , sess )
286280
287281 if err != nil {
288282 return err
289283 }
290284 }
291285
292286 default :
293- err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs )
287+ err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs , sess )
294288
295289 if err == nil {
296290 err = persistResumeTokenIfNeeded ()
@@ -329,7 +323,7 @@ func (verifier *Verifier) iterateChangeStream(
329323
330324func (verifier * Verifier ) createChangeStream (
331325 ctx context.Context ,
332- ) (* mongo.ChangeStream , primitive.Timestamp , error ) {
326+ ) (* mongo.ChangeStream , mongo. Session , primitive.Timestamp , error ) {
333327 pipeline := verifier .GetChangeStreamFilter ()
334328 opts := options .ChangeStream ().
335329 SetMaxAwaitTime (1 * time .Second ).
@@ -341,7 +335,7 @@ func (verifier *Verifier) createChangeStream(
341335
342336 savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
343337 if err != nil {
344- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
338+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
345339 }
346340
347341 csStartLogEvent := verifier .logger .Info ()
@@ -368,37 +362,37 @@ func (verifier *Verifier) createChangeStream(
368362
369363 sess , err := verifier .srcClient .StartSession ()
370364 if err != nil {
371- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to start session" )
365+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to start session" )
372366 }
373367 sctx := mongo .NewSessionContext (ctx , sess )
374368 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
375369 if err != nil {
376- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to open change stream" )
370+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to open change stream" )
377371 }
378372
379373 err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
380374 if err != nil {
381- return nil , primitive.Timestamp {}, err
375+ return nil , nil , primitive.Timestamp {}, err
382376 }
383377
384378 startTs , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
385379 if err != nil {
386- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
380+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
387381 }
388382
389383 // With sharded clusters the resume token might lead the cluster time
390384 // by 1 increment. In that case we need the actual cluster time;
391385 // otherwise we will get errors.
392386 clusterTime , err := getClusterTimeFromSession (sess )
393387 if err != nil {
394- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
388+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
395389 }
396390
397391 if startTs .After (clusterTime ) {
398392 startTs = clusterTime
399393 }
400394
401- return srcChangeStream , startTs , nil
395+ return srcChangeStream , sess , startTs , nil
402396}
403397
404398// StartChangeStream starts the change stream.
@@ -419,7 +413,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
419413 ctx ,
420414 verifier .logger ,
421415 func (ri * retry.Info ) error {
422- srcChangeStream , startTs , err := verifier .createChangeStream (ctx )
416+ srcChangeStream , csSess , startTs , err := verifier .createChangeStream (ctx )
423417 if err != nil {
424418 if parentThreadWaiting {
425419 initialCreateResultChan <- mo.Err [primitive.Timestamp ](err )
@@ -437,7 +431,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
437431 parentThreadWaiting = false
438432 }
439433
440- return verifier .iterateChangeStream (ctx , ri , srcChangeStream )
434+ return verifier .iterateChangeStream (ctx , ri , srcChangeStream , csSess )
441435 },
442436 )
443437
0 commit comments