@@ -12,7 +12,7 @@ import (
12
12
)
13
13
14
14
const (
15
- blockCacheLimit = 4096 // Maximum number of blocks to cache before throttling the download
15
+ blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
16
16
)
17
17
18
18
// fetchRequest is a currently running block retrieval operation.
@@ -28,8 +28,7 @@ type queue struct {
28
28
hashQueue * prque.Prque // Priority queue of the block hashes to fetch
29
29
hashCounter int // Counter indexing the added hashes to ensure retrieval order
30
30
31
- pendPool map [string ]* fetchRequest // Currently pending block retrieval operations
32
- pendCount int // Number of pending block fetches (to throttle the download)
31
+ pendPool map [string ]* fetchRequest // Currently pending block retrieval operations
33
32
34
33
blockPool map [common.Hash ]int // Hash-set of the downloaded data blocks, mapping to cache indexes
35
34
blockCache []* types.Block // Downloaded but not yet delivered blocks
@@ -58,7 +57,6 @@ func (q *queue) Reset() {
58
57
q .hashCounter = 0
59
58
60
59
q .pendPool = make (map [string ]* fetchRequest )
61
- q .pendCount = 0
62
60
63
61
q .blockPool = make (map [common.Hash ]int )
64
62
q .blockOffset = 0
@@ -106,7 +104,13 @@ func (q *queue) Throttle() bool {
106
104
q .lock .RLock ()
107
105
defer q .lock .RUnlock ()
108
106
109
- return q .pendCount >= len (q .blockCache )- len (q .blockPool )
107
+ // Calculate the currently in-flight block requests
108
+ pending := 0
109
+ for _ , request := range q .pendPool {
110
+ pending += len (request .Hashes )
111
+ }
112
+ // Throttle if more blocks are in-flight than free space in the cache
113
+ return pending >= len (q .blockCache )- len (q .blockPool )
110
114
}
111
115
112
116
// Has checks if a hash is within the download queue or not.
@@ -206,10 +210,14 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
206
210
q .lock .Lock ()
207
211
defer q .lock .Unlock ()
208
212
209
- // Short circuit if the pool has been depleted
213
+ // Short circuit if the pool has been depleted, or if the peer's already
214
+ // downloading something (sanity check not to corrupt state)
210
215
if q .hashQueue .Empty () {
211
216
return nil
212
217
}
218
+ if _ , ok := q .pendPool [p .id ]; ok {
219
+ return nil
220
+ }
213
221
// Retrieve a batch of hashes, skipping previously failed ones
214
222
send := make (map [common.Hash ]int )
215
223
skip := make (map [common.Hash ]int )
@@ -236,7 +244,6 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
236
244
Time : time .Now (),
237
245
}
238
246
q .pendPool [p .id ] = request
239
- q .pendCount += len (request .Hashes )
240
247
241
248
return request
242
249
}
@@ -250,7 +257,6 @@ func (q *queue) Cancel(request *fetchRequest) {
250
257
q .hashQueue .Push (hash , float32 (index ))
251
258
}
252
259
delete (q .pendPool , request .Peer .id )
253
- q .pendCount -= len (request .Hashes )
254
260
}
255
261
256
262
// Expire checks for in flight requests that exceeded a timeout allowance,
@@ -266,7 +272,6 @@ func (q *queue) Expire(timeout time.Duration) []string {
266
272
for hash , index := range request .Hashes {
267
273
q .hashQueue .Push (hash , float32 (index ))
268
274
}
269
- q .pendCount -= len (request .Hashes )
270
275
peers = append (peers , id )
271
276
}
272
277
}
@@ -289,9 +294,6 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
289
294
}
290
295
delete (q .pendPool , id )
291
296
292
- // Mark all the hashes in the request as non-pending
293
- q .pendCount -= len (request .Hashes )
294
-
295
297
// If no blocks were retrieved, mark them as unavailable for the origin peer
296
298
if len (blocks ) == 0 {
297
299
for hash , _ := range request .Hashes {
0 commit comments