@@ -6,8 +6,11 @@ import (
66 "time"
77
88 "github.com/10gen/migration-verifier/internal/keystring"
9+ "github.com/10gen/migration-verifier/internal/retry"
10+ "github.com/10gen/migration-verifier/internal/util"
911 "github.com/pkg/errors"
1012 "github.com/rs/zerolog"
13+ "github.com/samber/mo"
1114 "go.mongodb.org/mongo-driver/bson"
1215 "go.mongodb.org/mongo-driver/bson/primitive"
1316 "go.mongodb.org/mongo-driver/mongo"
@@ -63,7 +66,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
6366 for i , changeEvent := range batch {
6467 if changeEvent .ClusterTime != nil &&
6568 (verifier .lastChangeEventTime == nil ||
66- verifier .lastChangeEventTime .Compare (* changeEvent .ClusterTime ) < 0 ) {
69+ verifier .lastChangeEventTime .Before (* changeEvent .ClusterTime )) {
6770 verifier .lastChangeEventTime = changeEvent .ClusterTime
6871 }
6972 switch changeEvent .OpType {
@@ -175,9 +178,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
175178 return nil
176179}
177180
178- func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
179- defer cs .Close (ctx )
180-
181+ func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) error {
181182 var lastPersistedTime time.Time
182183
183184 persistResumeTokenIfNeeded := func () error {
@@ -201,10 +202,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
201202
202203 // If the context is canceled, return immmediately.
203204 case <- ctx .Done ():
204- verifier .logger .Debug ().
205- Err (ctx .Err ()).
206- Msg ("Change stream quitting." )
207- return
205+ return ctx .Err ()
208206
209207 // If the changeStreamEnderChan has a message, the user has indicated that
210208 // source writes are ended. This means we should exit rather than continue
@@ -222,10 +220,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
222220 var curTs primitive.Timestamp
223221 curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
224222 if err != nil {
225- err = errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
226- break
223+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
227224 }
228225
226+ // writesOffTs never refers to a real event,
227+ // so we can stop once curTs >= writesOffTs.
229228 if ! curTs .Before (writesOffTs ) {
230229 verifier .logger .Debug ().
231230 Interface ("currentTimestamp" , curTs ).
@@ -238,7 +237,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
238237 err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
239238
240239 if err != nil {
241- break
240+ return err
242241 }
243242 }
244243
@@ -248,17 +247,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
248247 if err == nil {
249248 err = persistResumeTokenIfNeeded ()
250249 }
251- }
252-
253- if err != nil && ! errors .Is (err , context .Canceled ) {
254- verifier .logger .Debug ().
255- Err (err ).
256- Msg ("Sending change stream error." )
257250
258- verifier .changeStreamErrChan <- err
259-
260- if ! gotwritesOffTimestamp {
261- break
251+ if err != nil {
252+ return err
262253 }
263254 }
264255
@@ -284,18 +275,21 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
284275 }
285276
286277 infoLog .Msg ("Change stream is done." )
278+
279+ return nil
287280}
288281
289- // StartChangeStream starts the change stream.
290- func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
282+ func (verifier * Verifier ) createChangeStream (
283+ ctx context.Context ,
284+ ) (* mongo.ChangeStream , primitive.Timestamp , error ) {
291285 pipeline := verifier .GetChangeStreamFilter ()
292286 opts := options .ChangeStream ().
293287 SetMaxAwaitTime (1 * time .Second ).
294288 SetFullDocument (options .UpdateLookup )
295289
296290 savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
297291 if err != nil {
298- return errors .Wrap (err , "failed to load persisted change stream resume token" )
292+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
299293 }
300294
301295 csStartLogEvent := verifier .logger .Info ()
@@ -322,40 +316,95 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
322316
323317 sess , err := verifier .srcClient .StartSession ()
324318 if err != nil {
325- return errors .Wrap (err , "failed to start session" )
319+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to start session" )
326320 }
327321 sctx := mongo .NewSessionContext (ctx , sess )
328322 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
329323 if err != nil {
330- return errors .Wrap (err , "failed to open change stream" )
324+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to open change stream" )
331325 }
332326
333327 err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
334328 if err != nil {
335- return err
329+ return nil , primitive. Timestamp {}, err
336330 }
337331
338- csTimestamp , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
332+ startTs , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
339333 if err != nil {
340- return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
334+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
341335 }
342336
337+ // With sharded clusters the resume token might lead the cluster time
338+ // by 1 increment. In that case we need the actual cluster time;
339+ // otherwise we will get errors.
343340 clusterTime , err := getClusterTimeFromSession (sess )
344341 if err != nil {
345- return errors .Wrap (err , "failed to read cluster time from session" )
342+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
346343 }
347344
348- verifier .srcStartAtTs = & csTimestamp
349- if csTimestamp .After (clusterTime ) {
350- verifier .srcStartAtTs = & clusterTime
345+ if startTs .After (clusterTime ) {
346+ startTs = clusterTime
351347 }
352348
349+ return srcChangeStream , startTs , nil
350+ }
351+
352+ // StartChangeStream starts the change stream.
353+ func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
354+ // Result seems a bit simpler than messing with 2 separate channels.
355+ resultChan := make (chan mo.Result [primitive.Timestamp ])
356+
357+ go func () {
358+ retryer := retry .New (retry .DefaultDurationLimit )
359+ retryer = retryer .WithErrorCodes (util .CursorKilled )
360+
361+ parentThreadWaiting := true
362+
363+ err := retryer .
364+ RunForTransientErrorsOnly (
365+ ctx ,
366+ verifier .logger ,
367+ func (i * retry.Info ) error {
368+ srcChangeStream , startTs , err := verifier .createChangeStream (ctx )
369+ if err != nil {
370+ return err
371+ }
372+
373+ defer srcChangeStream .Close (ctx )
374+
375+ if parentThreadWaiting {
376+ resultChan <- mo .Ok (startTs )
377+ close (resultChan )
378+ parentThreadWaiting = false
379+ }
380+
381+ return verifier .iterateChangeStream (ctx , srcChangeStream )
382+ },
383+ )
384+
385+ if err != nil {
386+ if parentThreadWaiting {
387+ resultChan <- mo.Err [primitive.Timestamp ](err )
388+ } else {
389+ verifier .changeStreamErrChan <- err
390+ close (verifier .changeStreamErrChan )
391+ }
392+ }
393+ }()
394+
395+ result := <- resultChan
396+
397+ startTs , err := result .Get ()
398+ if err != nil {
399+ return err
400+ }
401+
402+ verifier .srcStartAtTs = & startTs
403+
353404 verifier .mux .Lock ()
354405 verifier .changeStreamRunning = true
355406 verifier .mux .Unlock ()
356407
357- go verifier .iterateChangeStream (ctx , srcChangeStream )
358-
359408 return nil
360409}
361410
0 commit comments