Skip to content

Commit 2d95686

Browse files
committed
Merge branch 'main' into REP-5317-add-dst-change-stream
2 parents 16224b1 + bfc46eb commit 2d95686

34 files changed

+3747
-156
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/pkg/errors v0.9.1
1313
github.com/rs/zerolog v1.28.0
1414
github.com/samber/lo v1.47.0
15+
github.com/samber/mo v1.13.0
1516
github.com/stretchr/testify v1.8.0
1617
github.com/urfave/cli v1.22.9
1718
go.mongodb.org/mongo-driver v1.17.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
8484
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
8585
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
8686
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
87+
github.com/samber/mo v1.13.0 h1:LB1OwfJMju3a6FjghH+AIvzMG0ZPOzgTWj1qaHs1IQ4=
88+
github.com/samber/mo v1.13.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs=
8789
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
8890
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
8991
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=

internal/util/error.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
// `ErrorCode` newtype, but that requires a more invasive change to everything
2121
// that uses error codes.
2222
const (
23-
LockFailed int = 107
24-
SampleTooManyDuplicates int = 28799
23+
LockFailed = 107
24+
SampleTooManyDuplicates = 28799
25+
CursorKilled = 237
2526
)
2627

2728
//

internal/verifier/change_stream.go

Lines changed: 127 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77

88
"github.com/10gen/migration-verifier/internal/keystring"
99
"github.com/10gen/migration-verifier/internal/logger"
10+
"github.com/10gen/migration-verifier/internal/retry"
1011
"github.com/10gen/migration-verifier/internal/util"
1112
"github.com/pkg/errors"
1213
"github.com/rs/zerolog"
14+
"github.com/samber/mo"
1315
"go.mongodb.org/mongo-driver/bson"
1416
"go.mongodb.org/mongo-driver/bson/primitive"
1517
"go.mongodb.org/mongo-driver/mongo"
@@ -208,17 +210,37 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
208210
// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still
209211
// want to verify migrations from 4.2. fullDocument is unlikely to be a
210212
// bottleneck anyway.
211-
func (csr *ChangeStreamReader) GetChangeStreamFilter() []bson.D {
213+
func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) {
212214
if len(csr.namespaces) == 0 {
213-
return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", csr.metaDBName}}}}}}}
214-
}
215-
filter := bson.A{}
216-
for _, ns := range csr.namespaces {
217-
db, coll := SplitNamespace(ns)
218-
filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}})
215+
pipeline = mongo.Pipeline{
216+
{{"$match", bson.D{
217+
{"ns.db", bson.D{{"$ne", csr.metaDBName}}},
218+
}}},
219+
}
220+
} else {
221+
filter := []bson.D{}
222+
for _, ns := range csr.namespaces {
223+
db, coll := SplitNamespace(ns)
224+
filter = append(filter, bson.D{
225+
{"ns", bson.D{
226+
{"db", db},
227+
{"coll", coll},
228+
}},
229+
})
230+
}
231+
pipeline = mongo.Pipeline{
232+
{{"$match", bson.D{{"$or", filter}}}},
233+
}
219234
}
220-
stage := bson.D{{"$match", bson.D{{"$or", filter}}}}
221-
return []bson.D{stage}
235+
236+
return append(
237+
pipeline,
238+
bson.D{
239+
{"$unset", []string{
240+
"updateDescription",
241+
}},
242+
},
243+
)
222244
}
223245

224246
// This function reads a single `getMore` response into a slice.
@@ -231,6 +253,7 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() []bson.D {
231253
// shouldn’t really happen anyway by definition.
232254
func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
233255
ctx context.Context,
256+
ri *retry.Info,
234257
cs *mongo.ChangeStream,
235258
) error {
236259
eventsRead := 0
@@ -257,13 +280,15 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
257280

258281
if changeEventBatch[eventsRead].ClusterTime != nil &&
259282
(csr.lastChangeEventTime == nil ||
260-
csr.lastChangeEventTime.Compare(*changeEventBatch[eventsRead].ClusterTime) < 0) {
283+
csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) {
261284
csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime
262285
}
263286

264287
eventsRead++
265288
}
266289

290+
ri.IterationSuccess()
291+
267292
if eventsRead == 0 {
268293
return nil
269294
}
@@ -272,9 +297,11 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
272297
return nil
273298
}
274299

275-
func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
276-
defer cs.Close(ctx)
277-
300+
func (csr *ChangeStreamReader) iterateChangeStream(
301+
ctx context.Context,
302+
ri *retry.Info,
303+
cs *mongo.ChangeStream,
304+
) error {
278305
var lastPersistedTime time.Time
279306

280307
persistResumeTokenIfNeeded := func() error {
@@ -298,10 +325,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
298325

299326
// If the context is canceled, return immmediately.
300327
case <-ctx.Done():
301-
csr.logger.Debug().
302-
Err(ctx.Err()).
303-
Msg("Change stream quitting.")
304-
return
328+
return ctx.Err()
305329

306330
// If the ChangeStreamEnderChan has a message, the user has indicated that
307331
// source writes are ended. This means we should exit rather than continue
@@ -319,11 +343,12 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
319343
var curTs primitive.Timestamp
320344
curTs, err = extractTimestampFromResumeToken(cs.ResumeToken())
321345
if err != nil {
322-
err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
323-
break
346+
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
324347
}
325348

326-
if curTs.After(writesOffTs) {
349+
// writesOffTs never refers to a real event,
350+
// so we can stop once curTs >= writesOffTs.
351+
if !curTs.Before(writesOffTs) {
327352
csr.logger.Debug().
328353
Interface("currentTimestamp", curTs).
329354
Interface("writesOffTimestamp", writesOffTs).
@@ -332,30 +357,22 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
332357
break
333358
}
334359

335-
err = csr.readAndHandleOneChangeEventBatch(ctx, cs)
360+
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs)
336361

337362
if err != nil {
338-
break
363+
return err
339364
}
340365
}
341366

342367
default:
343-
err = csr.readAndHandleOneChangeEventBatch(ctx, cs)
368+
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs)
344369

