@@ -6,8 +6,11 @@ import (
66 "time"
77
88 "github.com/10gen/migration-verifier/internal/keystring"
9+ "github.com/10gen/migration-verifier/internal/retry"
10+ "github.com/10gen/migration-verifier/internal/util"
911 "github.com/pkg/errors"
1012 "github.com/rs/zerolog"
13+ "github.com/samber/mo"
1114 "go.mongodb.org/mongo-driver/bson"
1215 "go.mongodb.org/mongo-driver/bson/primitive"
1316 "go.mongodb.org/mongo-driver/mongo"
@@ -63,7 +66,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
6366 for i , changeEvent := range batch {
6467 if changeEvent .ClusterTime != nil &&
6568 (verifier .lastChangeEventTime == nil ||
66- verifier .lastChangeEventTime .Compare (* changeEvent .ClusterTime ) < 0 ) {
69+ verifier .lastChangeEventTime .Before (* changeEvent .ClusterTime )) {
6770 verifier .lastChangeEventTime = changeEvent .ClusterTime
6871 }
6972 switch changeEvent .OpType {
@@ -136,6 +139,7 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
136139// shouldn’t really happen anyway by definition.
137140func (verifier * Verifier ) readAndHandleOneChangeEventBatch (
138141 ctx context.Context ,
142+ ri * retry.Info ,
139143 cs * mongo.ChangeStream ,
140144) error {
141145 eventsRead := 0
@@ -163,6 +167,8 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
163167 eventsRead ++
164168 }
165169
170+ ri .IterationSuccess ()
171+
166172 if eventsRead == 0 {
167173 return nil
168174 }
@@ -175,9 +181,11 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
175181 return nil
176182}
177183
178- func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
179- defer cs .Close (ctx )
180-
184+ func (verifier * Verifier ) iterateChangeStream (
185+ ctx context.Context ,
186+ ri * retry.Info ,
187+ cs * mongo.ChangeStream ,
188+ ) error {
181189 var lastPersistedTime time.Time
182190
183191 persistResumeTokenIfNeeded := func () error {
@@ -201,10 +209,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
201209
202210 // If the context is canceled, return immmediately.
203211 case <- ctx .Done ():
204- verifier .logger .Debug ().
205- Err (ctx .Err ()).
206- Msg ("Change stream quitting." )
207- return
212+ return ctx .Err ()
208213
209214 // If the changeStreamEnderChan has a message, the user has indicated that
210215 // source writes are ended. This means we should exit rather than continue
@@ -222,10 +227,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
222227 var curTs primitive.Timestamp
223228 curTs , err = extractTimestampFromResumeToken (cs .ResumeToken ())
224229 if err != nil {
225- err = errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
226- break
230+ return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
227231 }
228232
233+ // writesOffTs never refers to a real event,
234+ // so we can stop once curTs >= writesOffTs.
229235 if ! curTs .Before (writesOffTs ) {
230236 verifier .logger .Debug ().
231237 Interface ("currentTimestamp" , curTs ).
@@ -235,30 +241,22 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
235241 break
236242 }
237243
238- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
244+ err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs )
239245
240246 if err != nil {
241- break
247+ return err
242248 }
243249 }
244250
245251 default :
246- err = verifier .readAndHandleOneChangeEventBatch (ctx , cs )
252+ err = verifier .readAndHandleOneChangeEventBatch (ctx , ri , cs )
247253
248254 if err == nil {
249255 err = persistResumeTokenIfNeeded ()
250256 }
251- }
252257
253- if err != nil && ! errors .Is (err , context .Canceled ) {
254- verifier .logger .Debug ().
255- Err (err ).
256- Msg ("Sending change stream error." )
257-
258- verifier .changeStreamErrChan <- err
259-
260- if ! gotwritesOffTimestamp {
261- break
258+ if err != nil {
259+ return err
262260 }
263261 }
264262
@@ -284,10 +282,13 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
284282 }
285283
286284 infoLog .Msg ("Change stream is done." )
285+
286+ return nil
287287}
288288
289- // StartChangeStream starts the change stream.
290- func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
289+ func (verifier * Verifier ) createChangeStream (
290+ ctx context.Context ,
291+ ) (* mongo.ChangeStream , primitive.Timestamp , error ) {
291292 pipeline := verifier .GetChangeStreamFilter ()
292293 opts := options .ChangeStream ().
293294 SetMaxAwaitTime (1 * time .Second ).
@@ -299,7 +300,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
299300
300301 savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
301302 if err != nil {
302- return errors .Wrap (err , "failed to load persisted change stream resume token" )
303+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
303304 }
304305
305306 csStartLogEvent := verifier .logger .Info ()
@@ -326,40 +327,100 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
326327
327328 sess , err := verifier .srcClient .StartSession ()
328329 if err != nil {
329- return errors .Wrap (err , "failed to start session" )
330+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to start session" )
330331 }
331332 sctx := mongo .NewSessionContext (ctx , sess )
332333 srcChangeStream , err := verifier .srcClient .Watch (sctx , pipeline , opts )
333334 if err != nil {
334- return errors .Wrap (err , "failed to open change stream" )
335+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to open change stream" )
335336 }
336337
337338 err = verifier .persistChangeStreamResumeToken (ctx , srcChangeStream )
338339 if err != nil {
339- return err
340+ return nil , primitive. Timestamp {}, err
340341 }
341342
342- csTimestamp , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
343+ startTs , err := extractTimestampFromResumeToken (srcChangeStream .ResumeToken ())
343344 if err != nil {
344- return errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
345+ return nil , primitive. Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
345346 }
346347
348+ // With sharded clusters the resume token might lead the cluster time
349+ // by 1 increment. In that case we need the actual cluster time;
350+ // otherwise we will get errors.
347351 clusterTime , err := getClusterTimeFromSession (sess )
348352 if err != nil {
349- return errors .Wrap (err , "failed to read cluster time from session" )
353+ return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
354+ }
355+
356+ if startTs .After (clusterTime ) {
357+ startTs = clusterTime
350358 }
351359
352- verifier .srcStartAtTs = & csTimestamp
353- if csTimestamp .After (clusterTime ) {
354- verifier .srcStartAtTs = & clusterTime
360+ return srcChangeStream , startTs , nil
361+ }
362+
363+ // StartChangeStream starts the change stream.
364+ func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
365+ // This channel holds the first change stream creation's result, whether
366+ // success or failure. Rather than using a Result we could make separate
367+ // Timestamp and error channels, but the single channel is cleaner since
368+ // there's no chance of "nonsense" like both channels returning a payload.
369+ initialCreateResultChan := make (chan mo.Result [primitive.Timestamp ])
370+
371+ go func () {
372+ retryer := retry .New (retry .DefaultDurationLimit )
373+ retryer = retryer .WithErrorCodes (util .CursorKilled )
374+
375+ parentThreadWaiting := true
376+
377+ err := retryer .RunForTransientErrorsOnly (
378+ ctx ,
379+ verifier .logger ,
380+ func (ri * retry.Info ) error {
381+ srcChangeStream , startTs , err := verifier .createChangeStream (ctx )
382+ if err != nil {
383+ if parentThreadWaiting {
384+ initialCreateResultChan <- mo.Err [primitive.Timestamp ](err )
385+ return nil
386+ }
387+
388+ return err
389+ }
390+
391+ defer srcChangeStream .Close (ctx )
392+
393+ if parentThreadWaiting {
394+ initialCreateResultChan <- mo .Ok (startTs )
395+ close (initialCreateResultChan )
396+ parentThreadWaiting = false
397+ }
398+
399+ return verifier .iterateChangeStream (ctx , ri , srcChangeStream )
400+ },
401+ )
402+
403+ if err != nil {
404+ // NB: This failure always happens after the initial change stream
405+ // creation.
406+ verifier .changeStreamErrChan <- err
407+ close (verifier .changeStreamErrChan )
408+ }
409+ }()
410+
411+ result := <- initialCreateResultChan
412+
413+ startTs , err := result .Get ()
414+ if err != nil {
415+ return err
355416 }
356417
418+ verifier .srcStartAtTs = & startTs
419+
357420 verifier .mux .Lock ()
358421 verifier .changeStreamRunning = true
359422 verifier .mux .Unlock ()
360423
361- go verifier .iterateChangeStream (ctx , srcChangeStream )
362-
363424 return nil
364425}
365426
0 commit comments