88 "github.com/10gen/migration-verifier/internal/keystring"
99 "github.com/10gen/migration-verifier/internal/retry"
1010 "github.com/10gen/migration-verifier/internal/util"
11- "github.com/10gen/migration-verifier/option"
1211 "github.com/pkg/errors"
1312 "github.com/rs/zerolog"
1413 "github.com/samber/mo"
@@ -139,15 +138,15 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
139138// is unideal but shouldn’t impede correctness since post-writesOff events
140139// shouldn’t really happen anyway by definition.
141140func (verifier * Verifier ) readAndHandleOneChangeEventBatch (
142- sctx mongo. SessionContext ,
141+ ctx context. Context ,
143142 ri * retry.Info ,
144143 cs * mongo.ChangeStream ,
145144) error {
146145 eventsRead := 0
147146 var changeEventBatch []ParsedEvent
148147
149148 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
150- gotEvent := cs .TryNext (sctx )
149+ gotEvent := cs .TryNext (ctx )
151150
152151 if cs .Err () != nil {
153152 return errors .Wrap (cs .Err (), "change stream iteration failed" )
@@ -174,15 +173,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
174173 return nil
175174 }
176175
177- // Update the change stream lag, which we’ll display in the logs.
178- lag , err := computeChangeStreamLag (sctx , cs )
179- if err == nil {
180- verifier .changeStreamLag .Store (option .Some (lag ))
181- } else {
182- verifier .logger .Debug ().Err (err ).Msg ("Failed to compute change stream lag." )
183- }
184-
185- err = verifier .HandleChangeStreamEvents (sctx , changeEventBatch )
176+ err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
186177 if err != nil {
187178 return errors .Wrap (err , "failed to handle change events" )
188179 }
@@ -191,7 +182,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
191182}
192183
193184func (verifier * Verifier ) iterateChangeStream (
194- sctx mongo. SessionContext ,
185+ ctx context. Context ,
195186 ri * retry.Info ,
196187 cs * mongo.ChangeStream ,
197188) error {
@@ -202,7 +193,7 @@ func (verifier *Verifier) iterateChangeStream(
202193 return nil
203194 }
204195
205- err := verifier .persistChangeStreamResumeToken (sctx , cs )
196+ err := verifier .persistChangeStreamResumeToken (ctx , cs )
206197 if err == nil {
207198 lastPersistedTime = time .Now ()
208199 }
@@ -217,8 +208,8 @@ func (verifier *Verifier) iterateChangeStream(
217208 select {
218209
219210 // If the context is canceled, return immmediately.
220- case <- sctx .Done ():
221- return sctx .Err ()
211+ case <- ctx .Done ():
212+ return ctx .Err ()
222213
223214 // If the changeStreamEnderChan has a message, the user has indicated that
224215 // source writes are ended. This means we should exit rather than continue
@@ -250,15 +241,15 @@ func (verifier *Verifier) iterateChangeStream(
250241 break
251242 }
252243
253- err = verifier .readAndHandleOneChangeEventBatch (sctx , ri , cs )
244+ err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs )
254245
255246 if err != nil {
256247 return err
257248 }
258249 }
259250
260251 default :
261- err = verifier .readAndHandleOneChangeEventBatch (sctx , ri , cs )
252+ err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs )
262253
263254 if err == nil {
264255 err = persistResumeTokenIfNeeded ()
@@ -296,7 +287,7 @@ func (verifier *Verifier) iterateChangeStream(
296287}
297288
298289func (verifier * Verifier ) createChangeStream (
299- sctx mongo. SessionContext ,
290+ ctx context. Context ,
300291) (* mongo.ChangeStream , primitive.Timestamp , error ) {
301292 pipeline := verifier .GetChangeStreamFilter ()
302293 opts := options .ChangeStream ().
@@ -307,7 +298,7 @@ func (verifier *Verifier) createChangeStream(
307298 opts = opts .SetCustomPipeline (bson.M {"showExpandedEvents" : true })
308299 }
309300
310- savedResumeToken , err := verifier .loadChangeStreamResumeToken (sctx )
301+ savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
311302 if err != nil {
312303 return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
313304 }
@@ -334,12 +325,17 @@ func (verifier *Verifier) createChangeStream(
334325 csStartLogEvent .Msg ("Starting change stream from current source cluster time." )
335326 }
336327
328+ sess , err := verifier .srcClient .StartSession ()
329+ if err != nil {
330+ return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to start session" )
331+ }
332+ sctx := mongo .NewSessionContext (ctx , sess )
337333 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
338334 if err != nil {
339335 return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to open change stream" )
340336 }
341337
342- err = verifier .persistChangeStreamResumeToken (sctx , srcChangeStream )
338+ err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
343339 if err != nil {
344340 return nil , primitive.Timestamp {}, err
345341 }
@@ -352,7 +348,7 @@ func (verifier *Verifier) createChangeStream(
352348 // With sharded clusters the resume token might lead the cluster time
353349 // by 1 increment. In that case we need the actual cluster time;
354350 // otherwise we will get errors.
355- clusterTime , err := getClusterTimeFromSession (sctx )
351+ clusterTime , err := getClusterTimeFromSession (sess )
356352 if err != nil {
357353 return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
358354 }
@@ -382,14 +378,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
382378 ctx ,
383379 verifier .logger ,
384380 func (ri * retry.Info ) error {
385- sess , err := verifier .srcClient .StartSession ()
386- if err != nil {
387- return errors .Wrap (err , "failed to start change stream session" )
388- }
389-
390- sctx := mongo .NewSessionContext (ctx , sess )
391-
392- srcChangeStream , startTs , err := verifier .createChangeStream (sctx )
381+ srcChangeStream , startTs , err := verifier .createChangeStream (ctx )
393382 if err != nil {
394383 if parentThreadWaiting {
395384 initialCreateResultChan <- mo.Err [primitive.Timestamp ](err )
@@ -407,7 +396,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
407396 parentThreadWaiting = false
408397 }
409398
410- return verifier .iterateChangeStream (sctx , ri , srcChangeStream )
399+ return verifier .iterateChangeStream (ctx , ri , srcChangeStream )
411400 },
412401 )
413402
@@ -460,26 +449,12 @@ func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson
460449 return token , err
461450}
462451
463- func computeChangeStreamLag (
464- sctx mongo.SessionContext ,
465- cs * mongo.ChangeStream ,
466- ) (time.Duration , error ) {
467- token := cs .ResumeToken ()
468- ts , err := extractTimestampFromResumeToken (token )
469- if err != nil {
470- return 0 , errors .Wrap (err , "failed to extract timestamp from change stream resume token" )
471- }
472-
473- optime := sctx .OperationTime ()
474- return time .Second * time .Duration (optime .T - ts .T ), nil
475- }
476-
477- func (verifier * Verifier ) persistChangeStreamResumeToken (sctx mongo.SessionContext , cs * mongo.ChangeStream ) error {
452+ func (verifier * Verifier ) persistChangeStreamResumeToken (ctx context.Context , cs * mongo.ChangeStream ) error {
478453 token := cs .ResumeToken ()
479454
480455 coll := verifier .getChangeStreamMetadataCollection ()
481456 _ , err := coll .ReplaceOne (
482- sctx ,
457+ ctx ,
483458 bson.D {{"_id" , "resumeToken" }},
484459 token ,
485460 options .Replace ().SetUpsert (true ),
0 commit comments