Skip to content

Commit 396be9f

Browse files
committed
Add a timeout when reading cursor, & fail immediately.
1 parent c94bed3 commit 396be9f

File tree

1 file changed

+43
-13
lines changed

1 file changed

+43
-13
lines changed

internal/verifier/compare.go

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package verifier
33
import (
44
"bytes"
55
"context"
6+
"time"
67

78
"github.com/10gen/migration-verifier/internal/types"
89
"github.com/pkg/errors"
@@ -12,6 +13,8 @@ import (
1213
"golang.org/x/sync/errgroup"
1314
)
1415

16+
const readTimeout = 10 * time.Minute
17+
1518
func (verifier *Verifier) FetchAndCompareDocuments(
1619
givenCtx context.Context,
1720
task *VerificationTask,
@@ -25,9 +28,9 @@ func (verifier *Verifier) FetchAndCompareDocuments(
2528
// another to read from the destination, and a third one to receive the
2629
// docs from the other 2 threads and compare them. It’s done this way,
2730
// rather than fetch-everything-then-compare, to minimize memory usage.
28-
errGroup, ctx := errgroup.WithContext(givenCtx)
31+
errGroup, groupCtx := errgroup.WithContext(givenCtx)
2932

30-
srcChannel, dstChannel := verifier.getFetcherChannels(ctx, errGroup, task)
33+
srcChannel, dstChannel := verifier.getFetcherChannels(groupCtx, errGroup, task)
3134

3235
results := []VerificationResult{}
3336
var docCount types.DocumentCount
@@ -36,7 +39,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
3639
errGroup.Go(func() error {
3740
var err error
3841
results, docCount, byteCount, err = verifier.compareDocsFromChannels(
39-
ctx,
42+
groupCtx,
4043
task,
4144
srcChannel,
4245
dstChannel,
@@ -131,16 +134,29 @@ func (verifier *Verifier) compareDocsFromChannels(
131134
}
132135

133136
var srcClosed, dstClosed bool
134-
var err error
137+
138+
readTimer := time.NewTimer(0)
139+
defer func() {
140+
if !readTimer.Stop() {
141+
<-readTimer.C
142+
}
143+
}()
135144

136145
// We always read src & dst back & forth. This ensures that, if one side
137146
// lags the other significantly, we won’t keep caching the faster side’s
138147
// documents and thus consume more & more memory.
139-
for err == nil && (!srcClosed || !dstClosed) {
148+
for !srcClosed || !dstClosed {
140149
if !srcClosed {
150+
simpleTimerReset(readTimer, readTimeout)
151+
141152
select {
142153
case <-ctx.Done():
143154
return nil, 0, 0, ctx.Err()
155+
case <-readTimer.C:
156+
return nil, 0, 0, errors.Errorf(
157+
"failed to read from source after %s",
158+
readTimeout,
159+
)
144160
case doc, alive := <-srcChannel:
145161
if !alive {
146162
srcClosed = true
@@ -149,10 +165,10 @@ func (verifier *Verifier) compareDocsFromChannels(
149165
docCount++
150166
byteCount += types.ByteCount(len(doc))
151167

152-
err = handleNewDoc(doc, true)
168+
err := handleNewDoc(doc, true)
153169

154170
if err != nil {
155-
err = errors.Wrapf(
171+
return nil, 0, 0, errors.Wrapf(
156172
err,
157173
"comparer thread failed to handle source doc with ID %v",
158174
doc.Lookup("_id"),
@@ -162,19 +178,29 @@ func (verifier *Verifier) compareDocsFromChannels(
162178
}
163179

164180
if !dstClosed {
181+
simpleTimerReset(readTimer, readTimeout)
182+
165183
select {
166184
case <-ctx.Done():
167185
return nil, 0, 0, ctx.Err()
186+
case <-readTimer.C:
187+
return nil, 0, 0, errors.Errorf(
188+
"failed to read from destination after %s",
189+
readTimeout,
190+
)
168191
case doc, alive := <-dstChannel:
169192
if !alive {
193+
verifier.logger.Debug().
194+
Interface("task", task.PrimaryKey).
195+
Msg("Destination reader closed.")
170196
dstClosed = true
171197
break
172198
}
173199

174-
err = handleNewDoc(doc, false)
200+
err := handleNewDoc(doc, false)
175201

176202
if err != nil {
177-
err = errors.Wrapf(
203+
return nil, 0, 0, errors.Wrapf(
178204
err,
179205
"comparer thread failed to handle destination doc with ID %v",
180206
doc.Lookup("_id"),
@@ -184,10 +210,6 @@ func (verifier *Verifier) compareDocsFromChannels(
184210
}
185211
}
186212

187-
if err != nil {
188-
return nil, 0, 0, errors.Wrap(err, "comparer thread failed")
189-
}
190-
191213
// We got here because both srcChannel and dstChannel are closed,
192214
// which means we have processed all documents with the same mapKey
193215
// between source & destination.
@@ -227,6 +249,14 @@ func (verifier *Verifier) compareDocsFromChannels(
227249
return results, docCount, byteCount, nil
228250
}
229251

252+
func simpleTimerReset(t *time.Timer, dur time.Duration) {
253+
if !t.Stop() {
254+
<-t.C
255+
}
256+
257+
t.Reset(dur)
258+
}
259+
230260
func (verifier *Verifier) getFetcherChannels(
231261
ctx context.Context,
232262
errGroup *errgroup.Group,

0 commit comments

Comments
 (0)