Skip to content

Commit 8c5ad8b

Browse files
authored
Enqueue rechecks more efficiently (#154)
- Send a raw `insert` command rather than InsertMany. This prevents the driver from “sneakily” ensuring that each inserted document has an `_id`. (This also requires making TolerateSimpleDuplicateKeyInBulk smarter.)
- Avoid bson.Marshal() and bson.Unmarshal() in hot paths. Even when the struct implements the Marshaler & Unmarshaler interfaces, the driver still does some extra work that, while not as problematic as reflection, is still better avoided.
- Avoid reallocations when marshaling recheck structs.
1 parent 17f7c8d commit 8c5ad8b

File tree

16 files changed

+480
-231
lines changed

16 files changed

+480
-231
lines changed

internal/util/error.go

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/10gen/migration-verifier/mmongo"
1111
mapset "github.com/deckarep/golang-set/v2"
1212
"github.com/pkg/errors"
13+
"github.com/rs/zerolog"
1314
"github.com/samber/lo"
1415
"go.mongodb.org/mongo-driver/v2/bson"
1516
"go.mongodb.org/mongo-driver/v2/mongo"
@@ -465,26 +466,45 @@ func TolerateSimpleDuplicateKeyInBulk(
465466
docsCount int,
466467
err error,
467468
) error {
468-
bwe := mongo.BulkWriteException{}
469-
if errors.As(err, &bwe) && bwe.WriteConcernError == nil {
470-
writeCodes := lo.Map(
471-
bwe.WriteErrors,
472-
func(we mongo.BulkWriteError, _ int) int {
473-
return we.Code
474-
},
475-
)
476-
477-
codesSet := mapset.NewSet(writeCodes...)
478-
if codesSet.Cardinality() == 1 && writeCodes[0] == DuplicateKeyErrCode {
479-
// This will be fairly common since we now listen for change events
480-
// on both source & destination, so use trace level here.
469+
if err == nil {
470+
return nil
471+
}
472+
473+
var writeCodes []int
474+
475+
if bwe := (mongo.BulkWriteException{}); errors.As(err, &bwe) {
476+
if bwe.WriteConcernError == nil {
477+
writeCodes = lo.Map(
478+
bwe.WriteErrors,
479+
func(we mongo.BulkWriteError, _ int) int {
480+
return we.Code
481+
},
482+
)
483+
}
484+
} else if we := (mongo.WriteException{}); errors.As(err, &we) {
485+
if we.WriteConcernError == nil {
486+
writeCodes = lo.Map(
487+
we.WriteErrors,
488+
func(we mongo.WriteError, _ int) int {
489+
return we.Code
490+
},
491+
)
492+
}
493+
}
494+
495+
codesSet := mapset.NewSet(writeCodes...)
496+
if codesSet.Cardinality() == 1 && writeCodes[0] == DuplicateKeyErrCode {
497+
// This will be fairly common since we now listen for change events
498+
// on both source & destination, so use trace level here, and don’t
499+
// do the logger calls at all unless the level warrants.
500+
if logger.GetLevel() <= zerolog.TraceLevel {
481501
logger.Trace().
482502
Int("documentsSubmitted", docsCount).
483503
Int("duplicates", len(writeCodes)).
484504
Msg("Ignoring duplicate key error on recheck inserts.")
485-
486-
err = nil
487505
}
506+
507+
err = nil
488508
}
489509

490510
return err

internal/verifier/change_stream.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch
177177
dbNames := make([]string, len(batch.events))
178178
collNames := make([]string, len(batch.events))
179179
docIDs := make([]bson.RawValue, len(batch.events))
180-
dataSizes := make([]int, len(batch.events))
180+
dataSizes := make([]int32, len(batch.events))
181181

182182
latestTimestamp := bson.Timestamp{}
183183

@@ -230,14 +230,14 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch
230230
docIDs[i] = changeEvent.DocID
231231

232232
if changeEvent.FullDocLen.OrZero() > 0 {
233-
dataSizes[i] = int(changeEvent.FullDocLen.OrZero())
233+
dataSizes[i] = int32(changeEvent.FullDocLen.OrZero())
234234
} else if changeEvent.FullDocument == nil {
235235
// This happens for deletes and for some updates.
236236
// The document is probably, but not necessarily, deleted.
237237
dataSizes[i] = fauxDocSizeForDeleteEvents
238238
} else {
239239
// This happens for inserts, replaces, and most updates.
240-
dataSizes[i] = len(changeEvent.FullDocument)
240+
dataSizes[i] = int32(len(changeEvent.FullDocument))
241241
}
242242

243243
if err := eventRecorder.AddEvent(&changeEvent); err != nil {
@@ -382,7 +382,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
382382

383383
batchTotalBytes += len(cs.Current)
384384

385-
if err := cs.Decode(&changeEvents[eventsRead]); err != nil {
385+
if err := (&changeEvents[eventsRead]).UnmarshalFromBSON(cs.Current); err != nil {
386386
return errors.Wrapf(err, "failed to decode change event to %T", changeEvents[eventsRead])
387387
}
388388

internal/verifier/change_stream_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/10gen/migration-verifier/internal/testutil"
1212
"github.com/10gen/migration-verifier/internal/types"
1313
"github.com/10gen/migration-verifier/internal/util"
14+
"github.com/10gen/migration-verifier/internal/verifier/recheck"
1415
"github.com/10gen/migration-verifier/mbson"
1516
"github.com/10gen/migration-verifier/mslices"
1617
"github.com/10gen/migration-verifier/mstrings"
@@ -127,7 +128,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() {
127128
suite.Assert().Less(len(cs.Current), 10_000, "event should not be large")
128129

129130
parsed := ParsedEvent{}
130-
suite.Require().NoError(cs.Decode(&parsed))
131+
suite.Require().NoError((&parsed).UnmarshalFromBSON(cs.Current))
131132
suite.Require().Equal("insert", parsed.OpType)
132133

133134
suite.Require().Equal(
@@ -144,7 +145,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() {
144145

145146
suite.Require().True(cs.Next(ctx), "should get event")
146147
parsed = ParsedEvent{}
147-
suite.Require().NoError(cs.Decode(&parsed))
148+
suite.Require().NoError((&parsed).UnmarshalFromBSON(cs.Current))
148149
suite.Require().Equal("delete", parsed.OpType)
149150
suite.Require().True(parsed.FullDocLen.IsNone(), "full doc len not in delete")
150151
}
@@ -916,7 +917,7 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
916917
_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
917918
suite.Require().NoError(err)
918919

919-
var rechecks []RecheckDoc
920+
var rechecks []recheck.Doc
920921
require.Eventually(
921922
suite.T(),
922923
func() bool {

internal/verifier/compare.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func (verifier *Verifier) compareDocsFromChannels(
322322
Details: Missing,
323323
Cluster: ClusterTarget,
324324
NameSpace: namespace,
325-
dataSize: len(docWithTs.doc),
325+
dataSize: int32(len(docWithTs.doc)),
326326
SrcTimestamp: option.Some(docWithTs.ts),
327327
},
328328
)
@@ -339,7 +339,7 @@ func (verifier *Verifier) compareDocsFromChannels(
339339
Details: Missing,
340340
Cluster: ClusterSource,
341341
NameSpace: namespace,
342-
dataSize: len(docWithTs.doc),
342+
dataSize: int32(len(docWithTs.doc)),
343343
DstTimestamp: option.Some(docWithTs.ts),
344344
},
345345
)
@@ -732,7 +732,7 @@ func (verifier *Verifier) compareOneDocument(srcClientDoc, dstClientDoc bson.Raw
732732
Details: Mismatch + fmt.Sprintf(" : Document %s has fields in different order", srcClientDoc.Lookup("_id")),
733733
Cluster: ClusterTarget,
734734
NameSpace: namespace,
735-
dataSize: dataSize,
735+
dataSize: int32(dataSize),
736736
}}, nil
737737
}
738738
results := mismatchResultsToVerificationResults(mismatch, srcClientDoc, dstClientDoc, namespace, srcClientDoc.Lookup("_id"), "" /* fieldPrefix */)

internal/verifier/migration_verifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
565565
Int("mismatchesCount", len(problems)).
566566
Msg("Discrepancies found. Will recheck in the next generation.")
567567

568-
dataSizes := make([]int, 0, len(problems))
568+
dataSizes := make([]int32, 0, len(problems))
569569

570570
// This stores all IDs for the next generation to check.
571571
// Its length should equal len(mismatches) + len(missingIds).

internal/verifier/migration_verifier_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -941,21 +941,21 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
941941
ctx,
942942
"foo.bar",
943943
mslices.Of(mbson.ToRawValue(42)),
944-
[]int{100},
944+
[]int32{100},
945945
)
946946
suite.Require().NoError(err)
947947
err = verifier.InsertFailedCompareRecheckDocs(
948948
ctx,
949949
"foo.bar",
950950
mslices.Of(mbson.ToRawValue(43), mbson.ToRawValue(44)),
951-
[]int{100, 100},
951+
[]int32{100, 100},
952952
)
953953
suite.Require().NoError(err)
954954
err = verifier.InsertFailedCompareRecheckDocs(
955955
ctx,
956956
"foo.bar2",
957957
mslices.Of(mbson.ToRawValue(42)),
958-
[]int{100},
958+
[]int32{100},
959959
)
960960
suite.Require().NoError(err)
961961
event := ParsedEvent{

internal/verifier/parsed_event.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,20 @@ func (pe *ParsedEvent) String() string {
2626

2727
var _ bson.Unmarshaler = &ParsedEvent{}
2828

29-
// UnmarshalBSON implements bson.Unmarshaler. We define this manually to
30-
// avoid reflection, which can substantially impede performance in “hot”
31-
// code paths like this.
29+
// UnmarshalBSON implements bson.Unmarshaler but panics because
30+
// it’s faster to use UnmarshalFromBSON than bson.Unmarshal().
3231
func (pe *ParsedEvent) UnmarshalBSON(in []byte) error {
33-
els, err := bson.Raw(in).Elements()
34-
if err != nil {
35-
return errors.Wrapf(err, "parsing elements")
36-
}
32+
panic("Use UnmarshalFromBSON instead.")
33+
}
34+
35+
// UnmarshalFromBSON unmarshals from BSON without the overhead of
36+
// bson.Unmarshal.
37+
func (pe *ParsedEvent) UnmarshalFromBSON(in []byte) error {
38+
for el, err := range mbson.RawElements(in) {
39+
if err != nil {
40+
return errors.Wrapf(err, "parsing elements")
41+
}
3742

38-
for _, el := range els {
3943
key, err := el.KeyErr()
4044
if err != nil {
4145
return errors.Wrapf(err, "parsing field name")
@@ -59,7 +63,7 @@ func (pe *ParsedEvent) UnmarshalBSON(in []byte) error {
5963

6064
ns := Namespace{}
6165

62-
err = bson.Unmarshal(rvDoc, &ns)
66+
err = (&ns).UnmarshalFromBSON(rvDoc)
6367
if err != nil {
6468
return errors.Wrapf(err, "unmarshaling %#q value", key)
6569
}

0 commit comments

Comments (0)