Skip to content

Commit 7e1302d

Browse files
committed
Read documents in parallel.
This makes compareDocsFromChannels() use an errgroup to read from its source and destination document channels in parallel.
1 parent 2c7fc0c commit 7e1302d

File tree

2 files changed

+77
-51
lines changed

2 files changed

+77
-51
lines changed

internal/verifier/compare.go

Lines changed: 75 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ func (verifier *Verifier) compareDocsFromChannels(
6464
error,
6565
) {
6666
results := []VerificationResult{}
67-
var docCount types.DocumentCount
68-
var byteCount types.ByteCount
67+
var srcDocCount types.DocumentCount
68+
var srcByteCount types.ByteCount
6969

7070
mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys))
7171
mapKeyFieldNames[0] = "_id"
@@ -89,6 +89,9 @@ func (verifier *Verifier) compareDocsFromChannels(
8989
if isSrc {
9090
ourMap = srcCache
9191
theirMap = dstCache
92+
93+
srcDocCount++
94+
srcByteCount += types.ByteCount(len(doc))
9295
} else {
9396
ourMap = dstCache
9497
theirMap = srcCache
@@ -146,63 +149,86 @@ func (verifier *Verifier) compareDocsFromChannels(
146149
// lags the other significantly, we won’t keep caching the faster side’s
147150
// documents and thus consume more & more memory.
148151
for !srcClosed || !dstClosed {
152+
simpleTimerReset(readTimer, readTimeout)
153+
154+
var srcDoc, dstDoc bson.Raw
155+
156+
eg, egCtx := errgroup.WithContext(ctx)
157+
149158
if !srcClosed {
150-
simpleTimerReset(readTimer, readTimeout)
151-
152-
select {
153-
case <-ctx.Done():
154-
return nil, 0, 0, ctx.Err()
155-
case <-readTimer.C:
156-
return nil, 0, 0, errors.Errorf(
157-
"failed to read from source after %s",
158-
readTimeout,
159-
)
160-
case doc, alive := <-srcChannel:
161-
if !alive {
162-
srcClosed = true
163-
break
159+
eg.Go(func() error {
160+
var alive bool
161+
select {
162+
case <-egCtx.Done():
163+
return egCtx.Err()
164+
case <-readTimer.C:
165+
return errors.Errorf(
166+
"failed to read from source after %s",
167+
readTimeout,
168+
)
169+
case srcDoc, alive = <-srcChannel:
170+
if !alive {
171+
srcClosed = true
172+
break
173+
}
164174
}
165-
docCount++
166-
byteCount += types.ByteCount(len(doc))
167175

168-
err := handleNewDoc(doc, true)
176+
return nil
177+
})
178+
}
169179

170-
if err != nil {
171-
return nil, 0, 0, errors.Wrapf(
172-
err,
173-
"comparer thread failed to handle source doc with ID %v",
174-
doc.Lookup("_id"),
180+
if !dstClosed {
181+
eg.Go(func() error {
182+
var alive bool
183+
select {
184+
case <-egCtx.Done():
185+
return egCtx.Err()
186+
case <-readTimer.C:
187+
return errors.Errorf(
188+
"failed to read from destination after %s",
189+
readTimeout,
175190
)
191+
case dstDoc, alive = <-dstChannel:
192+
if !alive {
193+
dstClosed = true
194+
break
195+
}
176196
}
177-
}
197+
198+
return nil
199+
})
178200
}
179201

180-
if !dstClosed {
181-
simpleTimerReset(readTimer, readTimeout)
182-
183-
select {
184-
case <-ctx.Done():
185-
return nil, 0, 0, ctx.Err()
186-
case <-readTimer.C:
187-
return nil, 0, 0, errors.Errorf(
188-
"failed to read from destination after %s",
189-
readTimeout,
202+
if err := eg.Wait(); err != nil {
203+
return nil, 0, 0, errors.Wrap(
204+
err,
205+
"failed to read documents",
206+
)
207+
}
208+
209+
if srcDoc != nil {
210+
err := handleNewDoc(srcDoc, true)
211+
212+
if err != nil {
213+
return nil, 0, 0, errors.Wrapf(
214+
err,
215+
"comparer thread failed to handle task %s's source doc with ID %v",
216+
task.PrimaryKey,
217+
srcDoc.Lookup("_id"),
190218
)
191-
case doc, alive := <-dstChannel:
192-
if !alive {
193-
dstClosed = true
194-
break
195-
}
219+
}
220+
}
196221

197-
err := handleNewDoc(doc, false)
222+
if dstDoc != nil {
223+
err := handleNewDoc(dstDoc, false)
198224

199-
if err != nil {
200-
return nil, 0, 0, errors.Wrapf(
201-
err,
202-
"comparer thread failed to handle destination doc with ID %v",
203-
doc.Lookup("_id"),
204-
)
205-
}
225+
if err != nil {
226+
return nil, 0, 0, errors.Wrapf(
227+
err,
228+
"comparer thread failed to handle task %s's destination doc with ID %v",
229+
task.PrimaryKey,
230+
dstDoc.Lookup("_id"),
231+
)
206232
}
207233
}
208234
}
@@ -243,7 +269,7 @@ func (verifier *Verifier) compareDocsFromChannels(
243269
)
244270
}
245271

246-
return results, docCount, byteCount, nil
272+
return results, srcDocCount, srcByteCount, nil
247273
}
248274

249275
func simpleTimerReset(t *time.Timer, dur time.Duration) {

internal/verifier/migration_verifier_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ func TestVerifierCompareDocs(t *testing.T) {
574574
{{"_id", id}, {"sharded", 123}},
575575
},
576576
compareFn: func(t *testing.T, mismatchedIds []VerificationResult) {
577-
assert.Empty(t, mismatchedIds)
577+
assert.Empty(t, mismatchedIds, "should be no problems")
578578
},
579579
},
580580
}
@@ -1491,7 +1491,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
14911491
status := waitForTasks()
14921492
suite.Require().Greater(status.CompletedTasks, 1)
14931493
suite.Require().Greater(status.TotalTasks, 1)
1494-
suite.Require().Equal(status.FailedTasks, 0)
1494+
suite.Require().Zero(status.FailedTasks, "there should be no failed tasks")
14951495

14961496
// Insert another document that is not in the filter.
14971497
// This should trigger a recheck despite being outside the filter.

0 commit comments

Comments
 (0)