Skip to content

Commit 1470b22

Browse files
committed
downloader: hash downloading recovery
If a peer fails to respond (disconnect, etc) during hash downloading switch to a different peer which has it's current_hash in the queue's peer set.
1 parent ba2236f commit 1470b22

File tree

2 files changed

+71
-29
lines changed

2 files changed

+71
-29
lines changed

eth/downloader/downloader.go

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ type syncPack struct {
5353
ignoreInitial bool
5454
}
5555

56+
type hashPack struct {
57+
peerId string
58+
hashes []common.Hash
59+
}
60+
5661
type Downloader struct {
5762
mu sync.RWMutex
5863
queue *queue
@@ -69,7 +74,7 @@ type Downloader struct {
6974

7075
// Channels
7176
newPeerCh chan *peer
72-
hashCh chan []common.Hash
77+
hashCh chan hashPack
7378
blockCh chan blockPack
7479
}
7580

@@ -80,7 +85,7 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
8085
hasBlock: hasBlock,
8186
getBlock: getBlock,
8287
newPeerCh: make(chan *peer, 1),
83-
hashCh: make(chan []common.Hash, 1),
88+
hashCh: make(chan hashPack, 1),
8489
blockCh: make(chan blockPack, 1),
8590
}
8691

@@ -235,38 +240,50 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
235240
}
236241

237242
// XXX Make synchronous
238-
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
243+
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
239244
atomic.StoreInt32(&d.fetchingHashes, 1)
240245
defer atomic.StoreInt32(&d.fetchingHashes, 0)
241246

242-
if d.queue.has(hash) {
247+
if d.queue.has(h) {
243248
return errAlreadyInPool
244249
}
245250

246-
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
251+
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
247252

248253
start := time.Now()
249254

250255
// We ignore the initial hash in some cases (e.g. we received a block without it's parent)
251256
// In such circumstances we don't need to download the block so don't add it to the queue.
252257
if !ignoreInitial {
253258
// Add the hash to the queue first
254-
d.queue.hashPool.Add(hash)
259+
d.queue.hashPool.Add(h)
255260
}
256261
// Get the first batch of hashes
257-
p.getHashes(hash)
262+
p.getHashes(h)
258263

259-
failureResponseTimer := time.NewTimer(hashTtl)
264+
var (
265+
failureResponseTimer = time.NewTimer(hashTtl)
266+
attemptedPeers = make(map[string]bool) // attempted peers will help with retries
267+
activePeer = p // active peer will help determine the current active peer
268+
hash common.Hash // common and last hash
269+
)
270+
attemptedPeers[p.id] = true
260271

261272
out:
262273
for {
263274
select {
264-
case hashes := <-d.hashCh:
275+
case hashPack := <-d.hashCh:
276+
// make sure the active peer is giving us the hashes
277+
if hashPack.peerId != activePeer.id {
278+
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
279+
break
280+
}
281+
265282
failureResponseTimer.Reset(hashTtl)
266283

267284
var (
268-
done bool // determines whether we're done fetching hashes (i.e. common hash found)
269-
hash common.Hash // current and common hash
285+
hashes = hashPack.hashes
286+
done bool // determines whether we're done fetching hashes (i.e. common hash found)
270287
)
271288
hashSet := set.New()
272289
for _, hash = range hashes {
@@ -283,13 +300,13 @@ out:
283300

284301
// Add hashes to the chunk set
285302
if len(hashes) == 0 { // Make sure the peer actually gave you something valid
286-
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id)
303+
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id)
287304
d.queue.reset()
288305

289306
return errEmptyHashSet
290307
} else if !done { // Check if we're done fetching
291308
// Get the next set of hashes
292-
p.getHashes(hashes[len(hashes)-1])
309+
activePeer.getHashes(hash)
293310
} else { // we're done
294311
// The offset of the queue is determined by the highest known block
295312
var offset int
@@ -303,12 +320,30 @@ out:
303320
}
304321
case <-failureResponseTimer.C:
305322
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
306-
// TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
307-
// 1. check for peer's best hash to be included in the current hash set;
308-
// 2. resume from last point (hashes[len(hashes)-1]) using the newly selected peer.
309-
d.queue.reset()
310323

311-
return errTimeout
324+
var p *peer // p will be set if a peer can be found
325+
// Attempt to find a new peer by checking inclusion of peers best hash in our
326+
// already fetched hash list. This can't guarantee 100% correctness but does
327+
// a fair job. This is always either correct or false incorrect.
328+
for id, peer := range d.peers {
329+
if d.queue.hashPool.Has(peer.recentHash) && !attemptedPeers[id] {
330+
p = peer
331+
break
332+
}
333+
}
334+
335+
// if all peers have been tried, abort the process entirely or if the hash is
336+
// the zero hash.
337+
if p == nil || (hash == common.Hash{}) {
338+
d.queue.reset()
339+
return errTimeout
340+
}
341+
342+
// set p to the active peer. this will invalidate any hashes that may be returned
343+
// by our previous (delayed) peer.
344+
activePeer = p
345+
p.getHashes(hash)
346+
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
312347
}
313348
}
314349
glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
@@ -454,7 +489,7 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
454489
glog.Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.hashPool.Size(), from[:4], to[:4], id)
455490
}
456491

