Skip to content

Commit 274f86c

Browse files
karalabeobscuren
authored andcommitted
eth/downloader: match capabilities when querying idle peers
1 parent b527c9c commit 274f86c

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

eth/downloader/downloader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
816816
}
817817
// Send a download request to all idle peers, until throttled
818818
throttled := false
819-
for _, peer := range d.peers.IdlePeers() {
819+
for _, peer := range d.peers.IdlePeers(eth61) {
820820
// Short circuit if throttling activated
821821
if d.queue.Throttle() {
822822
throttled = true
@@ -1255,7 +1255,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
12551255
}
12561256
// Send a download request to all idle peers, until throttled
12571257
queuedEmptyBlocks, throttled := false, false
1258-
for _, peer := range d.peers.IdlePeers() {
1258+
for _, peer := range d.peers.IdlePeers(eth62) {
12591259
// Short circuit if throttling activated
12601260
if d.queue.Throttle() {
12611261
throttled = true

eth/downloader/downloader_test.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,17 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
205205
dl.lock.Lock()
206206
defer dl.lock.Unlock()
207207

208-
err := dl.downloader.RegisterPeer(id, version, hashes[0],
209-
dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay),
210-
dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
208+
var err error
209+
switch version {
210+
case 61:
211+
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil)
212+
case 62:
213+
err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
214+
case 63:
215+
err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
216+
case 64:
217+
err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
218+
}
211219
if err == nil {
212220
// Assign the owned hashes and blocks to the peer (deep copy)
213221
dl.peerHashes[id] = make([]common.Hash, len(hashes))
@@ -618,6 +626,41 @@ func testMultiSynchronisation(t *testing.T, protocol int) {
618626
}
619627
}
620628

629+
// Tests that synchronisations behave well in multi-version protocol environments
630+
// and not wreak havok on other nodes in the network.
631+
func TestMultiProtocolSynchronisation61(t *testing.T) { testMultiProtocolSynchronisation(t, 61) }
632+
func TestMultiProtocolSynchronisation62(t *testing.T) { testMultiProtocolSynchronisation(t, 62) }
633+
func TestMultiProtocolSynchronisation63(t *testing.T) { testMultiProtocolSynchronisation(t, 63) }
634+
func TestMultiProtocolSynchronisation64(t *testing.T) { testMultiProtocolSynchronisation(t, 64) }
635+
636+
func testMultiProtocolSynchronisation(t *testing.T, protocol int) {
637+
// Create a small enough block chain to download
638+
targetBlocks := blockCacheLimit - 15
639+
hashes, blocks := makeChain(targetBlocks, 0, genesis)
640+
641+
// Create peers of every type
642+
tester := newTester()
643+
tester.newPeer("peer 61", 61, hashes, blocks)
644+
tester.newPeer("peer 62", 62, hashes, blocks)
645+
tester.newPeer("peer 63", 63, hashes, blocks)
646+
tester.newPeer("peer 64", 64, hashes, blocks)
647+
648+
// Synchronise with the requestd peer and make sure all blocks were retrieved
649+
if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil {
650+
t.Fatalf("failed to synchronise blocks: %v", err)
651+
}
652+
if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
653+
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
654+
}
655+
// Check that no peers have been dropped off
656+
for _, version := range []int{61, 62, 63, 64} {
657+
peer := fmt.Sprintf("peer %d", version)
658+
if _, ok := tester.peerHashes[peer]; !ok {
659+
t.Errorf("%s dropped", peer)
660+
}
661+
}
662+
}
663+
621664
// Tests that if a block is empty (i.e. header only), no body request should be
622665
// made, and instead the header should be assembled into a whole block in itself.
623666
func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }

eth/downloader/peer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,14 +312,16 @@ func (ps *peerSet) AllPeers() []*peer {
312312

313313
// IdlePeers retrieves a flat list of all the currently idle peers within the
314314
// active peer set, ordered by their reputation.
315-
func (ps *peerSet) IdlePeers() []*peer {
315+
func (ps *peerSet) IdlePeers(version int) []*peer {
316316
ps.lock.RLock()
317317
defer ps.lock.RUnlock()
318318

319319
list := make([]*peer, 0, len(ps.peers))
320320
for _, p := range ps.peers {
321-
if atomic.LoadInt32(&p.idle) == 0 {
322-
list = append(list, p)
321+
if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) {
322+
if atomic.LoadInt32(&p.idle) == 0 {
323+
list = append(list, p)
324+
}
323325
}
324326
}
325327
for i := 0; i < len(list); i++ {

0 commit comments

Comments
 (0)