345370
if err == nil {
346371
err = persistResumeTokenIfNeeded()
347372
}
348-
}
349-
350-
if err != nil && !errors.Is(err, context.Canceled) {
351-
csr.logger.Debug().
352-
Err(err).
353-
Msg("Sending change stream error.")
354-
355-
csr.ChangeStreamErrChan <- err
356373

357-
if !gotwritesOffTimestamp {
358-
break
374+
if err != nil {
375+
return err
359376
}
360377
}
361378

@@ -379,12 +396,14 @@ func (csr *ChangeStreamReader) iterateChangeStream(ctx context.Context, cs *mong
379396
}
380397

381398
infoLog.Msg("Change stream is done.")
399+
400+
return nil
382401
}
383402

384-
// StartChangeStream starts the change stream.
385-
func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
403+
func (csr *ChangeStreamReader) createChangeStream(
404+
ctx context.Context,
405+
) (*mongo.ChangeStream, primitive.Timestamp, error) {
386406
pipeline := csr.GetChangeStreamFilter()
387-
388407
opts := options.ChangeStream().
389408
SetMaxAwaitTime(1 * time.Second).
390409
SetFullDocument(options.UpdateLookup)
@@ -395,7 +414,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
395414

396415
savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)
397416
if err != nil {
398-
return errors.Wrap(err, "failed to load persisted change stream resume token")
417+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
399418
}
400419

401420
csStartLogEvent := csr.logger.Info()
@@ -422,37 +441,97 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
422441

423442
sess, err := csr.watcherClient.StartSession()
424443
if err != nil {
425-
return errors.Wrap(err, "failed to start session")
444+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
426445
}
427446
sctx := mongo.NewSessionContext(ctx, sess)
428447
changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts)
429448
if err != nil {
430-
return errors.Wrap(err, "failed to open change stream")
449+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
431450
}
432451

433452
err = csr.persistChangeStreamResumeToken(ctx, changeStream)
434453
if err != nil {
435-
return err
454+
return nil, primitive.Timestamp{}, err
436455
}
437456

438-
csTimestamp, err := extractTimestampFromResumeToken(changeStream.ResumeToken())
457+
startTs, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
439458
if err != nil {
440-
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
459+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
441460
}
442461

462+
// With sharded clusters the resume token might lead the cluster time
463+
// by 1 increment. In that case we need the actual cluster time;
464+
// otherwise we will get errors.
443465
clusterTime, err := getClusterTimeFromSession(sess)
444466
if err != nil {
445-
return errors.Wrap(err, "failed to read cluster time from session")
467+
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
446468
}
447469

448-
csr.startAtTs = &csTimestamp
449-
if csTimestamp.After(clusterTime) {
450-
csr.startAtTs = &clusterTime
470+
if startTs.After(clusterTime) {
471+
startTs = clusterTime
451472
}
452473

453-
csr.changeStreamRunning = true
474+
return srcChangeStream, startTs, nil
475+
}
476+
477+
// StartChangeStream starts the change stream.
478+
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
479+
// This channel holds the first change stream creation's result, whether
480+
// success or failure. Rather than using a Result we could make separate
481+
// Timestamp and error channels, but the single channel is cleaner since
482+
// there's no chance of "nonsense" like both channels returning a payload.
483+
initialCreateResultChan := make(chan mo.Result[primitive.Timestamp])
484+
485+
go func() {
486+
retryer := retry.New(retry.DefaultDurationLimit)
487+
retryer = retryer.WithErrorCodes(util.CursorKilled)
488+
489+
parentThreadWaiting := true
490+
491+
err := retryer.RunForTransientErrorsOnly(
492+
ctx,
493+
verifier.logger,
494+
func(ri *retry.Info) error {
495+
srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
496+
if err != nil {
497+
if parentThreadWaiting {
498+
initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
499+
return nil
500+
}
501+
502+
return err
503+
}
504+
505+
defer srcChangeStream.Close(ctx)
454506

455-
go csr.iterateChangeStream(ctx, changeStream)
507+
if parentThreadWaiting {
508+
initialCreateResultChan <- mo.Ok(startTs)
509+
close(initialCreateResultChan)
510+
parentThreadWaiting = false
511+
}
512+
513+
return verifier.iterateChangeStream(ctx, ri, srcChangeStream)
514+
},
515+
)
516+
517+
if err != nil {
518+
// NB: This failure always happens after the initial change stream
519+
// creation.
520+
verifier.changeStreamErrChan <- err
521+
close(verifier.changeStreamErrChan)
522+
}
523+
}()
524+
525+
result := <-initialCreateResultChan
526+
527+
startTs, err := result.Get()
528+
if err != nil {
529+
return err
530+
}
531+
532+
csr.startAtTs = &startTs
533+
534+
csr.changeStreamRunning = true
456535

457536
return nil
458537
}

0 commit comments

Comments
 (0)