Skip to content

Commit bd5720f

Browse files
committed
eth, eth/downloader: handle sync errors a bit more gracefully
1 parent 9d188f7 commit bd5720f

File tree

4 files changed

+48
-48
lines changed

4 files changed

+48
-48
lines changed

eth/downloader/downloader.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ var (
2424
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
2525

2626
errLowTd = errors.New("peer's TD is too low")
27-
errBusy = errors.New("busy")
27+
ErrBusy = errors.New("busy")
2828
errUnknownPeer = errors.New("peer's unknown or unhealthy")
29-
ErrBadPeer = errors.New("action from bad peer ignored")
29+
errBadPeer = errors.New("action from bad peer ignored")
3030
errNoPeers = errors.New("no peers to keep download active")
3131
errPendingQueue = errors.New("pending items in queue")
32-
errTimeout = errors.New("timeout")
32+
ErrTimeout = errors.New("timeout")
3333
errEmptyHashSet = errors.New("empty hash set by peer")
3434
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
3535
errAlreadyInPool = errors.New("hash already in pool")
@@ -68,7 +68,7 @@ type Downloader struct {
6868
getBlock getBlockFn
6969

7070
// Status
71-
synchronizing int32
71+
synchronising int32
7272

7373
// Channels
7474
newPeerCh chan *peer
@@ -119,15 +119,15 @@ func (d *Downloader) UnregisterPeer(id string) {
119119
delete(d.peers, id)
120120
}
121121

122-
// Synchronize will select the peer and use it for synchronizing. If an empty string is given
122+
// Synchronise will select the peer and use it for synchronising. If an empty string is given
123123
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
124124
// checks fail an error will be returned. This method is synchronous
125-
func (d *Downloader) Synchronize(id string, hash common.Hash) error {
125+
func (d *Downloader) Synchronise(id string, hash common.Hash) error {
126126
// Make sure only one goroutine is ever allowed past this point at once
127-
if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
128-
return nil
127+
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
128+
return ErrBusy
129129
}
130-
defer atomic.StoreInt32(&d.synchronizing, 0)
130+
defer atomic.StoreInt32(&d.synchronising, 0)
131131

132132
// Abort if the queue still contains some leftover data
133133
if _, cached := d.queue.Size(); cached > 0 {
@@ -272,7 +272,7 @@ out:
272272
// the zero hash.
273273
if p == nil || (hash == common.Hash{}) {
274274
d.queue.Reset()
275-
return errTimeout
275+
return ErrTimeout
276276
}
277277

278278
// set p to the active peer. this will invalidate any hashes that may be returned
@@ -282,7 +282,7 @@ out:
282282
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
283283
}
284284
}
285-
glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
285+
glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v\n", d.queue.Pending(), time.Since(start))
286286

287287
return nil
288288
}
@@ -384,7 +384,6 @@ out:
384384
}
385385
}
386386
}
387-
388387
glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
389388

390389
return nil
@@ -404,11 +403,10 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
404403
return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
405404
}
406405

407-
if glog.V(logger.Detail) && len(hashes) != 0 {
406+
if glog.V(logger.Debug) && len(hashes) != 0 {
408407
from, to := hashes[0], hashes[len(hashes)-1]
409-
glog.Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
408+
glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
410409
}
411-
412410
d.hashCh <- hashPack{id, hashes}
413411

414412
return nil

eth/downloader/downloader_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
6161

6262
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
6363
dl.activePeerId = peerId
64-
return dl.downloader.Synchronize(peerId, hash)
64+
return dl.downloader.Synchronise(peerId, hash)
6565
}
6666

6767
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
@@ -217,13 +217,13 @@ func TestThrottling(t *testing.T) {
217217
}
218218
}()
219219

220-
// Synchronize the two threads and verify
220+
// Synchronise the two threads and verify
221221
err := <-errc
222222
done <- struct{}{}
223223
<-done
224224

225225
if err != nil {
226-
t.Fatalf("failed to synchronize blocks: %v", err)
226+
t.Fatalf("failed to synchronise blocks: %v", err)
227227
}
228228
if len(took) != targetBlocks {
229229
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)

eth/handler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ import (
1919
)
2020

