@@ -20,7 +20,8 @@ const (
20
20
fetchTimeout = 5 * time .Second // Maximum alloted time to return an explicitly requested block
21
21
maxUncleDist = 7 // Maximum allowed backward distance from the chain head
22
22
maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
23
- announceLimit = 256 // Maximum number of unique blocks a peer may have announced
23
+ hashLimit = 256 // Maximum number of unique blocks a peer may have announced
24
+ blockLimit = 64 // Maximum number of unique blocks a per may have delivered
24
25
)
25
26
26
27
var (
@@ -80,8 +81,9 @@ type Fetcher struct {
80
81
fetching map [common.Hash ]* announce // Announced blocks, currently fetching
81
82
82
83
// Block cache
83
- queue * prque.Prque // Queue containing the import operations (block number sorted)
84
- queued map [common.Hash ]struct {} // Presence set of already queued blocks (to dedup imports)
84
+ queue * prque.Prque // Queue containing the import operations (block number sorted)
85
+ queues map [string ]int // Per peer block counts to prevent memory exhaustion
86
+ queued map [common.Hash ]* inject // Set of already queued blocks (to dedup imports)
85
87
86
88
// Callbacks
87
89
getBlock blockRetrievalFn // Retrieves a block from the local chain
@@ -104,7 +106,8 @@ func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlo
104
106
announced : make (map [common.Hash ][]* announce ),
105
107
fetching : make (map [common.Hash ]* announce ),
106
108
queue : prque .New (),
107
- queued : make (map [common.Hash ]struct {}),
109
+ queues : make (map [string ]int ),
110
+ queued : make (map [common.Hash ]* inject ),
108
111
getBlock : getBlock ,
109
112
validateBlock : validateBlock ,
110
113
broadcastBlock : broadcastBlock ,
@@ -192,22 +195,24 @@ func (f *Fetcher) loop() {
192
195
// Clean up any expired block fetches
193
196
for hash , announce := range f .fetching {
194
197
if time .Since (announce .time ) > fetchTimeout {
195
- f .forgetBlock (hash )
198
+ f .forgetHash (hash )
196
199
}
197
200
}
198
201
// Import any queued blocks that could potentially fit
199
202
height := f .chainHeight ()
200
203
for ! f .queue .Empty () {
201
204
op := f .queue .PopItem ().(* inject )
202
- number := op .block .NumberU64 ()
203
205
204
206
// If too high up the chain or phase, continue later
207
+ number := op .block .NumberU64 ()
205
208
if number > height + 1 {
206
209
f .queue .Push (op , - float32 (op .block .NumberU64 ()))
207
210
break
208
211
}
209
212
// Otherwise if fresh and still unknown, try and import
210
- if number + maxUncleDist < height || f .getBlock (op .block .Hash ()) != nil {
213
+ hash := op .block .Hash ()
214
+ if number + maxUncleDist < height || f .getBlock (hash ) != nil {
215
+ f .forgetBlock (hash )
211
216
continue
212
217
}
213
218
f .insert (op .origin , op .block )
@@ -221,8 +226,8 @@ func (f *Fetcher) loop() {
221
226
case notification := <- f .notify :
222
227
// A block was announced, make sure the peer isn't DOSing us
223
228
count := f .announces [notification .origin ] + 1
224
- if count > announceLimit {
225
- glog .V (logger .Debug ).Infof ("Peer %s: exceeded outstanding announces (%d)" , notification .origin , announceLimit )
229
+ if count > hashLimit {
230
+ glog .V (logger .Debug ).Infof ("Peer %s: exceeded outstanding announces (%d)" , notification .origin , hashLimit )
226
231
break
227
232
}
228
233
// All is well, schedule the announce if block's not yet downloading
@@ -241,8 +246,8 @@ func (f *Fetcher) loop() {
241
246
242
247
case hash := <- f .done :
243
248
// A pending import finished, remove all traces of the notification
249
+ f .forgetHash (hash )
244
250
f .forgetBlock (hash )
245
- delete (f .queued , hash )
246
251
247
252
case <- fetch .C :
248
253
// At least one block's timer ran out, check for needing retrieval
@@ -252,7 +257,7 @@ func (f *Fetcher) loop() {
252
257
if time .Since (announces [0 ].time ) > arriveTimeout - gatherSlack {
253
258
// Pick a random peer to retrieve from, reset all others
254
259
announce := announces [rand .Intn (len (announces ))]
255
- f .forgetBlock (hash )
260
+ f .forgetHash (hash )
256
261
257
262
// If the block still didn't arrive, queue for fetching
258
263
if f .getBlock (hash ) == nil {
@@ -296,7 +301,7 @@ func (f *Fetcher) loop() {
296
301
if f .getBlock (hash ) == nil {
297
302
explicit = append (explicit , block )
298
303
} else {
299
- f .forgetBlock (hash )
304
+ f .forgetHash (hash )
300
305
}
301
306
} else {
302
307
download = append (download , block )
@@ -339,15 +344,26 @@ func (f *Fetcher) reschedule(fetch *time.Timer) {
339
344
func (f * Fetcher ) enqueue (peer string , block * types.Block ) {
340
345
hash := block .Hash ()
341
346
347
+ // Ensure the peer isn't DOSing us
348
+ count := f .queues [peer ] + 1
349
+ if count > blockLimit {
350
+ glog .V (logger .Debug ).Infof ("Peer %s: discarded block #%d [%x], exceeded allowance (%d)" , peer , block .NumberU64 (), hash .Bytes ()[:4 ], blockLimit )
351
+ return
352
+ }
342
353
// Discard any past or too distant blocks
343
354
if dist := int64 (block .NumberU64 ()) - int64 (f .chainHeight ()); dist < - maxUncleDist || dist > maxQueueDist {
344
355
glog .V (logger .Debug ).Infof ("Peer %s: discarded block #%d [%x], distance %d" , peer , block .NumberU64 (), hash .Bytes ()[:4 ], dist )
345
356
return
346
357
}
347
358
// Schedule the block for future importing
348
359
if _ , ok := f .queued [hash ]; ! ok {
349
- f .queued [hash ] = struct {}{}
350
- f .queue .Push (& inject {origin : peer , block : block }, - float32 (block .NumberU64 ()))
360
+ op := & inject {
361
+ origin : peer ,
362
+ block : block ,
363
+ }
364
+ f .queues [peer ] = count
365
+ f .queued [hash ] = op
366
+ f .queue .Push (op , - float32 (block .NumberU64 ()))
351
367
352
368
if glog .V (logger .Debug ) {
353
369
glog .Infof ("Peer %s: queued block #%d [%x], total %v" , peer , block .NumberU64 (), hash .Bytes ()[:4 ], f .queue .Size ())
@@ -389,8 +405,9 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
389
405
}()
390
406
}
391
407
392
- // forgetBlock removes all traces of a block from the fetcher's internal state.
393
- func (f * Fetcher ) forgetBlock (hash common.Hash ) {
408
+ // forgetHash removes all traces of a block announcement from the fetcher's
409
+ // internal state.
410
+ func (f * Fetcher ) forgetHash (hash common.Hash ) {
394
411
// Remove all pending announces and decrement DOS counters
395
412
for _ , announce := range f .announced [hash ] {
396
413
f .announces [announce .origin ]--
@@ -409,3 +426,15 @@ func (f *Fetcher) forgetBlock(hash common.Hash) {
409
426
delete (f .fetching , hash )
410
427
}
411
428
}
429
+
430
+ // forgetBlock removes all traces of a queued block frmo the fetcher's internal
431
+ // state.
432
+ func (f * Fetcher ) forgetBlock (hash common.Hash ) {
433
+ if insert := f .queued [hash ]; insert != nil {
434
+ f .queues [insert .origin ]--
435
+ if f .queues [insert .origin ] == 0 {
436
+ delete (f .queues , insert .origin )
437
+ }
438
+ delete (f .queued , hash )
439
+ }
440
+ }
0 commit comments