Commit 5fb4f11

Merge pull request #158 from FGasper/felipe_reduce_change_stream_recheck_dupes
Migration Verifier listens for changes on both the source and destination clusters and enqueues rechecks for both in the same collection. Because the migration replays source writes on the destination, every change on the source eventually appears as a change on the destination, so for every source change we expect to see a duplicate recheck entry.

Previously we handled those duplicates via an insert with tolerance for duplicate keys. The server’s duplicate-key path, though, is quite slow. It’s much faster just to write both documents and then deduplicate them when reading; this changeset makes that change.

Rechecks triggered by source changes are no longer document-level duplicates of destination-triggered rechecks, because the `_id` now contains a `rand` field, set to a random int32, that distinguishes them. When we convert those entries into recheck tasks, we project out the `_id.rand` field so that it’s easy to deduplicate them. This also avoids duplicate-key errors in the “hot documents” case, where the workload writes frequently to a small set of documents.
1 parent 7b19d5e commit 5fb4f11
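
As a minimal sketch of the write side of this scheme (illustrative only: the `enqueueRecheck` helper and its single-field key are simplified stand-ins, not verifier code; the real key is the `RecheckPrimaryKey` shown in the diff below):

package recheckexample

import (
	"context"
	"math/rand/v2"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

// enqueueRecheck (hypothetical helper) inserts a recheck entry whose _id
// embeds a random disambiguator. Entries enqueued for the same document by
// the source and destination change streams therefore never collide on
// _id, so the slow server-side duplicate-key path is never taken; the
// duplicates are collapsed later, at read time.
func enqueueRecheck(ctx context.Context, coll *mongo.Collection, docID string) error {
	_, err := coll.InsertOne(ctx, bson.D{
		{"_id", bson.D{
			{"docID", docID},
			{"rand", rand.Int32()}, // distinguishes otherwise-identical entries
		}},
	})
	return err
}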

3 files changed: +79 -12 lines changed

internal/verifier/change_stream_test.go

Lines changed: 25 additions & 1 deletion

@@ -306,6 +306,16 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
 		"the verifier should enqueue a recheck",
 	)
 
+	require.Len(suite.T(), recheckDocs, 1)
+
+	recheckDocs[0]["_id"] = lo.Filter(
+		recheckDocs[0]["_id"].(bson.D),
+		func(el bson.E, _ int) bool {
+			return el.Key != "rand"
+		},
+	)
+	delete(recheckDocs[0], "rand")
+
 	suite.Assert().Equal(
 		bson.D{
 			{"db", suite.DBNameForTest()},
@@ -335,7 +345,21 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve
 	recheckDocs := []bson.M{}
 
 	recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
-	cursor, err := recheckColl.Find(ctx, bson.D{})
+	cursor, err := recheckColl.Aggregate(
+		ctx,
+		mongo.Pipeline{
+			{{"$addFields", bson.D{
+				{"_id.rand", "$$REMOVE"},
+			}}},
+			{{"$group", bson.D{
+				{"_id", "$_id"},
+				{"doc", bson.D{{"$first", "$$ROOT"}}},
+			}}},
+			{{"$replaceRoot", bson.D{
+				{"newRoot", "$doc"},
+			}}},
+		},
+	)
 
 	if !errors.Is(err, mongo.ErrNoDocuments) {
 		suite.Require().NoError(err)
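
The aggregation in fetchVerifierRechecks is the read-side half of the new scheme. Stage by stage: $addFields sets _id.rand to $$REMOVE, which deletes the field, so entries differing only in their random disambiguator now share an identical _id; $group on that normalized _id keeps one whole document per key via $first on $$ROOT; $replaceRoot then restores the original document shape. The same pipeline as a standalone sketch (the dedupedRechecks helper is hypothetical; coll is any *mongo.Collection holding recheck-queue documents):

package recheckexample

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

// dedupedRechecks (hypothetical helper) reads recheck-queue entries,
// collapsing those that differ only in the _id.rand disambiguator.
func dedupedRechecks(ctx context.Context, coll *mongo.Collection) (*mongo.Cursor, error) {
	return coll.Aggregate(ctx, mongo.Pipeline{
		// Delete the disambiguator so duplicate entries compare equal.
		{{"$addFields", bson.D{{"_id.rand", "$$REMOVE"}}}},
		// Keep one whole document per normalized _id.
		{{"$group", bson.D{
			{"_id", "$_id"},
			{"doc", bson.D{{"$first", "$$ROOT"}}},
		}}},
		// Promote the saved document back to the root.
		{{"$replaceRoot", bson.D{{"newRoot", "$doc"}}}},
	})
}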

internal/verifier/recheck.go

Lines changed: 38 additions & 8 deletions

@@ -3,6 +3,8 @@ package verifier
 import (
 	"context"
 	"fmt"
+	"math/rand/v2"
+	"time"
 
 	"github.com/10gen/migration-verifier/contextplus"
 	"github.com/10gen/migration-verifier/internal/reportutils"
@@ -35,6 +37,17 @@ type RecheckPrimaryKey struct {
 	SrcDatabaseName   string        `bson:"db"`
 	SrcCollectionName string        `bson:"coll"`
 	DocumentID        bson.RawValue `bson:"docID"`
+
+	// Rand is here to allow “duplicate” entries. We do this because, with
+	// multiple change streams returning the same events, we expect duplicate
+	// key errors to be frequent. The server is quite slow in handling such
+	// errors, though. To avoid that, while still allowing the _id index to
+	// facilitate easy sorting of the duplicates, we set this field to a
+	// random value on each entry.
+	//
+	// This also avoids duplicate-key slowness where the source workload
+	// involves frequent writes to a small number of documents.
+	Rand int32
 }
 
 var _ bson.Marshaler = &RecheckPrimaryKey{}
@@ -47,6 +60,7 @@ func (rk *RecheckPrimaryKey) MarshalBSON() ([]byte, error) {
 			Type: bsoncore.Type(rk.DocumentID.Type),
 			Data: rk.DocumentID.Value,
 		}).
+		AppendInt32("rand", rk.Rand).
 		Build(), nil
 }
 
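For illustration, the following sketch builds a key the way the updated MarshalBSON does, with made-up literal values standing in for the real db/coll/docID data (the real code appends the document’s raw _id value, whatever its BSON type):

package main

import (
	"fmt"
	"math/rand/v2"

	"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
)

func main() {
	// Yields a recheck key like (illustrative values):
	//   {"db": "testDB", "coll": "testColl", "docID": "abc", "rand": 1234567890}
	key := bsoncore.NewDocumentBuilder().
		AppendString("db", "testDB").
		AppendString("coll", "testColl").
		AppendString("docID", "abc").      // stands in for the raw docID value
		AppendInt32("rand", rand.Int32()). // the new disambiguator
		Build()
	fmt.Println(key.String())
}
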
@@ -86,13 +100,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 		Int("count", len(documentIDs)).
 		Msg("Persisting rechecks for mismatched or missing documents.")
 
-	return verifier.insertRecheckDocs(
-		ctx,
-		dbNames,
-		collNames,
-		documentIDs,
-		dataSizes,
-	)
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
 }
 
 func (verifier *Verifier) insertRecheckDocs(
@@ -168,6 +176,7 @@ func (verifier *Verifier) insertRecheckDocs(
 			SrcDatabaseName:   dbName,
 			SrcCollectionName: collNames[i],
 			DocumentID:        rawDocIDs[i],
+			Rand:              rand.Int32(),
 		},
 		DataSize: dataSizes[i],
 	}
@@ -344,6 +353,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 		Int("rechecksCount", int(rechecksCount)).
 		Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.")
 
+	startTime := time.Now()
+
 	// We generate one recheck task per collection, unless
 	// 1) The size of the list of IDs would exceed 12MB (a very conservative way of avoiding
 	//    the 16MB BSON limit)
@@ -364,7 +375,11 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 	cursor, err := recheckColl.Find(
 		ctx,
 		bson.D{},
-		options.Find().SetSort(bson.D{{"_id", 1}}),
+		options.Find().
+			SetSort(bson.D{{"_id", 1}}).
+			SetProjection(bson.D{
+				{"_id.rand", 0},
+			}),
 	)
 	if err != nil {
 		return err
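
Two details in the hunk above work together. The sort is on the full _id, in which rand is the last key field, so all entries for a given {db, coll, docID} still land consecutively in the cursor; the projection then strips _id.rand, so those consecutive entries arrive byte-identical, and the loop below can deduplicate them with a raw-value comparison.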
@@ -403,6 +418,8 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 		return nil
 	}
 
+	var lastIDRaw bson.RawValue
+
 	// We group these here using a sort rather than using aggregate because aggregate is
 	// subject to a 16MB limit on group size.
 	for cursor.Next(ctx) {
@@ -439,8 +456,20 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 			idsSizer = util.BSONArraySizer{}
 			dataSizeAccum = 0
 			idAccum = idAccum[:0]
+			lastIDRaw = bson.RawValue{}
+		}
+
+		// We’re iterating the rechecks in order such that, if the same doc
+		// gets enqueued from multiple sources, we’ll see those records
+		// consecutively. We can deduplicate here, then, by checking to see if
+		// the doc ID has changed. (NB: At this point we know the namespace
+		// has *not* changed because we just checked for that.)
+		if idRaw.Equal(lastIDRaw) {
+			continue
 		}
 
+		lastIDRaw = idRaw
+
 		idsSizer.Add(idRaw)
 		dataSizeAccum += int64(doc.DataSize)
 		idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
@@ -461,6 +490,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
 		Int("generation", 1+prevGeneration).
 		Int64("totalDocs", int64(totalDocs)).
 		Str("totalData", reportutils.FmtBytes(totalRecheckData)).
+		Stringer("timeElapsed", time.Since(startTime)).
 		Msg("Scheduled documents for recheck in the new generation.")
 }
 
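The consecutive-duplicate check above is the classic dedup-over-sorted-input pattern: after the sort, all copies of a key are adjacent, so remembering only the previous key suffices. The same idea in miniature (a generic sketch with a hypothetical dedupSorted helper, not verifier code):

package recheckexample

// dedupSorted drops consecutive duplicates from a sorted slice, the same
// way GenerateRecheckTasksWhileLocked skips repeated doc IDs.
func dedupSorted(sorted []string) []string {
	out := make([]string, 0, len(sorted))
	for i, s := range sorted {
		if i > 0 && s == sorted[i-1] {
			continue // duplicate of the previous entry; skip it
		}
		out = append(out, s)
	}
	return out
}
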
internal/verifier/recheck_test.go

Lines changed: 16 additions & 3 deletions

@@ -13,6 +13,7 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/samber/lo"
 	"go.mongodb.org/mongo-driver/v2/bson"
+	"go.mongodb.org/mongo-driver/v2/mongo"
 	"go.mongodb.org/mongo-driver/v2/mongo/options"
 )
 
@@ -85,11 +86,23 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
 func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []RecheckDoc {
 	metaColl := verifier.getRecheckQueueCollection(verifier.generation)
 
-	cursor, err := metaColl.Find(
+	cursor, err := metaColl.Aggregate(
 		ctx,
-		bson.D{},
-		options.Find().SetProjection(bson.D{{"dataSize", 0}}),
+		mongo.Pipeline{
+			{{"$addFields", bson.D{
+				{"_id.rand", "$$REMOVE"},
+				{"dataSize", "$$REMOVE"},
+			}}},
+			{{"$group", bson.D{
+				{"_id", "$_id"},
+				{"doc", bson.D{{"$first", "$$ROOT"}}},
+			}}},
+			{{"$replaceRoot", bson.D{
+				{"newRoot", "$doc"},
+			}}},
+		},
 	)
+
 	suite.Require().NoError(err, "find recheck docs")
 
 	var results []RecheckDoc
