Skip to content

Commit e3f36d9

Browse files
committed
Merge pull request #1960 from karalabe/fix-peer-ignore-list
eth/downloader: fix dysfunctional ignore list hidden by generic set
2 parents e165c2d + b658a73 commit e3f36d9

File tree

2 files changed

+46
-11
lines changed

2 files changed

+46
-11
lines changed

eth/downloader/peer.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ import (
2828
"time"
2929

3030
"github.com/ethereum/go-ethereum/common"
31-
"gopkg.in/fatih/set.v0"
3231
)
3332

33+
// Maximum number of entries allowed on the list or lacking items.
34+
const maxLackingHashes = 4096
35+
3436
// Hash and block fetchers belonging to eth/61 and below
3537
type relativeHashFetcherFn func(common.Hash) error
3638
type absoluteHashFetcherFn func(uint64, int) error
@@ -67,7 +69,8 @@ type peer struct {
6769
receiptStarted time.Time // Time instance when the last receipt fetch was started
6870
stateStarted time.Time // Time instance when the last node data fetch was started
6971

70-
ignored *set.Set // Set of hashes not to request (didn't have previously)
72+
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
73+
lackingLock sync.RWMutex // Lock protecting the lacking hashes list
7174

7275
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
7376
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -95,7 +98,7 @@ func newPeer(id string, version int, head common.Hash,
9598
blockCapacity: 1,
9699
receiptCapacity: 1,
97100
stateCapacity: 1,
98-
ignored: set.New(),
101+
lacking: make(map[common.Hash]struct{}),
99102

100103
getRelHashes: getRelHashes,
101104
getAbsHashes: getAbsHashes,
@@ -119,7 +122,10 @@ func (p *peer) Reset() {
119122
atomic.StoreInt32(&p.blockCapacity, 1)
120123
atomic.StoreInt32(&p.receiptCapacity, 1)
121124
atomic.StoreInt32(&p.stateCapacity, 1)
122-
p.ignored.Clear()
125+
126+
p.lackingLock.Lock()
127+
p.lacking = make(map[common.Hash]struct{})
128+
p.lackingLock.Unlock()
123129
}
124130

125131
// Fetch61 sends a block retrieval request to the remote peer.
@@ -305,13 +311,42 @@ func (p *peer) Demote() {
305311
}
306312
}
307313

314+
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
315+
// that a peer is known not to have (i.e. have been requested before). If the
316+
// set reaches its maximum allowed capacity, items are randomly dropped off.
317+
func (p *peer) MarkLacking(hash common.Hash) {
318+
p.lackingLock.Lock()
319+
defer p.lackingLock.Unlock()
320+
321+
for len(p.lacking) >= maxLackingHashes {
322+
for drop, _ := range p.lacking {
323+
delete(p.lacking, drop)
324+
break
325+
}
326+
}
327+
p.lacking[hash] = struct{}{}
328+
}
329+
330+
// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
331+
// list (i.e. whether we know that the peer does not have it).
332+
func (p *peer) Lacks(hash common.Hash) bool {
333+
p.lackingLock.RLock()
334+
defer p.lackingLock.RUnlock()
335+
336+
_, ok := p.lacking[hash]
337+
return ok
338+
}
339+
308340
// String implements fmt.Stringer.
309341
func (p *peer) String() string {
342+
p.lackingLock.RLock()
343+
defer p.lackingLock.RUnlock()
344+
310345
return fmt.Sprintf("Peer %s [%s]", p.id,
311346
fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
312347
fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
313348
fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
314-
fmt.Sprintf("ignored %4d", p.ignored.Size()),
349+
fmt.Sprintf("lacking %4d", len(p.lacking)),
315350
)
316351
}
317352

eth/downloader/queue.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe
501501

502502
for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
503503
hash, priority := taskQueue.Pop()
504-
if p.ignored.Has(hash) {
504+
if p.Lacks(hash.(common.Hash)) {
505505
skip[hash.(common.Hash)] = int(priority)
506506
} else {
507507
send[hash.(common.Hash)] = int(priority)
@@ -607,7 +607,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
607607
continue
608608
}
609609
// Otherwise unless the peer is known not to have the data, add to the retrieve list
610-
if p.ignored.Has(header.Hash()) {
610+
if p.Lacks(header.Hash()) {
611611
skip = append(skip, header)
612612
} else {
613613
send = append(send, header)
@@ -781,7 +781,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
781781
// If no blocks were retrieved, mark them as unavailable for the origin peer
782782
if len(blocks) == 0 {
783783
for hash, _ := range request.Hashes {
784-
request.Peer.ignored.Add(hash)
784+
request.Peer.MarkLacking(hash)
785785
}
786786
}
787787
// Iterate over the downloaded blocks and add each of them
@@ -877,8 +877,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
877877

878878
// If no data items were retrieved, mark them as unavailable for the origin peer
879879
if results == 0 {
880-
for hash, _ := range request.Headers {
881-
request.Peer.ignored.Add(hash)
880+
for _, header := range request.Headers {
881+
request.Peer.MarkLacking(header.Hash())
882882
}
883883
}
884884
// Assemble each of the results with their headers and retrieved data parts
@@ -944,7 +944,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
944944
// If no data was retrieved, mark their hashes as unavailable for the origin peer
945945
if len(data) == 0 {
946946
for hash, _ := range request.Hashes {
947-
request.Peer.ignored.Add(hash)
947+
request.Peer.MarkLacking(hash)
948948
}
949949
}
950950
// Iterate over the downloaded data and verify each of them

0 commit comments

Comments
 (0)