@@ -3,6 +3,7 @@ package verifier
33import (
44 "bytes"
55 "context"
6+ "time"
67
78 "github.com/10gen/migration-verifier/internal/types"
89 "github.com/pkg/errors"
@@ -12,6 +13,8 @@ import (
1213 "golang.org/x/sync/errgroup"
1314)
1415
16+ const readTimeout = 10 * time .Minute
17+
1518func (verifier * Verifier ) FetchAndCompareDocuments (
1619 givenCtx context.Context ,
1720 task * VerificationTask ,
@@ -25,9 +28,9 @@ func (verifier *Verifier) FetchAndCompareDocuments(
2528 // another to read from the destination, and a third one to receive the
2629 // docs from the other 2 threads and compare them. It’s done this way,
2730 // rather than fetch-everything-then-compare, to minimize memory usage.
28- errGroup , ctx := errgroup .WithContext (givenCtx )
31+ errGroup , groupCtx := errgroup .WithContext (givenCtx )
2932
30- srcChannel , dstChannel := verifier .getFetcherChannels (ctx , errGroup , task )
33+ srcChannel , dstChannel := verifier .getFetcherChannels (groupCtx , errGroup , task )
3134
3235 results := []VerificationResult {}
3336 var docCount types.DocumentCount
@@ -36,7 +39,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
3639 errGroup .Go (func () error {
3740 var err error
3841 results , docCount , byteCount , err = verifier .compareDocsFromChannels (
39- ctx ,
42+ groupCtx ,
4043 task ,
4144 srcChannel ,
4245 dstChannel ,
@@ -131,16 +134,29 @@ func (verifier *Verifier) compareDocsFromChannels(
131134 }
132135
133136 var srcClosed , dstClosed bool
134- var err error
137+
138+ readTimer := time .NewTimer (0 )
139+ defer func () {
140+ if ! readTimer .Stop () {
141+ <- readTimer .C
142+ }
143+ }()
135144
136145 // We always read src & dst back & forth. This ensures that, if one side
137146 // lags the other significantly, we won’t keep caching the faster side’s
138147 // documents and thus consume more & more memory.
139- for err == nil && ( ! srcClosed || ! dstClosed ) {
148+ for ! srcClosed || ! dstClosed {
140149 if ! srcClosed {
150+ simpleTimerReset (readTimer , readTimeout )
151+
141152 select {
142153 case <- ctx .Done ():
143154 return nil , 0 , 0 , ctx .Err ()
155+ case <- readTimer .C :
156+ return nil , 0 , 0 , errors .Errorf (
157+ "failed to read from source after %s" ,
158+ readTimeout ,
159+ )
144160 case doc , alive := <- srcChannel :
145161 if ! alive {
146162 srcClosed = true
@@ -149,10 +165,10 @@ func (verifier *Verifier) compareDocsFromChannels(
149165 docCount ++
150166 byteCount += types .ByteCount (len (doc ))
151167
152- err = handleNewDoc (doc , true )
168+ err : = handleNewDoc (doc , true )
153169
154170 if err != nil {
155- err = errors .Wrapf (
171+ return nil , 0 , 0 , errors .Wrapf (
156172 err ,
157173 "comparer thread failed to handle source doc with ID %v" ,
158174 doc .Lookup ("_id" ),
@@ -162,19 +178,26 @@ func (verifier *Verifier) compareDocsFromChannels(
162178 }
163179
164180 if ! dstClosed {
181+ simpleTimerReset (readTimer , readTimeout )
182+
165183 select {
166184 case <- ctx .Done ():
167185 return nil , 0 , 0 , ctx .Err ()
186+ case <- readTimer .C :
187+ return nil , 0 , 0 , errors .Errorf (
188+ "failed to read from destination after %s" ,
189+ readTimeout ,
190+ )
168191 case doc , alive := <- dstChannel :
169192 if ! alive {
170193 dstClosed = true
171194 break
172195 }
173196
174- err = handleNewDoc (doc , false )
197+ err : = handleNewDoc (doc , false )
175198
176199 if err != nil {
177- err = errors .Wrapf (
200+ return nil , 0 , 0 , errors .Wrapf (
178201 err ,
179202 "comparer thread failed to handle destination doc with ID %v" ,
180203 doc .Lookup ("_id" ),
@@ -184,10 +207,6 @@ func (verifier *Verifier) compareDocsFromChannels(
184207 }
185208 }
186209
187- if err != nil {
188- return nil , 0 , 0 , errors .Wrap (err , "comparer thread failed" )
189- }
190-
191210 // We got here because both srcChannel and dstChannel are closed,
192211 // which means we have processed all documents with the same mapKey
193212 // between source & destination.
@@ -227,6 +246,14 @@ func (verifier *Verifier) compareDocsFromChannels(
227246 return results , docCount , byteCount , nil
228247}
229248
249+ func simpleTimerReset (t * time.Timer , dur time.Duration ) {
250+ if ! t .Stop () {
251+ <- t .C
252+ }
253+
254+ t .Reset (dur )
255+ }
256+
230257func (verifier * Verifier ) getFetcherChannels (
231258 ctx context.Context ,
232259 errGroup * errgroup.Group ,
0 commit comments