@@ -6,8 +6,11 @@ import (
66 "time"
77
88 "github.com/10gen/migration-verifier/internal/keystring"
9+ "github.com/10gen/migration-verifier/internal/retry"
10+ "github.com/10gen/migration-verifier/internal/util"
911 "github.com/pkg/errors"
1012 "github.com/rs/zerolog"
13+ "github.com/samber/mo"
1114 "go.mongodb.org/mongo-driver/bson"
1215 "go.mongodb.org/mongo-driver/bson/primitive"
1316 "go.mongodb.org/mongo-driver/mongo"
@@ -175,7 +178,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
175178 return nil
176179}
177180
178- func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
181+ func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) error {
179182 defer cs .Close (ctx )
180183
181184 var lastPersistedTime time.Time
@@ -201,10 +204,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
201204
202205 // If the context is canceled, return immediately.
203206 case <- ctx .Done ():
204- verifier .logger .Debug ().
205- Err (ctx .Err ()).
206- Msg ("Change stream quitting." )
207- return
207+ return ctx .Err ()
208208
209209 // If the changeStreamEnderChan has a message, the user has indicated that
210210 // source writes are ended. This means we should exit rather than continue
@@ -222,8 +222,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
222222 var curTs primitive.Timestamp
223223 curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
224224 if err != nil {
225- err = errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
226- break
225+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
227226 }
228227
229228 if curTs == writesOffTs || curTs .After (writesOffTs ) {
@@ -238,7 +237,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
238237 err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
239238
240239 if err != nil {
241- break
240+ return err
242241 }
243242 }
244243
@@ -248,17 +247,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
248247 if err == nil {
249248 err = persistResumeTokenIfNeeded ()
250249 }
251- }
252-
253- if err != nil && ! errors .Is (err , context .Canceled ) {
254- verifier .logger .Debug ().
255- Err (err ).
256- Msg ("Sending change stream error." )
257-
258- verifier .changeStreamErrChan <- err
259250
260- if ! gotwritesOffTimestamp {
261- break
251+ if err != nil {
252+ return err
262253 }
263254 }
264255
@@ -284,18 +275,21 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
284275 }
285276
286277 infoLog .Msg ("Change stream is done." )
278+
279+ return nil
287280}
288281
289- // StartChangeStream starts the change stream.
290- func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
282+ func (verifier * Verifier ) createChangeStream (
283+ ctx context.Context ,
284+ ) (* mongo.ChangeStream , primitive.Timestamp , error ) {
291285 pipeline := verifier .GetChangeStreamFilter ()
292286 opts := options .ChangeStream ().
293287 SetMaxAwaitTime (1 * time .Second ).
294288 SetFullDocument (options .UpdateLookup )
295289
296290 savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
297291 if err != nil {
298- return errors .Wrap (err , "failed to load persisted change stream resume token" )
292+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
299293 }
300294
301295 csStartLogEvent := verifier .logger .Info ()
@@ -322,40 +316,92 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
322316
323317 sess , err := verifier .srcClient .StartSession ()
324318 if err != nil {
325- return errors .Wrap (err , "failed to start session" )
319+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to start session" )
326320 }
327321 sctx := mongo .NewSessionContext (ctx , sess )
328322 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
329323 if err != nil {
330- return errors .Wrap (err , "failed to open change stream" )
324+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to open change stream" )
331325 }
332326
333327 err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
334328 if err != nil {
335- return err
329+ return nil , primitive. Timestamp {}, err
336330 }
337331
338- csTimestamp , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
332+ startTs , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
339333 if err != nil {
340- return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
334+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
341335 }
342336
337+ // With sharded clusters the resume token might lead the cluster time
338+ // by 1 increment. In that case we need the actual cluster time;
339+ // otherwise we will get errors.
343340 clusterTime , err := getClusterTimeFromSession (sess )
344341 if err != nil {
345- return errors .Wrap (err , "failed to read cluster time from session" )
342+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
346343 }
347344
348- verifier .srcStartAtTs = & csTimestamp
349- if csTimestamp .After (clusterTime ) {
350- verifier .srcStartAtTs = & clusterTime
345+ if startTs .After (clusterTime ) {
346+ startTs = clusterTime
351347 }
352348
349+ return srcChangeStream , startTs , nil
350+ }
351+
352+ // StartChangeStream starts the change stream. It opens and iterates the stream in a background goroutine under a retryer, blocks until that goroutine reports the first open's outcome, and on success stores the start timestamp in verifier.srcStartAtTs and marks the change stream as running.
353+ func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
354+ resultChan := make (chan mo.Result [primitive.Timestamp ]) // unbuffered; carries the first open's outcome back to this function
355+
356+ go func () {
357+ retryer := retry .New (retry .DefaultDurationLimit )
358+ retryer = retryer .WithErrorCodes (util .CursorKilled ) // NOTE(review): presumably makes CursorKilled retryable as well; confirm in the retry package
359+
360+ parentThreadWaiting := true // true until a result has been sent on resultChan
361+
362+ err := retryer .
363+ RunForTransientErrorsOnly (
364+ ctx ,
365+ verifier .logger ,
366+ func (i * retry.Info ) error {
367+ srcChangeStream , startTs , err := verifier .createChangeStream (ctx )
368+ if err != nil {
369+ return err
370+ }
371+
372+ if parentThreadWaiting { // only the first successful open reports back; later attempts skip this
373+ resultChan <- mo .Ok (startTs )
374+ close (resultChan )
375+ parentThreadWaiting = false
376+ }
377+
378+ return verifier .iterateChangeStream (ctx , srcChangeStream ) // any error returned here is handed back to the retryer
379+ },
380+ )
381+
382+ if err != nil {
383+ if parentThreadWaiting { // no stream was ever opened: report the failure to the blocked StartChangeStream call
384+ resultChan <- mo.Err [primitive.Timestamp ](err )
385+ } else { // the start result was already delivered: report through changeStreamErrChan instead
386+ verifier .changeStreamErrChan <- err
387+ close (verifier .changeStreamErrChan )
388+ }
389+ }
390+ }()
391+
392+ result := <- resultChan // blocks until the goroutine sends the first open's result
393+
394+ startTs , err := result .Get ()
395+ if err != nil {
396+ return err
397+ }
398+
399+ verifier .srcStartAtTs = & startTs
400+
353401 verifier .mux .Lock ()
354402 verifier .changeStreamRunning = true
355403 verifier .mux .Unlock ()
356404
357- go verifier .iterateChangeStream (ctx , srcChangeStream )
358-
359405 return nil
360406}
361407
0 commit comments