@@ -20,6 +20,8 @@ const (
20
20
fetchTimeout = 5 * time .Second // Maximum alloted time to return an explicitly requested block
21
21
maxUncleDist = 7 // Maximum allowed backward distance from the chain head
22
22
maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
23
+ hashLimit = 256 // Maximum number of unique blocks a peer may have announced
24
+ blockLimit = 64 // Maximum number of unique blocks a per may have delivered
23
25
)
24
26
25
27
var (
@@ -74,12 +76,14 @@ type Fetcher struct {
74
76
quit chan struct {}
75
77
76
78
// Announce states
79
+ announces map [string ]int // Per peer announce counts to prevent memory exhaustion
77
80
announced map [common.Hash ][]* announce // Announced blocks, scheduled for fetching
78
81
fetching map [common.Hash ]* announce // Announced blocks, currently fetching
79
82
80
83
// Block cache
81
- queue * prque.Prque // Queue containing the import operations (block number sorted)
82
- queued map [common.Hash ]struct {} // Presence set of already queued blocks (to dedup imports)
84
+ queue * prque.Prque // Queue containing the import operations (block number sorted)
85
+ queues map [string ]int // Per peer block counts to prevent memory exhaustion
86
+ queued map [common.Hash ]* inject // Set of already queued blocks (to dedup imports)
83
87
84
88
// Callbacks
85
89
getBlock blockRetrievalFn // Retrieves a block from the local chain
@@ -88,6 +92,10 @@ type Fetcher struct {
88
92
chainHeight chainHeightFn // Retrieves the current chain's height
89
93
insertChain chainInsertFn // Injects a batch of blocks into the chain
90
94
dropPeer peerDropFn // Drops a peer for misbehaving
95
+
96
+ // Testing hooks
97
+ fetchingHook func ([]common.Hash ) // Method to call upon starting a block fetch
98
+ importedHook func (* types.Block ) // Method to call upon successful block import
91
99
}
92
100
93
101
// New creates a block fetcher to retrieve blocks based on hash announcements.
@@ -98,10 +106,12 @@ func New(getBlock blockRetrievalFn, validateBlock blockValidatorFn, broadcastBlo
98
106
filter : make (chan chan []* types.Block ),
99
107
done : make (chan common.Hash ),
100
108
quit : make (chan struct {}),
109
+ announces : make (map [string ]int ),
101
110
announced : make (map [common.Hash ][]* announce ),
102
111
fetching : make (map [common.Hash ]* announce ),
103
112
queue : prque .New (),
104
- queued : make (map [common.Hash ]struct {}),
113
+ queues : make (map [string ]int ),
114
+ queued : make (map [common.Hash ]* inject ),
105
115
getBlock : getBlock ,
106
116
validateBlock : validateBlock ,
107
117
broadcastBlock : broadcastBlock ,
@@ -189,23 +199,24 @@ func (f *Fetcher) loop() {
189
199
// Clean up any expired block fetches
190
200
for hash , announce := range f .fetching {
191
201
if time .Since (announce .time ) > fetchTimeout {
192
- delete (f .announced , hash )
193
- delete (f .fetching , hash )
202
+ f .forgetHash (hash )
194
203
}
195
204
}
196
205
// Import any queued blocks that could potentially fit
197
206
height := f .chainHeight ()
198
207
for ! f .queue .Empty () {
199
208
op := f .queue .PopItem ().(* inject )
200
- number := op .block .NumberU64 ()
201
209
202
210
// If too high up the chain or phase, continue later
211
+ number := op .block .NumberU64 ()
203
212
if number > height + 1 {
204
213
f .queue .Push (op , - float32 (op .block .NumberU64 ()))
205
214
break
206
215
}
207
216
// Otherwise if fresh and still unknown, try and import
208
- if number + maxUncleDist < height || f .getBlock (op .block .Hash ()) != nil {
217
+ hash := op .block .Hash ()
218
+ if number + maxUncleDist < height || f .getBlock (hash ) != nil {
219
+ f .forgetBlock (hash )
209
220
continue
210
221
}
211
222
f .insert (op .origin , op .block )
@@ -217,10 +228,17 @@ func (f *Fetcher) loop() {
217
228
return
218
229
219
230
case notification := <- f .notify :
220
- // A block was announced, schedule if it's not yet downloading
231
+ // A block was announced, make sure the peer isn't DOSing us
232
+ count := f .announces [notification .origin ] + 1
233
+ if count > hashLimit {
234
+ glog .V (logger .Debug ).Infof ("Peer %s: exceeded outstanding announces (%d)" , notification .origin , hashLimit )
235
+ break
236
+ }
237
+ // All is well, schedule the announce if block's not yet downloading
221
238
if _ , ok := f .fetching [notification .hash ]; ok {
222
239
break
223
240
}
241
+ f .announces [notification .origin ] = count
224
242
f .announced [notification .hash ] = append (f .announced [notification .hash ], notification )
225
243
if len (f .announced ) == 1 {
226
244
f .reschedule (fetch )
@@ -232,22 +250,24 @@ func (f *Fetcher) loop() {
232
250
233
251
case hash := <- f .done :
234
252
// A pending import finished, remove all traces of the notification
235
- delete (f .announced , hash )
236
- delete (f .fetching , hash )
237
- delete (f .queued , hash )
253
+ f .forgetHash (hash )
254
+ f .forgetBlock (hash )
238
255
239
256
case <- fetch .C :
240
257
// At least one block's timer ran out, check for needing retrieval
241
258
request := make (map [string ][]common.Hash )
242
259
243
260
for hash , announces := range f .announced {
244
261
if time .Since (announces [0 ].time ) > arriveTimeout - gatherSlack {
262
+ // Pick a random peer to retrieve from, reset all others
245
263
announce := announces [rand .Intn (len (announces ))]
264
+ f .forgetHash (hash )
265
+
266
+ // If the block still didn't arrive, queue for fetching
246
267
if f .getBlock (hash ) == nil {
247
268
request [announce .origin ] = append (request [announce .origin ], hash )
248
269
f .fetching [hash ] = announce
249
270
}
250
- delete (f .announced , hash )
251
271
}
252
272
}
253
273
// Send out all block requests
@@ -261,7 +281,14 @@ func (f *Fetcher) loop() {
261
281
262
282
glog .V (logger .Detail ).Infof ("Peer %s: fetching %s" , peer , list )
263
283
}
264
- go f .fetching [hashes [0 ]].fetch (hashes )
284
+ // Create a closure of the fetch and schedule in on a new thread
285
+ fetcher , hashes := f .fetching [hashes [0 ]].fetch , hashes
286
+ go func () {
287
+ if f .fetchingHook != nil {
288
+ f .fetchingHook (hashes )
289
+ }
290
+ fetcher (hashes )
291
+ }()
265
292
}
266
293
// Schedule the next fetch if blocks are still pending
267
294
f .reschedule (fetch )
@@ -285,7 +312,7 @@ func (f *Fetcher) loop() {
285
312
if f .getBlock (hash ) == nil {
286
313
explicit = append (explicit , block )
287
314
} else {
288
- delete ( f . fetching , hash )
315
+ f . forgetHash ( hash )
289
316
}
290
317
} else {
291
318
download = append (download , block )
@@ -328,15 +355,26 @@ func (f *Fetcher) reschedule(fetch *time.Timer) {
328
355
func (f * Fetcher ) enqueue (peer string , block * types.Block ) {
329
356
hash := block .Hash ()
330
357
358
+ // Ensure the peer isn't DOSing us
359
+ count := f .queues [peer ] + 1
360
+ if count > blockLimit {
361
+ glog .V (logger .Debug ).Infof ("Peer %s: discarded block #%d [%x], exceeded allowance (%d)" , peer , block .NumberU64 (), hash .Bytes ()[:4 ], blockLimit )
362
+ return
363
+ }
331
364
// Discard any past or too distant blocks
332
365
if dist := int64 (block .NumberU64 ()) - int64 (f .chainHeight ()); dist < - maxUncleDist || dist > maxQueueDist {
333
366
glog .V (logger .Debug ).Infof ("Peer %s: discarded block #%d [%x], distance %d" , peer , block .NumberU64 (), hash .Bytes ()[:4 ], dist )
334
367
return
335
368
}
336
369
// Schedule the block for future importing
337
370
if _ , ok := f .queued [hash ]; ! ok {
338
- f .queued [hash ] = struct {}{}
339
- f .queue .Push (& inject {origin : peer , block : block }, - float32 (block .NumberU64 ()))
371
+ op := & inject {
372
+ origin : peer ,
373
+ block : block ,
374
+ }
375
+ f .queues [peer ] = count
376
+ f .queued [hash ] = op
377
+ f .queue .Push (op , - float32 (block .NumberU64 ()))
340
378
341
379
if glog .V (logger .Debug ) {
342
380
glog .Infof ("Peer %s: queued block #%d [%x], total %v" , peer , block .NumberU64 (), hash .Bytes ()[:4 ], f .queue .Size ())
@@ -375,5 +413,44 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
375
413
}
376
414
// If import succeeded, broadcast the block
377
415
go f .broadcastBlock (block , false )
416
+
417
+ // Invoke the testing hook if needed
418
+ if f .importedHook != nil {
419
+ f .importedHook (block )
420
+ }
378
421
}()
379
422
}
423
+
424
+ // forgetHash removes all traces of a block announcement from the fetcher's
425
+ // internal state.
426
+ func (f * Fetcher ) forgetHash (hash common.Hash ) {
427
+ // Remove all pending announces and decrement DOS counters
428
+ for _ , announce := range f .announced [hash ] {
429
+ f .announces [announce .origin ]--
430
+ if f .announces [announce .origin ] == 0 {
431
+ delete (f .announces , announce .origin )
432
+ }
433
+ }
434
+ delete (f .announced , hash )
435
+
436
+ // Remove any pending fetches and decrement the DOS counters
437
+ if announce := f .fetching [hash ]; announce != nil {
438
+ f .announces [announce .origin ]--
439
+ if f .announces [announce .origin ] == 0 {
440
+ delete (f .announces , announce .origin )
441
+ }
442
+ delete (f .fetching , hash )
443
+ }
444
+ }
445
+
446
+ // forgetBlock removes all traces of a queued block frmo the fetcher's internal
447
+ // state.
448
+ func (f * Fetcher ) forgetBlock (hash common.Hash ) {
449
+ if insert := f .queued [hash ]; insert != nil {
450
+ f .queues [insert .origin ]--
451
+ if f .queues [insert .origin ] == 0 {
452
+ delete (f .queues , insert .origin )
453
+ }
454
+ delete (f .queued , hash )
455
+ }
456
+ }
0 commit comments