Skip to content

Commit f14d849

Browse files
committed
Read docs in parallel
1 parent bfc46eb commit f14d849

File tree

1 file changed

+42
-30
lines changed

1 file changed

+42
-30
lines changed

internal/verifier/compare.go

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,15 @@ func (verifier *Verifier) compareDocsFromChannels(
146146
// lags the other significantly, we won’t keep caching the faster side’s
147147
// documents and thus consume more & more memory.
148148
for !srcClosed || !dstClosed {
149-
if !srcClosed {
150-
simpleTimerReset(readTimer, readTimeout)
149+
simpleTimerReset(readTimer, readTimeout)
151150

151+
eg, egCtx := errgroup.WithContext(ctx)
152+
eg.Go(func() error {
152153
select {
153-
case <-ctx.Done():
154-
return nil, 0, 0, ctx.Err()
154+
case <-egCtx.Done():
155+
return egCtx.Err()
155156
case <-readTimer.C:
156-
return nil, 0, 0, errors.Errorf(
157+
return errors.Errorf(
157158
"failed to read from source after %s",
158159
readTimeout,
159160
)
@@ -168,42 +169,53 @@ func (verifier *Verifier) compareDocsFromChannels(
168169
err := handleNewDoc(doc, true)
169170

170171
if err != nil {
171-
return nil, 0, 0, errors.Wrapf(
172+
return errors.Wrapf(
172173
err,
173174
"comparer thread failed to handle source doc with ID %v",
174175
doc.Lookup("_id"),
175176
)
176177
}
177178
}
178-
}
179179

180-
if !dstClosed {
181-
simpleTimerReset(readTimer, readTimeout)
180+
return nil
181+
})
182182

183-
select {
184-
case <-ctx.Done():
185-
return nil, 0, 0, ctx.Err()
186-
case <-readTimer.C:
187-
return nil, 0, 0, errors.Errorf(
188-
"failed to read from destination after %s",
189-
readTimeout,
190-
)
191-
case doc, alive := <-dstChannel:
192-
if !alive {
193-
dstClosed = true
194-
break
183+
if !dstClosed {
184+
eg.Go(func() error {
185+
select {
186+
case <-egCtx.Done():
187+
return egCtx.Err()
188+
case <-readTimer.C:
189+
return errors.Errorf(
190+
"failed to read from destination after %s",
191+
readTimeout,
192+
)
193+
case doc, alive := <-dstChannel:
194+
if !alive {
195+
dstClosed = true
196+
break
197+
}
198+
199+
err := handleNewDoc(doc, false)
200+
201+
if err != nil {
202+
return errors.Wrapf(
203+
err,
204+
"comparer thread failed to handle destination doc with ID %v",
205+
doc.Lookup("_id"),
206+
)
207+
}
195208
}
196209

197-
err := handleNewDoc(doc, false)
210+
return nil
211+
})
212+
}
198213

199-
if err != nil {
200-
return nil, 0, 0, errors.Wrapf(
201-
err,
202-
"comparer thread failed to handle destination doc with ID %v",
203-
doc.Lookup("_id"),
204-
)
205-
}
206-
}
214+
if err := eg.Wait(); err != nil {
215+
return nil, 0, 0, errors.Wrap(
216+
err,
217+
"failed to compare documents",
218+
)
207219
}
208220
}
209221

0 commit comments

Comments
 (0)