@@ -15,13 +15,16 @@ import (
1515 "golang.org/x/exp/constraints"
1616)
1717
18+ const fauxDocSizeForDeleteEvents = 1024
19+
1820// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
1921type ParsedEvent struct {
20- ID interface {} `bson:"_id"`
21- OpType string `bson:"operationType"`
22- Ns * Namespace `bson:"ns,omitempty"`
23- DocKey DocKey `bson:"documentKey,omitempty"`
24- ClusterTime * primitive.Timestamp `bson:"clusterTime,omitEmpty"`
22+ ID interface {} `bson:"_id"`
23+ OpType string `bson:"operationType"`
24+ Ns * Namespace `bson:"ns,omitempty"`
25+ DocKey DocKey `bson:"documentKey,omitempty"`
26+ FullDocument bson.Raw `bson:"fullDocument,omitempty"`
27+ ClusterTime * primitive.Timestamp `bson:"clusterTime,omitEmpty"`
2528}
2629
2730func (pe * ParsedEvent ) String () string {
@@ -79,12 +82,14 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
7982 collNames [i ] = changeEvent .Ns .Coll
8083 docIDs [i ] = changeEvent .DocKey .ID
8184
82- // We don't know the document sizes for documents for all change events,
83- // so just be conservative and assume they are maximum size.
84- //
85- // Note that this prevents us from being able to report a meaningful
86- // total data size for noninitial generations in the log.
87- dataSizes [i ] = maxBSONObjSize
85+ if changeEvent .FullDocument == nil {
86+ // This happens for deletes and for some updates.
87+ // The document is probably, but not necessarily, deleted.
88+ dataSizes [i ] = fauxDocSizeForDeleteEvents
89+ } else {
90+ // This happens for inserts, replaces, and most updates.
91+ dataSizes [i ] = len (changeEvent .FullDocument )
92+ }
8893 default :
8994 return UnknownEventError {Event : & changeEvent }
9095 }
@@ -236,7 +241,9 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
236241// StartChangeStream starts the change stream.
237242func (verifier * Verifier ) StartChangeStream (ctx context.Context ) error {
238243 pipeline := verifier .GetChangeStreamFilter ()
239- opts := options .ChangeStream ().SetMaxAwaitTime (1 * time .Second )
244+ opts := options .ChangeStream ().
245+ SetMaxAwaitTime (1 * time .Second ).
246+ SetFullDocument (options .UpdateLookup )
240247
241248 savedResumeToken , err := verifier .loadChangeStreamResumeToken (ctx )
242249 if err != nil {
0 commit comments