2121
const (
22-
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
23-
blockProcTimer = 500 * time.Millisecond
24-
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
22+
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
23+
blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
24+
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
2525
blockProcAmount = 256
2626
)
2727

@@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
324324
}
325325
self.BroadcastBlock(hash, request.Block)
326326
} else {
327-
go self.synchronize(p)
327+
go self.synchronise(p)
328328
}
329329
default:
330330
return errResp(ErrInvalidMsgCode, "%v", msg.Code)

eth/sync.go

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ import (
1212
// Sync contains all synchronisation code for the eth protocol
1313

1414
func (pm *ProtocolManager) update() {
15-
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
16-
itimer := time.NewTimer(peerCountTimeout)
17-
// btimer is used for picking of blocks from the downloader
18-
btimer := time.Tick(blockProcTimer)
15+
forceSync := time.Tick(forceSyncCycle)
16+
blockProc := time.Tick(blockProcCycle)
1917

2018
for {
2119
select {
@@ -24,27 +22,22 @@ func (pm *ProtocolManager) update() {
2422
if len(pm.peers) < minDesiredPeerCount {
2523
break
2624
}
27-
28-
// Find the best peer
25+
// Find the best peer and synchronise with it
2926
peer := getBestPeer(pm.peers)
3027
if peer == nil {
31-
glog.V(logger.Debug).Infoln("Sync attempt cancelled. No peers available")
28+
glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available")
3229
}
30+
go pm.synchronise(peer)
3331

34-
itimer.Stop()
35-
go pm.synchronize(peer)
36-
case <-itimer.C:
37-
// The timer will make sure that the downloader keeps an active state
38-
// in which it attempts to always check the network for highest td peers
39-
// Either select the peer or restart the timer if no peers could
40-
// be selected.
32+
case <-forceSync:
33+
// Force a sync even if not enough peers are present
4134
if peer := getBestPeer(pm.peers); peer != nil {
42-
go pm.synchronize(peer)
43-
} else {
44-
itimer.Reset(5 * time.Second)
35+
go pm.synchronise(peer)
4536
}
46-
case <-btimer:
37+
case <-blockProc:
38+
// Try to pull some blocks from the downloader
4739
go pm.processBlocks()
40+
4841
case <-pm.quitSync:
4942
return
5043
}
@@ -59,11 +52,11 @@ func (pm *ProtocolManager) processBlocks() error {
5952
pm.wg.Add(1)
6053
defer pm.wg.Done()
6154

55+
// Take a batch of blocks (will return nil if a previous batch has not reached the chain yet)
6256
blocks := pm.downloader.TakeBlocks()
6357
if len(blocks) == 0 {
6458
return nil
6559
}
66-
6760
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
6861

6962
for len(blocks) != 0 && !pm.quit {
@@ -77,7 +70,7 @@ func (pm *ProtocolManager) processBlocks() error {
7770
return nil
7871
}
7972

80-
func (pm *ProtocolManager) synchronize(peer *peer) {
73+
func (pm *ProtocolManager) synchronise(peer *peer) {
8174
// Make sure the peer's TD is higher than our own. If not drop.
8275
if peer.td.Cmp(pm.chainman.Td()) <= 0 {
8376
return
@@ -89,12 +82,21 @@ func (pm *ProtocolManager) synchronize(peer *peer) {
8982
return
9083
}
9184
// Get the hashes from the peer (synchronously)
92-
err := pm.downloader.Synchronize(peer.id, peer.recentHash)
93-
if err != nil && err == downloader.ErrBadPeer {
94-
glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
85+
glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
86+
87+
err := pm.downloader.Synchronise(peer.id, peer.recentHash)
88+
switch err {
89+
case nil:
90+
glog.V(logger.Debug).Infof("Synchronisation completed")
91+
92+
case downloader.ErrBusy:
93+
glog.V(logger.Debug).Infof("Synchronisation already in progress")
94+
95+
case downloader.ErrTimeout:
96+
glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id)
9597
pm.removePeer(peer)
96-
} else if err != nil {
97-
// handle error
98-
glog.V(logger.Detail).Infoln("error downloading:", err)
98+
99+
default:
100+
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
99101
}
100102
}

0 commit comments

Comments
 (0)