Merged
Changes from 5 commits
73 changes: 58 additions & 15 deletions internal/verifier/change_stream.go
@@ -15,12 +15,15 @@ import (
"golang.org/x/exp/constraints"
)

const fauxDocSizeForDeleteEvents = 1024

// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
type ParsedEvent struct {
ID interface{} `bson:"_id"`
OpType string `bson:"operationType"`
Ns *Namespace `bson:"ns,omitempty"`
DocKey DocKey `bson:"documentKey,omitempty"`
DocSize *int `bson:"fullDocument,omitempty"`
Collaborator: The BSON tag should probably be documentSize.

Collaborator (Author): oops! Good catch.

ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"`
}
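
Per the thread above, the agreed fix is presumably to point the tag at the documentSize field that the pipeline below adds, i.e. roughly:

DocSize *int `bson:"documentSize,omitempty"`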

@@ -79,12 +82,14 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
collNames[i] = changeEvent.Ns.Coll
docIDs[i] = changeEvent.DocKey.ID

// We don't know the document sizes for documents for all change events,
// so just be conservative and assume they are maximum size.
//
// Note that this prevents us from being able to report a meaningful
// total data size for noninitial generations in the log.
dataSizes[i] = maxBSONObjSize
if changeEvent.DocSize == nil {
// This happens for deletes and for some updates.
// The document is probably, but not necessarily, deleted.
dataSizes[i] = fauxDocSizeForDeleteEvents
} else {
// This happens for inserts, replaces, and most updates.
dataSizes[i] = *changeEvent.DocSize
}
default:
return UnknownEventError{Event: &changeEvent}
}
@@ -98,16 +103,52 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
}

func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
var pipeline mongo.Pipeline

if len(verifier.srcNamespaces) == 0 {
return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}}
}
filter := bson.A{}
for _, ns := range verifier.srcNamespaces {
db, coll := SplitNamespace(ns)
filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}})
pipeline = []bson.D{{{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}}
} else {
filter := []bson.D{}
for _, ns := range verifier.srcNamespaces {
db, coll := SplitNamespace(ns)
filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}})
}
stage := bson.D{{"$match", bson.D{{"$or", filter}}}}
pipeline = []bson.D{stage}
}
stage := bson.D{{"$match", bson.D{{"$or", filter}}}}
return []bson.D{stage}

return append(
pipeline,
[]bson.D{

// Add a documentSize field.
{
{"$addFields", bson.D{
{"documentSize", bson.D{
{"$cond", bson.D{
{"if", bson.D{
{"$ne", bson.A{
"missing",
Collaborator: Do we need to check the fullDocument missing case? $bsonSize returns 0 on null value.

Collaborator (Author): IMO we should avoid “magic values” when possible. I realize that 0 often serves in that role, but if we can avoid that I think it’s a small code-quality win.

bson.D{{"$type", "$fullDocument"}},
}},
}}, // fullDocument exists
{"then", bson.D{{"$bsonSize", "$fullDocument"}}},
Collaborator: $bsonSize is added in v.4.4. Can we fall back to the old slower logic for v4.2 and below?

Collaborator (Author): Thank you for checking this. I checked as far back as 5.0 but not earlier.

That makes me think we should just do fullDocument. I don’t think this is where any bottlenecks are anyway.

Thoughts?
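
Illustration only, not part of this PR: on a pre-4.4 server the pipeline could skip the $bsonSize stage and the verifier could measure the looked-up fullDocument client-side, roughly as in the sketch below. The FullDoc field and helper name are assumptions.

// Hypothetical sketch, assuming ParsedEvent also carried the raw document,
// e.g. FullDoc bson.Raw `bson:"fullDocument,omitempty"` (mongo-driver bson).
func docSizeFromRaw(fullDoc bson.Raw) *int {
	if fullDoc == nil {
		return nil // deletes, and lookups that found nothing, have no fullDocument
	}
	size := len(fullDoc) // a bson.Raw's length equals the document's BSON size
	return &size
}

Either way, the branch in HandleChangeStreamEvents above stays the same, since it only checks whether the size is nil.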

{"else", "$$REMOVE"},
}},
}},
}},
},

// Remove the fullDocument field since a) we don't use it, and
// b) it's big.
{
{"$project", bson.D{
{"fullDocument", 0},
}},
},
}...,
)

}

func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
@@ -236,7 +277,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
pipeline := verifier.GetChangeStreamFilter()
opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second)
opts := options.ChangeStream().
SetMaxAwaitTime(1 * time.Second).
SetFullDocument(options.UpdateLookup)

savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
if err != nil {
3 changes: 2 additions & 1 deletion internal/verifier/recheck_test.go
@@ -48,6 +48,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() {
DB: "the",
Coll: "namespace",
},
DocSize: func() *int { v := 123; return &v }(),
}

err := verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
@@ -63,7 +64,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() {
CollectionName: "namespace",
DocumentID: "theDocID",
},
DataSize: maxBSONObjSize,
DataSize: *event.DocSize,
},
},
recheckDocs,