-
Notifications
You must be signed in to change notification settings - Fork 15
REP-5283 Report change event document sizes accurately. #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4dba79b
32c178c
edd2437
896570a
f8b38ae
11e84e0
8587b86
e987143
8f46f20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,12 +15,15 @@ import ( | |
| "golang.org/x/exp/constraints" | ||
| ) | ||
|
|
||
| const fauxDocSizeForDeleteEvents = 1024 | ||
|
|
||
| // ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'. | ||
| type ParsedEvent struct { | ||
| ID interface{} `bson:"_id"` | ||
| OpType string `bson:"operationType"` | ||
| Ns *Namespace `bson:"ns,omitempty"` | ||
| DocKey DocKey `bson:"documentKey,omitempty"` | ||
| DocSize *int `bson:"fullDocument,omitempty"` | ||
| ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"` | ||
| } | ||
|
|
||
|
|
@@ -79,12 +82,14 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] | |
| collNames[i] = changeEvent.Ns.Coll | ||
| docIDs[i] = changeEvent.DocKey.ID | ||
|
|
||
| // We don't know the document sizes for documents for all change events, | ||
| // so just be conservative and assume they are maximum size. | ||
| // | ||
| // Note that this prevents us from being able to report a meaningful | ||
| // total data size for noninitial generations in the log. | ||
| dataSizes[i] = maxBSONObjSize | ||
| if changeEvent.DocSize == nil { | ||
| // This happens for deletes and for some updates. | ||
| // The document is probably, but not necessarily, deleted. | ||
| dataSizes[i] = fauxDocSizeForDeleteEvents | ||
| } else { | ||
| // This happens for inserts, replaces, and most updates. | ||
| dataSizes[i] = *changeEvent.DocSize | ||
| } | ||
| default: | ||
| return UnknownEventError{Event: &changeEvent} | ||
| } | ||
|
|
@@ -98,16 +103,52 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] | |
| } | ||
|
|
||
| func (verifier *Verifier) GetChangeStreamFilter() []bson.D { | ||
| var pipeline mongo.Pipeline | ||
|
|
||
| if len(verifier.srcNamespaces) == 0 { | ||
| return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}} | ||
| } | ||
| filter := bson.A{} | ||
| for _, ns := range verifier.srcNamespaces { | ||
| db, coll := SplitNamespace(ns) | ||
| filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}}) | ||
| pipeline = []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}} | ||
| } else { | ||
| filter := bson.A{} | ||
| for _, ns := range verifier.srcNamespaces { | ||
| db, coll := SplitNamespace(ns) | ||
| filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}}) | ||
| } | ||
| stage := bson.D{{"$match", bson.D{{"$or", filter}}}} | ||
| pipeline = []bson.D{stage} | ||
| } | ||
| stage := bson.D{{"$match", bson.D{{"$or", filter}}}} | ||
| return []bson.D{stage} | ||
|
|
||
| return append( | ||
| pipeline, | ||
| []bson.D{ | ||
|
|
||
| // Add a documentSize field. | ||
| { | ||
| {"$addFields", bson.D{ | ||
| {"documentSize", bson.D{ | ||
| {"$cond", bson.D{ | ||
| {"if", bson.D{ | ||
| {"$ne", bson.A{ | ||
| "missing", | ||
|
||
| bson.D{{"$type", "$fullDocument"}}, | ||
| }}, | ||
| }}, // fullDocument exists | ||
| {"then", bson.D{{"$bsonSize", "$fullDocument"}}}, | ||
|
||
| {"else", "$$REMOVE"}, | ||
| }}, | ||
| }}, | ||
| }}, | ||
| }, | ||
|
|
||
| // Remove the fullDocument field since a) we don't use it, and | ||
| // b) it's big. | ||
| { | ||
| {"$project", bson.D{ | ||
| {"fullDocument", 0}, | ||
| }}, | ||
| }, | ||
| }..., | ||
| ) | ||
|
|
||
| } | ||
|
|
||
| func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { | ||
|
|
@@ -236,7 +277,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha | |
| // StartChangeStream starts the change stream. | ||
| func (verifier *Verifier) StartChangeStream(ctx context.Context) error { | ||
| pipeline := verifier.GetChangeStreamFilter() | ||
| opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second) | ||
| opts := options.ChangeStream(). | ||
| SetMaxAwaitTime(1 * time.Second). | ||
| SetFullDocument(options.UpdateLookup) | ||
|
|
||
| savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) | ||
| if err != nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The BSON tag should probably be
documentSize.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops! Good catch.