
Commit 316393b

Read documents in parallel.
This makes compareDocsFromChannels() use an errgroup to read its document-reader channels in parallel.
1 parent 2c7fc0c commit 316393b
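
For context, the sketch below shows the pattern the commit message describes — one errgroup goroutine per document channel, both joined before the documents are handled — in a minimal, self-contained form. It is an illustration only, not the verifier's code: the readPair helper, the string payloads (the real channels carry bson.Raw), and the per-call time.After timeout are stand-ins invented for this example.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

// readPair reads one value from each of two channels concurrently,
// mirroring the errgroup pattern this commit introduces. All names and
// types here are illustrative stand-ins, not the verifier's.
func readPair(
    ctx context.Context,
    srcCh, dstCh <-chan string, // the real channels carry bson.Raw documents
    timeout time.Duration,
) (srcDoc, dstDoc string, err error) {
    // One errgroup for both reads: if either read fails, egCtx is
    // canceled and the other read unblocks.
    eg, egCtx := errgroup.WithContext(ctx)

    eg.Go(func() error {
        select {
        case <-egCtx.Done():
            return egCtx.Err()
        case <-time.After(timeout):
            return fmt.Errorf("failed to read from source after %s", timeout)
        case srcDoc = <-srcCh:
            return nil
        }
    })

    eg.Go(func() error {
        select {
        case <-egCtx.Done():
            return egCtx.Err()
        case <-time.After(timeout):
            return fmt.Errorf("failed to read from destination after %s", timeout)
        case dstDoc = <-dstCh:
            return nil
        }
    })

    // Wait blocks until both goroutines return, so srcDoc and dstDoc are
    // safe to read afterward; it reports the first error, if any.
    err = eg.Wait()
    return srcDoc, dstDoc, err
}

func main() {
    src := make(chan string, 1)
    dst := make(chan string, 1)
    src <- `{"_id": 1}`
    dst <- `{"_id": 1}`

    s, d, err := readPair(context.Background(), src, dst, time.Second)
    fmt.Println(s, d, err)
}

In the commit itself, the loop instead resets a shared readTimer at the top of each pass and calls handleNewDoc() on whichever documents were received only after eg.Wait() returns without error.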


2 files changed: +80, -52 lines


internal/verifier/compare.go

Lines changed: 78 additions & 50 deletions
@@ -64,8 +64,8 @@ func (verifier *Verifier) compareDocsFromChannels(
     error,
 ) {
     results := []VerificationResult{}
-    var docCount types.DocumentCount
-    var byteCount types.ByteCount
+    var srcDocCount types.DocumentCount
+    var srcByteCount types.ByteCount

     mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys))
     mapKeyFieldNames[0] = "_id"
@@ -142,67 +142,95 @@ func (verifier *Verifier) compareDocsFromChannels(
         }
     }()

-    // We always read src & dst back & forth. This ensures that, if one side
+    // We always read src & dst together. This ensures that, if one side
     // lags the other significantly, we won’t keep caching the faster side’s
     // documents and thus consume more & more memory.
     for !srcClosed || !dstClosed {
+        simpleTimerReset(readTimer, readTimeout)
+
+        var srcDoc, dstDoc bson.Raw
+
+        eg, egCtx := errgroup.WithContext(ctx)
+
         if !srcClosed {
-            simpleTimerReset(readTimer, readTimeout)
-
-            select {
-            case <-ctx.Done():
-                return nil, 0, 0, ctx.Err()
-            case <-readTimer.C:
-                return nil, 0, 0, errors.Errorf(
-                    "failed to read from source after %s",
-                    readTimeout,
-                )
-            case doc, alive := <-srcChannel:
-                if !alive {
-                    srcClosed = true
-                    break
+            eg.Go(func() error {
+                var alive bool
+                select {
+                case <-egCtx.Done():
+                    return egCtx.Err()
+                case <-readTimer.C:
+                    return errors.Errorf(
+                        "failed to read from source after %s",
+                        readTimeout,
+                    )
+                case srcDoc, alive = <-srcChannel:
+                    if !alive {
+                        srcClosed = true
+                        break
+                    }
+
+                    srcDocCount++
+                    srcByteCount += types.ByteCount(len(srcDoc))
                 }
-                docCount++
-                byteCount += types.ByteCount(len(doc))

-                err := handleNewDoc(doc, true)
+                return nil
+            })
+        }

-                if err != nil {
-                    return nil, 0, 0, errors.Wrapf(
-                        err,
-                        "comparer thread failed to handle source doc with ID %v",
-                        doc.Lookup("_id"),
+        if !dstClosed {
+            eg.Go(func() error {
+                var alive bool
+                select {
+                case <-egCtx.Done():
+                    return egCtx.Err()
+                case <-readTimer.C:
+                    return errors.Errorf(
+                        "failed to read from destination after %s",
+                        readTimeout,
                     )
+                case dstDoc, alive = <-dstChannel:
+                    if !alive {
+                        dstClosed = true
+                        break
+                    }
                 }
-            }
+
+                return nil
+            })
         }

-        if !dstClosed {
-            simpleTimerReset(readTimer, readTimeout)
-
-            select {
-            case <-ctx.Done():
-                return nil, 0, 0, ctx.Err()
-            case <-readTimer.C:
-                return nil, 0, 0, errors.Errorf(
-                    "failed to read from destination after %s",
-                    readTimeout,
+        if err := eg.Wait(); err != nil {
+            return nil, 0, 0, errors.Wrap(
+                err,
+                "failed to read documents",
+            )
+        }
+
+        if srcDoc != nil {
+            err := handleNewDoc(srcDoc, true)
+
+            if err != nil {
+                return nil, 0, 0, errors.Wrapf(
+                    err,
+                    "comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
+                    namespace,
+                    task.PrimaryKey,
+                    srcDoc.Lookup("_id"),
                 )
-            case doc, alive := <-dstChannel:
-                if !alive {
-                    dstClosed = true
-                    break
-                }
+            }
+        }

-                err := handleNewDoc(doc, false)
+        if dstDoc != nil {
+            err := handleNewDoc(dstDoc, false)

-                if err != nil {
-                    return nil, 0, 0, errors.Wrapf(
-                        err,
-                        "comparer thread failed to handle destination doc with ID %v",
-                        doc.Lookup("_id"),
-                    )
-                }
+            if err != nil {
+                return nil, 0, 0, errors.Wrapf(
+                    err,
+                    "comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
+                    namespace,
+                    task.PrimaryKey,
+                    dstDoc.Lookup("_id"),
+                )
             }
         }
     }
@@ -243,7 +271,7 @@ func (verifier *Verifier) compareDocsFromChannels(
         )
     }

-    return results, docCount, byteCount, nil
+    return results, srcDocCount, srcByteCount, nil
 }

 func simpleTimerReset(t *time.Timer, dur time.Duration) {

internal/verifier/migration_verifier_test.go

Lines changed: 2 additions & 2 deletions
@@ -574,7 +574,7 @@ func TestVerifierCompareDocs(t *testing.T) {
                 {{"_id", id}, {"sharded", 123}},
             },
             compareFn: func(t *testing.T, mismatchedIds []VerificationResult) {
-                assert.Empty(t, mismatchedIds)
+                assert.Empty(t, mismatchedIds, "should be no problems")
             },
         },
     }
@@ -1491,7 +1491,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
     status := waitForTasks()
     suite.Require().Greater(status.CompletedTasks, 1)
     suite.Require().Greater(status.TotalTasks, 1)
-    suite.Require().Equal(status.FailedTasks, 0)
+    suite.Require().Zero(status.FailedTasks, "there should be no failed tasks")

     // Insert another document that is not in the filter.
     // This should trigger a recheck despite being outside the filter.

0 commit comments
