Skip to content

Commit 5bf64f6

Browse files
committed
Felipe's review
1 parent a9cb0c1 commit 5bf64f6

File tree

9 files changed

+164
-87
lines changed

9 files changed

+164
-87
lines changed

internal/verifier/change_stream.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,10 @@ type ChangeStreamReader struct {
5959
lastChangeEventTime *primitive.Timestamp
6060
logger *logger.Logger
6161
namespaces []string
62-
metaDBName string
6362

64-
metaClient *mongo.Client
63+
metaDB *mongo.Database
6564
watcherClient *mongo.Client
66-
buildInfo *util.BuildInfo
65+
buildInfo util.BuildInfo
6766

6867
changeStreamRunning bool
6968
ChangeEventBatchChan chan []ParsedEvent
@@ -77,35 +76,29 @@ type ChangeStreamReader struct {
7776
func (verifier *Verifier) initializeChangeStreamReaders() {
7877
verifier.srcChangeStreamReader = &ChangeStreamReader{
7978
readerType: srcReaderType,
80-
lastChangeEventTime: nil,
8179
logger: verifier.logger,
8280
namespaces: verifier.srcNamespaces,
83-
metaDBName: verifier.metaDBName,
84-
metaClient: verifier.metaClient,
81+
metaDB: verifier.metaClient.Database(verifier.metaDBName),
8582
watcherClient: verifier.srcClient,
86-
buildInfo: verifier.srcBuildInfo,
83+
buildInfo: *verifier.srcBuildInfo,
8784
changeStreamRunning: false,
8885
ChangeEventBatchChan: make(chan []ParsedEvent),
8986
ChangeStreamWritesOffTsChan: make(chan primitive.Timestamp),
9087
ChangeStreamErrChan: make(chan error),
9188
ChangeStreamDoneChan: make(chan struct{}),
92-
startAtTs: nil,
9389
}
9490
verifier.dstChangeStreamReader = &ChangeStreamReader{
9591
readerType: dstReaderType,
96-
lastChangeEventTime: nil,
9792
logger: verifier.logger,
9893
namespaces: verifier.dstNamespaces,
99-
metaDBName: verifier.metaDBName,
100-
metaClient: verifier.metaClient,
94+
metaDB: verifier.metaClient.Database(verifier.metaDBName),
10195
watcherClient: verifier.dstClient,
102-
buildInfo: verifier.dstBuildInfo,
96+
buildInfo: *verifier.dstBuildInfo,
10397
changeStreamRunning: false,
10498
ChangeEventBatchChan: make(chan []ParsedEvent),
10599
ChangeStreamWritesOffTsChan: make(chan primitive.Timestamp),
106100
ChangeStreamErrChan: make(chan error),
107101
ChangeStreamDoneChan: make(chan struct{}),
108-
startAtTs: nil,
109102
}
110103
}
111104

@@ -156,12 +149,13 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
156149
// We need to retrieve the source namespaces if change events are from the destination.
157150
switch eventOrigin {
158151
case dstReaderType:
159-
if len(verifier.dstSrcNsMap) == 0 {
152+
if verifier.nsMap.Len() == 0 {
153+
// Namespace is not remapped. Source namespace is the same as the destination.
160154
srcDBName = changeEvent.Ns.DB
161155
srcCollName = changeEvent.Ns.Coll
162156
} else {
163157
dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll)
164-
srcNs, exist := verifier.dstSrcNsMap[dstNs]
158+
srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs)
165159
if !exist {
166160
return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs)
167161
}
@@ -214,7 +208,7 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline)
214208
if len(csr.namespaces) == 0 {
215209
pipeline = mongo.Pipeline{
216210
{{"$match", bson.D{
217-
{"ns.db", bson.D{{"$ne", csr.metaDBName}}},
211+
{"ns.db", bson.D{{"$ne", csr.metaDB.Name()}}},
218212
}}},
219213
}
220214
} else {
@@ -328,16 +322,17 @@ func (csr *ChangeStreamReader) iterateChangeStream(
328322
return ctx.Err()
329323

330324
// If the ChangeStreamWritesOffTsChan has a message, the user has indicated that
331-
// source and destination writes are ended. This means we should exit rather than continue
332-
// reading the change stream since there should be no more events.
325+
// source writes are ended and the migration tool is finished / committed.
326+
// This means we should exit rather than continue reading the change stream
327+
// since there should be no more events.
333328
case writesOffTs := <-csr.ChangeStreamWritesOffTsChan:
334329
csr.logger.Debug().
335330
Interface("writesOffTimestamp", writesOffTs).
336331
Msg("Change stream thread received writesOff timestamp. Finalizing change stream.")
337332

338333
gotwritesOffTimestamp = true
339334

340-
// Read all change events until the source / destination reports no events.
335+
// Read change events until the stream reaches the writesOffTs.
341336
// (i.e., the `getMore` call returns empty)
342337
for {
343338
var curTs primitive.Timestamp
@@ -543,7 +538,7 @@ func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerol
543538
}
544539

545540
func (csr *ChangeStreamReader) getChangeStreamMetadataCollection() *mongo.Collection {
546-
return csr.metaClient.Database(csr.metaDBName).Collection(metadataChangeStreamCollectionName)
541+
return csr.metaDB.Collection(metadataChangeStreamCollectionName)
547542
}
548543

549544
func (csr *ChangeStreamReader) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) {

internal/verifier/change_stream_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,10 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
463463
func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
464464
ctx := suite.Context()
465465

466-
db := suite.dstMongoClient.Database("dstDB")
466+
srcDBName := suite.DBNameForTest() + "_src"
467+
dstDBName := suite.DBNameForTest() + "_dst"
468+
469+
db := suite.dstMongoClient.Database(dstDBName)
467470
coll1 := db.Collection("dstColl1")
468471
coll2 := db.Collection("dstColl2")
469472

@@ -472,8 +475,8 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
472475
}
473476

474477
verifier := suite.BuildVerifier()
475-
verifier.SetSrcNamespaces([]string{"srcDB.srcColl1", "srcDB.srcColl2"})
476-
verifier.SetDstNamespaces([]string{"dstDB.dstColl1", "dstDB.dstColl2"})
478+
verifier.SetSrcNamespaces([]string{srcDBName + ".srcColl1", srcDBName + ".srcColl2"})
479+
verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"})
477480
verifier.SetNamespaceMap()
478481

479482
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader)
@@ -508,14 +511,14 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
508511

509512
coll1RecheckCount, coll2RecheckCount := 0, 0
510513
for _, recheck := range rechecks {
511-
suite.Require().Equal("srcDB", recheck.PrimaryKey.DatabaseName)
512-
switch recheck.PrimaryKey.CollectionName {
514+
suite.Require().Equal(srcDBName, recheck.PrimaryKey.SrcDatabaseName)
515+
switch recheck.PrimaryKey.SrcCollectionName {
513516
case "srcColl1":
514517
coll1RecheckCount++
515518
case "srcColl2":
516519
coll2RecheckCount++
517520
default:
518-
suite.T().Fatalf("unknown collection name: %v", recheck.PrimaryKey.CollectionName)
521+
suite.T().Fatalf("unknown collection name: %v", recheck.PrimaryKey.SrcCollectionName)
519522
}
520523
}
521524
suite.Require().Equal(2, coll1RecheckCount)

internal/verifier/check.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
8787
select {
8888
case err := <-verifier.srcChangeStreamReader.ChangeStreamErrChan:
8989
cancel()
90-
return errors.Wrapf(err, "got an error from %s", verifier.srcChangeStreamReader)
90+
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
9191
case err := <-verifier.dstChangeStreamReader.ChangeStreamErrChan:
9292
cancel()
93-
return errors.Wrapf(err, "got an error from %s", verifier.dstChangeStreamReader)
93+
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
9494
case <-ctx.Done():
9595
cancel()
9696
return nil
@@ -170,16 +170,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
170170
verifier.phase = Idle
171171
}()
172172

173-
if verifier.srcChangeStreamReader.changeStreamRunning {
174-
verifier.logger.Debug().Msg("Check: Source change stream already running.")
175-
} else {
176-
verifier.logger.Debug().Msg("Source change stream not running; starting change stream")
173+
for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} {
174+
if csReader.changeStreamRunning {
175+
verifier.logger.Debug().Msgf("Check: %s already running.", csReader)
176+
} else {
177+
verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader)
177178

178-
err = verifier.srcChangeStreamReader.StartChangeStream(ctx)
179-
if err != nil {
180-
return errors.Wrap(err, "failed to start change stream on source")
179+
err = csReader.StartChangeStream(ctx)
180+
if err != nil {
181+
return errors.Wrapf(err, "failed to start %s", csReader)
182+
}
183+
verifier.StartChangeEventHandler(ctx, csReader)
181184
}
182-
verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader)
183185
}
184186

185187
if verifier.dstChangeStreamReader.changeStreamRunning {

internal/verifier/migration_verifier.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,7 @@ type Verifier struct {
128128

129129
srcNamespaces []string
130130
dstNamespaces []string
131-
srcDstNsMap map[string]string
132-
dstSrcNsMap map[string]string
131+
nsMap *NSMap
133132
metaDBName string
134133

135134
mux sync.RWMutex
@@ -271,7 +270,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
271270
return errors.Wrapf(err, "failed to fetch destination's cluster time")
272271
}
273272

274-
// This has to happen under the lock because the change stream
273+
// This has to happen outside the lock because the change stream
275274
// might be inserting docs into the recheck queue, which happens
276275
// under the lock.
277276
select {
@@ -391,15 +390,8 @@ func (verifier *Verifier) SetDstNamespaces(arg []string) {
391390
}
392391

393392
func (verifier *Verifier) SetNamespaceMap() {
394-
verifier.srcDstNsMap = make(map[string]string)
395-
verifier.dstSrcNsMap = make(map[string]string)
396-
if len(verifier.dstNamespaces) == 0 {
397-
return
398-
}
399-
for i, ns := range verifier.srcNamespaces {
400-
verifier.srcDstNsMap[ns] = verifier.dstNamespaces[i]
401-
verifier.dstSrcNsMap[verifier.dstNamespaces[i]] = ns
402-
}
393+
verifier.nsMap = NewNSMap()
394+
verifier.nsMap.PopulateWithNamespaces(verifier.srcNamespaces, verifier.dstNamespaces)
403395
}
404396

405397
func (verifier *Verifier) SetMetaDBName(arg string) {

internal/verifier/nsmap.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package verifier
2+
3+
type NSMap struct {
4+
srcDstNsMap map[string]string
5+
dstSrcNsMap map[string]string
6+
}
7+
8+
func NewNSMap() *NSMap {
9+
return &NSMap{
10+
srcDstNsMap: make(map[string]string),
11+
dstSrcNsMap: make(map[string]string),
12+
}
13+
}
14+
15+
func (nsmap *NSMap) PopulateWithNamespaces(srcNamespaces []string, dstNamespaces []string) {
16+
if len(srcNamespaces) != len(dstNamespaces) {
17+
panic("source and destination namespaces are not the same length")
18+
}
19+
20+
for i, srcNs := range srcNamespaces {
21+
dstNs := dstNamespaces[i]
22+
if _, exist := nsmap.srcDstNsMap[srcNs]; exist {
23+
panic("another mapping already exists for source namespace " + srcNs)
24+
}
25+
if _, exist := nsmap.dstSrcNsMap[dstNs]; exist {
26+
panic("another mapping already exists for destination namespace " + dstNs)
27+
}
28+
nsmap.srcDstNsMap[srcNs] = dstNs
29+
nsmap.dstSrcNsMap[dstNs] = srcNs
30+
}
31+
}
32+
33+
func (nsmap *NSMap) Len() int {
34+
if len(nsmap.srcDstNsMap) != len(nsmap.dstSrcNsMap) {
35+
panic("source and destination namespaces are not the same length")
36+
}
37+
38+
return len(nsmap.srcDstNsMap)
39+
}
40+
41+
func (nsmap *NSMap) GetDstNamespace(srcNamespace string) (string, bool) {
42+
ns, ok := nsmap.srcDstNsMap[srcNamespace]
43+
return ns, ok
44+
}
45+
46+
func (nsmap *NSMap) GetSrcNamespace(dstNamespace string) (string, bool) {
47+
ns, ok := nsmap.dstSrcNsMap[dstNamespace]
48+
return ns, ok
49+
}

internal/verifier/nsmap_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package verifier
2+
3+
import (
4+
"github.com/stretchr/testify/suite"
5+
"testing"
6+
)
7+
8+
type UnitTestSuite struct {
9+
suite.Suite
10+
}
11+
12+
func TestUnitTestSuite(t *testing.T) {
13+
ts := new(UnitTestSuite)
14+
suite.Run(t, ts)
15+
}
16+
17+
func (s *UnitTestSuite) Test_EmptyNsMap() {
18+
nsMap := NewNSMap()
19+
srcNamespaces := []string{"srcDB.A", "srcDB.B"}
20+
dstNamespaces := []string{"dstDB.B", "dstDB.A"}
21+
nsMap.PopulateWithNamespaces(srcNamespaces, dstNamespaces)
22+
s.Require().Equal(2, nsMap.Len())
23+
24+
_, ok := nsMap.GetDstNamespace("non-existent.coll")
25+
s.Require().False(ok)
26+
27+
for i, srcNs := range srcNamespaces {
28+
gotNs, ok := nsMap.GetDstNamespace(srcNs)
29+
s.Require().True(ok)
30+
s.Require().Equal(dstNamespaces[i], gotNs)
31+
}
32+
33+
for i, dstNs := range dstNamespaces {
34+
gotNs, ok := nsMap.GetSrcNamespace(dstNs)
35+
s.Require().True(ok)
36+
s.Require().Equal(srcNamespaces[i], gotNs)
37+
}
38+
}

internal/verifier/recheck.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,16 @@ const (
2424

2525
// RecheckPrimaryKey stores the implicit type of recheck to perform
2626
// Currently, we only handle document mismatches/change stream updates,
27-
// so DatabaseName, CollectionName, and DocumentID must always be specified.
27+
// so SrcDatabaseName, SrcCollectionName, and DocumentID must always be specified.
2828
//
2929
// NB: Order is important here so that, within a given generation,
3030
// sorting by _id will guarantee that all rechecks for a given
3131
// namespace appear consecutively.
32-
//
33-
// DatabaseName and CollectionName should be on the source.
3432
type RecheckPrimaryKey struct {
35-
Generation int `bson:"generation"`
36-
DatabaseName string `bson:"db"`
37-
CollectionName string `bson:"coll"`
38-
DocumentID interface{} `bson:"docID"`
33+
Generation int `bson:"generation"`
34+
SrcDatabaseName string `bson:"db"`
35+
SrcCollectionName string `bson:"coll"`
36+
DocumentID interface{} `bson:"docID"`
3937
}
4038

4139
// RecheckDoc stores the necessary information to know which documents must be rechecked.
@@ -109,10 +107,10 @@ func (verifier *Verifier) insertRecheckDocs(
109107
models := make([]mongo.WriteModel, len(curThreadIndexes))
110108
for m, i := range curThreadIndexes {
111109
pk := RecheckPrimaryKey{
112-
Generation: generation,
113-
DatabaseName: dbNames[i],
114-
CollectionName: collNames[i],
115-
DocumentID: documentIDs[i],
110+
Generation: generation,
111+
SrcDatabaseName: dbNames[i],
112+
SrcCollectionName: collNames[i],
113+
DocumentID: documentIDs[i],
116114
}
117115

118116
// The filter must exclude DataSize; otherwise, if a failed comparison
@@ -302,8 +300,8 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
302300
// - the buffered document IDs’ size exceeds the per-task maximum
303301
// - the buffered documents exceed the partition size
304302
//
305-
if doc.PrimaryKey.DatabaseName != prevDBName ||
306-
doc.PrimaryKey.CollectionName != prevCollName ||
303+
if doc.PrimaryKey.SrcDatabaseName != prevDBName ||
304+
doc.PrimaryKey.SrcCollectionName != prevCollName ||
307305
int64(len(idAccum)) > maxDocsPerTask ||
308306
idLenAccum >= maxIdsPerRecheckTask ||
309307
dataSizeAccum >= verifier.partitionSizeInBytes {
@@ -313,8 +311,8 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
313311
return err
314312
}
315313

316-
prevDBName = doc.PrimaryKey.DatabaseName
317-
prevCollName = doc.PrimaryKey.CollectionName
314+
prevDBName = doc.PrimaryKey.SrcDatabaseName
315+
prevCollName = doc.PrimaryKey.SrcCollectionName
318316
idLenAccum = 0
319317
dataSizeAccum = 0
320318
idAccum = idAccum[:0]

0 commit comments

Comments
 (0)