Skip to content

Commit f16fab9

Browse files
committed
Merge pull request #1953 from karalabe/switch-to-fast-peers
eth/downloader: fetch data proportionally to peer capacity
2 parents 4c2933a + b6f5523 commit f16fab9

File tree

3 files changed

+258
-246
lines changed

3 files changed

+258
-246
lines changed

eth/downloader/downloader.go

Lines changed: 74 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,17 @@ var (
4545
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
4646
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
4747

48-
hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
49-
blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
50-
blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
51-
headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
52-
bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
53-
bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
54-
receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
55-
receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
56-
stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
57-
stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
48+
hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
49+
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
50+
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
51+
52+
headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
53+
bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
54+
bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
55+
receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
56+
receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
57+
stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
58+
stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
5859

5960
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
6061
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -486,7 +487,7 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
486487
// Request the advertised remote head block and wait for the response
487488
go p.getBlocks([]common.Hash{p.head})
488489

489-
timeout := time.After(blockSoftTTL)
490+
timeout := time.After(hashTTL)
490491
for {
491492
select {
492493
case <-d.cancelCh:
@@ -779,47 +780,27 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
779780
// If the peer was previously banned and failed to deliver its pack
780781
// in a reasonable time frame, ignore its message.
781782
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
782-
// Deliver the received chunk of blocks, and demote in case of errors
783783
blocks := packet.(*blockPack).blocks
784-
err := d.queue.DeliverBlocks(peer.id, blocks)
785-
switch err {
786-
case nil:
787-
// If no blocks were delivered, demote the peer (need the delivery above)
788-
if len(blocks) == 0 {
789-
peer.Demote()
790-
peer.SetBlocksIdle()
791-
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
792-
break
793-
}
794-
// All was successful, promote the peer and potentially start processing
795-
peer.Promote()
796-
peer.SetBlocksIdle()
797-
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
798784

799-
case errInvalidChain:
800-
// The hash chain is invalid (blocks are not ordered properly), abort
785+
// Deliver the received chunk of blocks and check chain validity
786+
accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
787+
if err == errInvalidChain {
801788
return err
802-
803-
case errNoFetchesPending:
804-
// Peer probably timed out with its delivery but came through
805-
// in the end, demote, but allow to to pull from this peer.
806-
peer.Demote()
807-
peer.SetBlocksIdle()
808-
glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
809-
810-
case errStaleDelivery:
811-
// Delivered something completely else than requested, usually
812-
// caused by a timeout and delivery during a new sync cycle.
813-
// Don't set it to idle as the original request should still be
814-
// in flight.
815-
peer.Demote()
816-
glog.V(logger.Detail).Infof("%s: stale delivery", peer)
817-
789+
}
790+
// Unless a peer delivered something entirely different from what was requested (usually
791+
// caused by a timed out request which came through in the end), set it to
792+
// idle. If the delivery's stale, the peer should have already been idled.
793+
if err != errStaleDelivery {
794+
peer.SetBlocksIdle(accepted)
795+
}
796+
// Issue a log to the user to see what's going on
797+
switch {
798+
case err == nil && len(blocks) == 0:
799+
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
800+
case err == nil:
801+
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
818802
default:
819-
// Peer did something semi-useful, demote but keep it around
820-
peer.Demote()
821-
peer.SetBlocksIdle()
822-
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
803+
glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
823804
}
824805
}
825806
// Blocks arrived, try to update the progress
@@ -852,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
852833
return errNoPeers
853834
}
854835
// Check for block request timeouts and idle or drop the responsible peers
855-
for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
836+
for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
856837
if peer := d.peers.Peer(pid); peer != nil {
857-
peer.Demote()
858-
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
838+
if fails > 1 {
839+
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
840+
peer.SetBlocksIdle(0)
841+
} else {
842+
glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
843+
d.dropPeer(pid)
844+
}
859845
}
860846
}
861847
// If there's nothing more to fetch, wait or terminate
@@ -1281,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
12811267
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
12821268

12831269
var (
1284-
deliver = func(packet dataPack) error {
1270+
deliver = func(packet dataPack) (int, error) {
12851271
pack := packet.(*bodyPack)
12861272
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
12871273
}
1288-
expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
1274+
expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
12891275
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
12901276
capacity = func(p *peer) int { return p.BlockCapacity() }
1291-
setIdle = func(p *peer) { p.SetBodiesIdle() }
1277+
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
12921278
)
12931279
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
12941280
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1305,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
13051291
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
13061292

13071293
var (
1308-
deliver = func(packet dataPack) error {
1294+
deliver = func(packet dataPack) (int, error) {
13091295
pack := packet.(*receiptPack)
13101296
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
13111297
}
1312-
expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
1298+
expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
13131299
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
13141300
capacity = func(p *peer) int { return p.ReceiptCapacity() }
1315-
setIdle = func(p *peer) { p.SetReceiptsIdle() }
1301+
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
13161302
)
13171303
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
13181304
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1329,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
13291315
glog.V(logger.Debug).Infof("Downloading node state data")
13301316

13311317
var (
1332-
deliver = func(packet dataPack) error {
1318+
deliver = func(packet dataPack) (int, error) {
13331319
start := time.Now()
13341320
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
13351321
if err != nil {
@@ -1352,14 +1338,14 @@ func (d *Downloader) fetchNodeData() error {
13521338
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
13531339
})
13541340
}
1355-
expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
1341+
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
13561342
throttle = func() bool { return false }
13571343
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
13581344
return d.queue.ReserveNodeData(p, count), false, nil
13591345
}
13601346
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
13611347
capacity = func(p *peer) int { return p.NodeDataCapacity() }
1362-
setIdle = func(p *peer) { p.SetNodeDataIdle() }
1348+
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
13631349
)
13641350
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
13651351
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1372,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
13721358
// fetchParts iteratively downloads scheduled block parts, taking any available
13731359
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
13741360
// also periodically checking for timeouts.
1375-
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
1376-
expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
1361+
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1362+
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
13771363
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
1378-
idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
1364+
idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
13791365

13801366
// Create a ticker to detect expired retrieval tasks
13811367
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1394,45 +1380,25 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
13941380
// If the peer was previously banned and failed to deliver its pack
13951381
// in a reasonable time frame, ignore its message.
13961382
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1397-
// Deliver the received chunk of data, and demote in case of errors
1398-
switch err := deliver(packet); err {
1399-
case nil:
1400-
// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
1401-
if packet.Items() == 0 {
1402-
peer.Demote()
1403-
setIdle(peer)
1404-
glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
1405-
break
1406-
}
1407-
// All was successful, promote the peer and potentially start processing
1408-
peer.Promote()
1409-
setIdle(peer)
1410-
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
1411-
1412-
case errInvalidChain:
1413-
// The hash chain is invalid (blocks are not ordered properly), abort
1383+
// Deliver the received chunk of data and check chain validity
1384+
accepted, err := deliver(packet)
1385+
if err == errInvalidChain {
14141386
return err
1415-
1416-
case errNoFetchesPending:
1417-
// Peer probably timed out with its delivery but came through
1418-
// in the end, demote, but allow to to pull from this peer.
1419-
peer.Demote()
1420-
setIdle(peer)
1421-
glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
1422-
1423-
case errStaleDelivery:
1424-
// Delivered something completely else than requested, usually
1425-
// caused by a timeout and delivery during a new sync cycle.
1426-
// Don't set it to idle as the original request should still be
1427-
// in flight.
1428-
peer.Demote()
1429-
glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
1430-
1387+
}
1388+
// Unless a peer delivered something entirely different from what was requested (usually
1389+
// caused by a timed out request which came through in the end), set it to
1390+
// idle. If the delivery's stale, the peer should have already been idled.
1391+
if err != errStaleDelivery {
1392+
setIdle(peer, accepted)
1393+
}
1394+
// Issue a log to the user to see what's going on
1395+
switch {
1396+
case err == nil && packet.Items() == 0:
1397+
glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
1398+
case err == nil:
1399+
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
14311400
default:
1432-
// Peer did something semi-useful, demote but keep it around
1433-
peer.Demote()
1434-
setIdle(peer)
1435-
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
1401+
glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
14361402
}
14371403
}
14381404
// Blocks assembled, try to update the progress
@@ -1465,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
14651431
return errNoPeers
14661432
}
14671433
// Check for fetch request timeouts and idle or drop the responsible peers
1468-
for _, pid := range expire() {
1434+
for pid, fails := range expire() {
14691435
if peer := d.peers.Peer(pid); peer != nil {
1470-
peer.Demote()
1471-
setIdle(peer)
1472-
glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
1436+
if fails > 1 {
1437+
glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
1438+
setIdle(peer, 0)
1439+
} else {
1440+
glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
1441+
d.dropPeer(pid)
1442+
}
14731443
}
14741444
}
14751445
// If there's nothing more to fetch, wait or terminate

0 commit comments

Comments
 (0)