Skip to content

Commit d7ce176

Browse files
authored
Further change stream optimizations (#135)
This optimizes the change stream by: - Removing `updateLookup`. It’s there so that we can group rechecks into more evenly-sized tasks, but it incurs the overhead of fetching the document on each change event. That overhead almost certainly outweighs the task-sizing benefit. - Removing `wallTime`. When change events are small, this uselessly inflates the event and reduces the # of events we can parse at once. - Replacing `documentKey` with a new `_docID` field that just contains the document ID, which is the only part of the document key that we actually need. (The shard key fields are not needed in the change event.) This also removes the internal ParsedEvent.ID field, which nothing actually needs.
1 parent ee2106a commit d7ce176

File tree

4 files changed

+22
-34
lines changed

4 files changed

+22
-34
lines changed

internal/verifier/change_stream.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,16 @@ var supportedEventOpTypes = mapset.NewSet(
4141

4242
// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
4343
type ParsedEvent struct {
44-
ID any `bson:"_id"`
4544
OpType string `bson:"operationType"`
4645
Ns *Namespace `bson:"ns,omitempty"`
47-
DocKey DocKey `bson:"documentKey,omitempty"`
46+
DocID any `bson:"_docID,omitempty"`
4847
FullDocument bson.Raw `bson:"fullDocument,omitempty"`
4948
FullDocLen option.Option[types.ByteCount] `bson:"_fullDocLen"`
5049
ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"`
5150
}
5251

5352
func (pe *ParsedEvent) String() string {
54-
return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime)
55-
}
56-
57-
// DocKey is a deserialized form for the ChangeEvent documentKey field. We currently only care about
58-
// the _id.
59-
type DocKey struct {
60-
ID any `bson:"_id"`
53+
return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocID, pe.ClusterTime)
6154
}
6255

6356
const (
@@ -244,7 +237,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch
244237

245238
dbNames[i] = srcDBName
246239
collNames[i] = srcCollName
247-
docIDs[i] = changeEvent.DocKey.ID
240+
docIDs[i] = changeEvent.DocID
248241

249242
if changeEvent.FullDocLen.OrZero() > 0 {
250243
dataSizes[i] = int(changeEvent.FullDocLen.OrZero())
@@ -323,8 +316,12 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline)
323316
pipeline = append(
324317
pipeline,
325318
bson.D{
326-
{"$unset", []string{
327-
"updateDescription",
319+
{"$addFields", bson.D{
320+
{"_docID", "$documentKey._id"},
321+
322+
{"updateDescription", "$$REMOVE"},
323+
{"wallTime", "$$REMOVE"},
324+
{"documentKey", "$$REMOVE"},
328325
}},
329326
},
330327
)
@@ -622,8 +619,7 @@ func (csr *ChangeStreamReader) createChangeStream(
622619
) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
623620
pipeline := csr.GetChangeStreamFilter()
624621
opts := options.ChangeStream().
625-
SetMaxAwaitTime(maxChangeStreamAwaitTime).
626-
SetFullDocument(options.UpdateLookup)
622+
SetMaxAwaitTime(maxChangeStreamAwaitTime)
627623

628624
if csr.clusterInfo.VersionArray[0] >= 6 {
629625
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})

internal/verifier/change_stream_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() {
108108
cs, err := suite.srcMongoClient.Watch(
109109
ctx,
110110
filter,
111-
options.ChangeStream().SetFullDocument("updateLookup"),
112111
)
113112
suite.Require().NoError(err)
114113
defer cs.Close(ctx)
@@ -125,17 +124,18 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() {
125124

126125
suite.Require().True(cs.Next(ctx), "should get event")
127126

128-
suite.Require().Equal(
129-
"abc",
130-
cs.Current.Lookup("documentKey", "_id").StringValue(),
131-
"event should reference expected document",
132-
)
133127
suite.Assert().Less(len(cs.Current), 10_000, "event should not be large")
134128

135129
parsed := ParsedEvent{}
136130
suite.Require().NoError(cs.Decode(&parsed))
137131
suite.Require().Equal("insert", parsed.OpType)
138132

133+
suite.Require().Equal(
134+
"abc",
135+
parsed.DocID,
136+
"event should reference expected document",
137+
)
138+
139139
suite.Require().True(parsed.FullDocLen.IsSome(), "full doc len should be in event")
140140
suite.Assert().Greater(parsed.FullDocLen.MustGet(), types.ByteCount(10_000))
141141

@@ -313,7 +313,8 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
313313
"docID": "heyhey",
314314
},
315315
recheckDocs[0]["_id"],
316-
"recheck doc should have expected ID",
316+
"recheck doc (%v) should have expected ID",
317+
recheckDocs[0],
317318
)
318319
}
319320

internal/verifier/migration_verifier_test.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -577,9 +577,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck()
577577
events: []ParsedEvent{{
578578
OpType: "insert",
579579
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
580-
DocKey: DocKey{
581-
ID: "heyhey",
582-
},
580+
DocID: "heyhey",
583581
ClusterTime: &primitive.Timestamp{
584582
T: uint32(time.Now().Unix()),
585583
},
@@ -593,14 +591,9 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck()
593591
ctx,
594592
changeEventBatch{
595593
events: []ParsedEvent{{
596-
ID: bson.M{
597-
"docID": "ID/docID",
598-
},
599594
OpType: "insert",
600595
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
601-
DocKey: DocKey{
602-
ID: "hoohoo",
603-
},
596+
DocID: "hoohoo",
604597
ClusterTime: &primitive.Timestamp{
605598
T: uint32(time.Now().Unix()),
606599
},
@@ -842,7 +835,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
842835
err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar2", []any{42}, []int{100})
843836
suite.Require().NoError(err)
844837
event := ParsedEvent{
845-
DocKey: DocKey{ID: int32(55)},
838+
DocID: int32(55),
846839
OpType: "delete",
847840
Ns: &Namespace{
848841
DB: "foo",

internal/verifier/recheck_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
4848

4949
event := ParsedEvent{
5050
OpType: "insert",
51-
DocKey: DocKey{
52-
ID: "theDocID",
53-
},
51+
DocID: "theDocID",
5452
Ns: &Namespace{
5553
DB: "the",
5654
Coll: "namespace",

0 commit comments

Comments
 (0)