Skip to content

Commit bae1a1a

Browse files
committed
generational rechecking
1 parent 39b962d commit bae1a1a

File tree

2 files changed

+46
-32
lines changed

2 files changed

+46
-32
lines changed

internal/verifier/compare.go

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ func (verifier *Verifier) compareDocsFromChannels(
6464
error,
6565
) {
6666
results := []VerificationResult{}
67-
var docCount types.DocumentCount
68-
var byteCount types.ByteCount
67+
var srcDocCount types.DocumentCount
68+
var srcByteCount types.ByteCount
6969

7070
mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys))
7171
mapKeyFieldNames[0] = "_id"
@@ -89,6 +89,9 @@ func (verifier *Verifier) compareDocsFromChannels(
8989
if isSrc {
9090
ourMap = srcCache
9191
theirMap = dstCache
92+
93+
srcDocCount++
94+
srcByteCount += types.ByteCount(len(doc))
9295
} else {
9396
ourMap = dstCache
9497
theirMap = srcCache
@@ -148,8 +151,11 @@ func (verifier *Verifier) compareDocsFromChannels(
148151
for !srcClosed || !dstClosed {
149152
simpleTimerReset(readTimer, readTimeout)
150153

154+
var srcDoc, dstDoc bson.Raw
155+
151156
eg, egCtx := errgroup.WithContext(ctx)
152157
eg.Go(func() error {
158+
var alive bool
153159
select {
154160
case <-egCtx.Done():
155161
return egCtx.Err()
@@ -158,30 +164,19 @@ func (verifier *Verifier) compareDocsFromChannels(
158164
"failed to read from source after %s",
159165
readTimeout,
160166
)
161-
case doc, alive := <-srcChannel:
167+
case srcDoc, alive = <-srcChannel:
162168
if !alive {
163169
srcClosed = true
164170
break
165171
}
166-
docCount++
167-
byteCount += types.ByteCount(len(doc))
168-
169-
err := handleNewDoc(doc, true)
170-
171-
if err != nil {
172-
return errors.Wrapf(
173-
err,
174-
"comparer thread failed to handle source doc with ID %v",
175-
doc.Lookup("_id"),
176-
)
177-
}
178172
}
179173

180174
return nil
181175
})
182176

183177
if !dstClosed {
184178
eg.Go(func() error {
179+
var alive bool
185180
select {
186181
case <-egCtx.Done():
187182
return egCtx.Err()
@@ -190,21 +185,11 @@ func (verifier *Verifier) compareDocsFromChannels(
190185
"failed to read from destination after %s",
191186
readTimeout,
192187
)
193-
case doc, alive := <-dstChannel:
188+
case dstDoc, alive = <-dstChannel:
194189
if !alive {
195190
dstClosed = true
196191
break
197192
}
198-
199-
err := handleNewDoc(doc, false)
200-
201-
if err != nil {
202-
return errors.Wrapf(
203-
err,
204-
"comparer thread failed to handle destination doc with ID %v",
205-
doc.Lookup("_id"),
206-
)
207-
}
208193
}
209194

210195
return nil
@@ -214,9 +199,35 @@ func (verifier *Verifier) compareDocsFromChannels(
214199
if err := eg.Wait(); err != nil {
215200
return nil, 0, 0, errors.Wrap(
216201
err,
217-
"failed to compare documents",
202+
"failed to read documents",
218203
)
219204
}
205+
206+
if srcDoc != nil {
207+
err := handleNewDoc(srcDoc, true)
208+
209+
if err != nil {
210+
return nil, 0, 0, errors.Wrapf(
211+
err,
212+
"comparer thread failed to handle task %s's source doc with ID %v",
213+
task.PrimaryKey,
214+
srcDoc.Lookup("_id"),
215+
)
216+
}
217+
}
218+
219+
if dstDoc != nil {
220+
err := handleNewDoc(dstDoc, false)
221+
222+
if err != nil {
223+
return nil, 0, 0, errors.Wrapf(
224+
err,
225+
"comparer thread failed to handle task %s's destination doc with ID %v",
226+
task.PrimaryKey,
227+
dstDoc.Lookup("_id"),
228+
)
229+
}
230+
}
220231
}
221232

222233
// We got here because both srcChannel and dstChannel are closed,
@@ -255,7 +266,7 @@ func (verifier *Verifier) compareDocsFromChannels(
255266
)
256267
}
257268

258-
return results, docCount, byteCount, nil
269+
return results, srcDocCount, srcByteCount, nil
259270
}
260271

261272
func simpleTimerReset(t *time.Timer, dur time.Duration) {

internal/verifier/migration_verifier_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,16 +1281,19 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() {
12811281
}
12821282

12831283
func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
1284+
dbname1 := suite.DBNameForTest("1")
1285+
dbname2 := suite.DBNameForTest("2")
1286+
12841287
zerolog.SetGlobalLevel(zerolog.DebugLevel)
12851288
verifier := suite.BuildVerifier()
1286-
verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
1287-
verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
1289+
verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"})
1290+
verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"})
12881291
verifier.SetNamespaceMap()
12891292

12901293
ctx := suite.Context()
12911294

1292-
srcColl := suite.srcMongoClient.Database("testDb1").Collection("testColl1")
1293-
dstColl := suite.dstMongoClient.Database("testDb2").Collection("testColl3")
1295+
srcColl := suite.srcMongoClient.Database(dbname1).Collection("testColl1")
1296+
dstColl := suite.dstMongoClient.Database(dbname2).Collection("testColl3")
12941297
_, err := srcColl.InsertOne(ctx, bson.M{"_id": 1, "x": 42})
12951298
suite.Require().NoError(err)
12961299
_, err = srcColl.InsertOne(ctx, bson.M{"_id": 2, "x": 43})

0 commit comments

Comments
 (0)