457-
d.hashCh <- hashes
492+
d.hashCh <- hashPack{id, hashes}
458493

459494
return nil
460495
}

eth/downloader/downloader_test.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
4242
}
4343

4444
type downloadTester struct {
45-
downloader *Downloader
46-
hashes []common.Hash
47-
blocks map[common.Hash]*types.Block
48-
t *testing.T
49-
pcount int
50-
done chan bool
45+
downloader *Downloader
46+
hashes []common.Hash
47+
blocks map[common.Hash]*types.Block
48+
t *testing.T
49+
pcount int
50+
done chan bool
51+
activePeerId string
5152
}
5253

5354
func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
@@ -58,6 +59,11 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
5859
return tester
5960
}
6061

62+
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
63+
dl.activePeerId = peerId
64+
return dl.downloader.Synchronise(peerId, hash)
65+
}
66+
6167
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
6268
if knownHash == hash {
6369
return true
@@ -70,7 +76,7 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
7076
}
7177

7278
func (dl *downloadTester) getHashes(hash common.Hash) error {
73-
dl.downloader.hashCh <- dl.hashes
79+
dl.downloader.AddHashes(dl.activePeerId, dl.hashes)
7480
return nil
7581
}
7682

@@ -115,8 +121,9 @@ func TestDownload(t *testing.T) {
115121
tester.newPeer("peer2", big.NewInt(0), common.Hash{})
116122
tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
117123
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
124+
tester.activePeerId = "peer1"
118125

119-
err := tester.downloader.Synchronise("peer1", hashes[0])
126+
err := tester.sync("peer1", hashes[0])
120127
if err != nil {
121128
t.Error("download error", err)
122129
}
@@ -139,7 +146,7 @@ func TestMissing(t *testing.T) {
139146
hashes = append(extraHashes, hashes[:len(hashes)-1]...)
140147
tester.newPeer("peer2", big.NewInt(0), common.Hash{})
141148

142-
err := tester.downloader.Synchronise("peer1", hashes[0])
149+
err := tester.sync("peer1", hashes[0])
143150
if err != nil {
144151
t.Error("download error", err)
145152
}
@@ -164,7 +171,7 @@ func TestTaking(t *testing.T) {
164171
tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
165172
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
166173

167-
err := tester.downloader.Synchronise("peer1", hashes[0])
174+
err := tester.sync("peer1", hashes[0])
168175
if err != nil {
169176
t.Error("download error", err)
170177
}

0 commit comments

Comments
 (0)