@@ -19,12 +19,12 @@ const fauxDocSizeForDeleteEvents = 1024
1919
2020// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
2121type ParsedEvent struct {
22- ID interface {} `bson:"_id"`
23- OpType string `bson:"operationType"`
24- Ns * Namespace `bson:"ns,omitempty"`
25- DocKey DocKey `bson:"documentKey,omitempty"`
26- FullDocument bson. Raw `bson:"fullDocument,omitempty"`
27- ClusterTime * primitive.Timestamp `bson:"clusterTime,omitEmpty"`
22+ ID interface {} `bson:"_id"`
23+ OpType string `bson:"operationType"`
24+ Ns * Namespace `bson:"ns,omitempty"`
25+ DocKey DocKey `bson:"documentKey,omitempty"`
26+ DocSize * int `bson:"fullDocument,omitempty"`
27+ ClusterTime * primitive.Timestamp `bson:"clusterTime,omitEmpty"`
2828}
2929
3030func (pe * ParsedEvent ) String () string {
@@ -82,13 +82,13 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
8282 collNames [i ] = changeEvent .Ns .Coll
8383 docIDs [i ] = changeEvent .DocKey .ID
8484
85- if changeEvent .FullDocument == nil {
85+ if changeEvent .DocSize == nil {
8686 // This happens for deletes and for some updates.
8787 // The document is probably, but not necessarily, deleted.
8888 dataSizes [i ] = fauxDocSizeForDeleteEvents
8989 } else {
9090 // This happens for inserts, replaces, and most updates.
91- dataSizes [i ] = len ( changeEvent .FullDocument )
91+ dataSizes [i ] = * changeEvent .DocSize
9292 }
9393 default :
9494 return UnknownEventError {Event : & changeEvent }
@@ -103,16 +103,52 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
103103}
104104
105105func (verifier * Verifier ) GetChangeStreamFilter () []bson.D {
106+ var pipeline mongo.Pipeline
107+
106108 if len (verifier .srcNamespaces ) == 0 {
107- return []bson.D {{bson.E {"$match" , bson.D {{"ns.db" , bson.D {{"$ne" , verifier .metaDBName }}}}}}}
108- }
109- filter := bson.A {}
110- for _ , ns := range verifier .srcNamespaces {
111- db , coll := SplitNamespace (ns )
112- filter = append (filter , bson.D {{"ns" , bson.D {{"db" , db }, {"coll" , coll }}}})
109+ pipeline = []bson.D {{bson.E {"$match" , bson.D {{"ns.db" , bson.D {{"$ne" , verifier .metaDBName }}}}}}}
110+ } else {
111+ filter := bson.A {}
112+ for _ , ns := range verifier .srcNamespaces {
113+ db , coll := SplitNamespace (ns )
114+ filter = append (filter , bson.D {{"ns" , bson.D {{"db" , db }, {"coll" , coll }}}})
115+ }
116+ stage := bson.D {{"$match" , bson.D {{"$or" , filter }}}}
117+ pipeline = []bson.D {stage }
113118 }
114- stage := bson.D {{"$match" , bson.D {{"$or" , filter }}}}
115- return []bson.D {stage }
119+
120+ return append (
121+ pipeline ,
122+ []bson.D {
123+
124+ // Add a documentSize field.
125+ {
126+ {"$addFields" , bson.D {
127+ {"documentSize" , bson.D {
128+ {"$cond" , bson.D {
129+ {"if" , bson.D {
130+ {"$ne" , bson.A {
131+ "missing" ,
132+ bson.D {{"$type" , "$fullDocument" }},
133+ }},
134+ }}, // fullDocument exists
135+ {"then" , bson.D {{"$bsonSize" , "$fullDocument" }}},
136+ {"else" , "$$REMOVE" },
137+ }},
138+ }},
139+ }},
140+ },
141+
142+ // Remove the fullDocument field since a) we don't use it, and
143+ // b) it's big.
144+ {
145+ {"$project" , bson.D {
146+ {"fullDocument" , 0 },
147+ }},
148+ },
149+ }... ,
150+ )
151+
116152}
117153
118154func (verifier * Verifier ) iterateChangeStream (ctx context.Context , cs * mongo.ChangeStream ) {
0 commit comments