Skip to content

Commit 48709d5

Browse files
karalabeobscuren
authored andcommitted
[release/1.4.11] eth, eth/downloader: better remote head tracking
(cherry picked from commit 1dd2720) Conflicts: eth/handler.go eth/sync.go
1 parent 65da8f6 commit 48709d5

File tree

6 files changed

+62
-51
lines changed

6 files changed

+62
-51
lines changed

eth/downloader/downloader.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,12 @@ func (d *Downloader) Synchronising() bool {
236236

237237
// RegisterPeer injects a new download peer into the set of block source to be
238238
// used for fetching hashes and blocks from.
239-
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
239+
func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
240240
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
241241
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
242242

243243
glog.V(logger.Detail).Infoln("Registering peer", id)
244-
if err := d.peers.Register(newPeer(id, version, head, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
244+
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
245245
glog.V(logger.Error).Infoln("Register failed:", err)
246246
return err
247247
}
@@ -501,7 +501,8 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
501501
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
502502

503503
// Request the advertised remote head block and wait for the response
504-
go p.getRelHeaders(p.head, 1, 0, false)
504+
head, _ := p.currentHead()
505+
go p.getRelHeaders(head, 1, 0, false)
505506

506507
timeout := time.After(d.requestTTL())
507508
for {

eth/downloader/downloader_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -400,11 +400,11 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
400400
var err error
401401
switch version {
402402
case 62:
403-
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
403+
err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
404404
case 63:
405-
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
405+
err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
406406
case 64:
407-
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
407+
err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
408408
}
409409
if err == nil {
410410
// Assign the owned hashes, headers and blocks to the peer (deep copy)
@@ -463,6 +463,17 @@ func (dl *downloadTester) dropPeer(id string) {
463463
dl.downloader.UnregisterPeer(id)
464464
}
465465

466+
// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash
467+
// and total difficulty.
468+
func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) {
469+
return func() (common.Hash, *big.Int) {
470+
dl.lock.RLock()
471+
defer dl.lock.RUnlock()
472+
473+
return dl.peerHashes[id][0], nil
474+
}
475+
}
476+
466477
// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed
467478
// origin; associated with a particular peer in the download tester. The returned
468479
// function can be used to retrieve batches of headers from the particular peer.

eth/downloader/peer.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"errors"
2424
"fmt"
2525
"math"
26+
"math/big"
2627
"sort"
2728
"strings"
2829
"sync"
@@ -37,6 +38,9 @@ const (
3738
measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
3839
)
3940

41+
// Head hash and total difficulty retriever for
42+
type currentHeadRetrievalFn func() (common.Hash, *big.Int)
43+
4044
// Block header and body fetchers belonging to eth/62 and above
4145
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
4246
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
@@ -52,8 +56,7 @@ var (
5256

5357
// peer represents an active peer from which hashes and blocks are retrieved.
5458
type peer struct {
55-
id string // Unique identifier of the peer
56-
head common.Hash // Hash of the peers latest known block
59+
id string // Unique identifier of the peer
5760

5861
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
5962
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
@@ -74,6 +77,8 @@ type peer struct {
7477

7578
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
7679

80+
currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer
81+
7782
getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
7883
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
7984
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
@@ -87,14 +92,14 @@ type peer struct {
8792

8893
// newPeer create a new downloader peer, with specific hash and block retrieval
8994
// mechanisms.
90-
func newPeer(id string, version int, head common.Hash,
95+
func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
9196
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
9297
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
9398
return &peer{
9499
id: id,
95-
head: head,
96100
lacking: make(map[common.Hash]struct{}),
97101

102+
currentHead: currentHead,
98103
getRelHeaders: getRelHeaders,
99104
getAbsHeaders: getAbsHeaders,
100105
getBlockBodies: getBlockBodies,

eth/handler.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
272272
defer pm.removePeer(p.id)
273273

274274
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
275-
err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
276-
p.RequestHeadersByHash, p.RequestHeadersByNumber,
277-
p.RequestBodies, p.RequestReceipts, p.RequestNodeData,
278-
)
279-
if err != nil {
275+
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
280276
return err
281277
}
282278
// Propagate existing transactions. new transactions appearing
@@ -411,7 +407,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
411407
// If we already have a DAO header, we can check the peer's TD against it. If
412408
// the peer's ahead of this, it too must have a reply to the DAO check
413409
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
414-
if p.Td().Cmp(pm.blockchain.GetTd(daoHeader.Hash())) >= 0 {
410+
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash())) >= 0 {
415411
verifyDAO = false
416412
}
417413
}
@@ -617,7 +613,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
617613
// Mark the hashes as present at the remote node
618614
for _, block := range announces {
619615
p.MarkBlock(block.Hash)
620-
p.SetHead(block.Hash)
621616
}
622617
// Schedule all the unknown hashes for retrieval
623618
unknown := make([]announce, 0, len(announces))
@@ -644,15 +639,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
644639

645640
// Mark the peer as owning the block and schedule it for import
646641
p.MarkBlock(request.Block.Hash())
647-
p.SetHead(request.Block.Hash())
648-
649642
pm.fetcher.Enqueue(p.id, request.Block)
650643

651-
// Update the peers total difficulty if needed, schedule a download if gapped
652-
if request.TD.Cmp(p.Td()) > 0 {
653-
p.SetTd(request.TD)
654-
td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash())
655-
if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 {
644+
// Assuming the block is importable by the peer, but possibly not yet done so,
645+
// calculate the head hash and TD that the peer truly must have.
646+
var (
647+
trueHead = request.Block.ParentHash()
648+
trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
649+
)
650+
// Update the peers total difficulty if better than the previous
651+
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
652+
p.SetHead(trueHead, trueTD)
653+
654+
// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
655+
// a singe block (as the true TD is below the propagated block), however this
656+
// scenario should easily be covered by the fetcher.
657+
currentBlock := pm.blockchain.CurrentBlock()
658+
if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash())) > 0 {
656659
go pm.synchronise(p)
657660
}
658661
}

eth/peer.go

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -84,43 +84,31 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
8484

8585
// Info gathers and returns a collection of metadata known about a peer.
8686
func (p *peer) Info() *PeerInfo {
87+
hash, td := p.Head()
88+
8789
return &PeerInfo{
8890
Version: p.version,
89-
Difficulty: p.Td(),
90-
Head: fmt.Sprintf("%x", p.Head()),
91+
Difficulty: td,
92+
Head: hash.Hex(),
9193
}
9294
}
9395

94-
// Head retrieves a copy of the current head (most recent) hash of the peer.
95-
func (p *peer) Head() (hash common.Hash) {
96+
// Head retrieves a copy of the current head hash and total difficulty of the
97+
// peer.
98+
func (p *peer) Head() (hash common.Hash, td *big.Int) {
9699
p.lock.RLock()
97100
defer p.lock.RUnlock()
98101

99102
copy(hash[:], p.head[:])
100-
return hash
103+
return hash, new(big.Int).Set(p.td)
101104
}
102105

103-
// SetHead updates the head (most recent) hash of the peer.
104-
func (p *peer) SetHead(hash common.Hash) {
106+
// SetHead updates the head hash and total difficulty of the peer.
107+
func (p *peer) SetHead(hash common.Hash, td *big.Int) {
105108
p.lock.Lock()
106109
defer p.lock.Unlock()
107110

108111
copy(p.head[:], hash[:])
109-
}
110-
111-
// Td retrieves the current total difficulty of a peer.
112-
func (p *peer) Td() *big.Int {
113-
p.lock.RLock()
114-
defer p.lock.RUnlock()
115-
116-
return new(big.Int).Set(p.td)
117-
}
118-
119-
// SetTd updates the current total difficulty of a peer.
120-
func (p *peer) SetTd(td *big.Int) {
121-
p.lock.Lock()
122-
defer p.lock.Unlock()
123-
124112
p.td.Set(td)
125113
}
126114

@@ -411,7 +399,7 @@ func (ps *peerSet) BestPeer() *peer {
411399
bestTd *big.Int
412400
)
413401
for _, p := range ps.peers {
414-
if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 {
402+
if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
415403
bestPeer, bestTd = p, td
416404
}
417405
}

eth/sync.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,20 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
161161
if peer == nil {
162162
return
163163
}
164-
// Make sure the peer's TD is higher than our own. If not drop.
165-
td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash())
166-
if peer.Td().Cmp(td) <= 0 {
164+
// Make sure the peer's TD is higher than our own
165+
currentBlock := pm.blockchain.CurrentBlock()
166+
td := pm.blockchain.GetTd(currentBlock.Hash())
167+
168+
pHead, pTd := peer.Head()
169+
if pTd.Cmp(td) <= 0 {
167170
return
168171
}
169172
// Otherwise try to sync with the downloader
170173
mode := downloader.FullSync
171174
if atomic.LoadUint32(&pm.fastSync) == 1 {
172175
mode = downloader.FastSync
173176
}
174-
if err := pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), mode); err != nil {
177+
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
175178
return
176179
}
177180
atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done

0 commit comments

Comments
 (0)