Skip to content

Commit 1059221

Browse files
holiman and karalabe authored
eth/downloader: refactor downloader + queue (#21263)
* eth/downloader: refactor downloader + queue

  downloader, fetcher: throttle-metrics, fetcher filter improvements, standalone resultcache
  downloader: more accurate deliverytime calculation, less mem overhead in state requests
  downloader/queue: increase underlying buffer of results, new throttle mechanism
  eth/downloader: updates to tests
  eth/downloader: fix up some review concerns
  eth/downloader/queue: minor fixes
  eth/downloader: minor fixes after review call
  eth/downloader: testcases for queue.go
  eth/downloader: minor change, don't set progress unless progress...
  eth/downloader: fix flaw which prevented useless peers from being dropped
  eth/downloader: try to fix tests
  eth/downloader: verify non-deliveries against advertised remote head
  eth/downloader: fix flaw with checking closed-status causing hang
  eth/downloader: hashing avoidance
  eth/downloader: review concerns + simplify resultcache and queue
  eth/downloader: add back some locks, address review concerns
  downloader/queue: fix remaining lock flaw

* eth/downloader: nitpick fixes

* eth/downloader: remove the *2*3/4 throttling threshold dance

* eth/downloader: print correct throttle threshold in stats

Co-authored-by: Péter Szilágyi <[email protected]>
1 parent 3a57eec commit 1059221

File tree

11 files changed

+1110
-355
lines changed

11 files changed

+1110
-355
lines changed

core/types/block.go

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -147,6 +147,17 @@ func rlpHash(x interface{}) (h common.Hash) {
147147
return h
148148
}
149149

150+
// EmptyBody returns true if there is no additional 'body' to complete the header,
151+
// that is: no transactions and no uncles (TxHash == EmptyRootHash and UncleHash == EmptyUncleHash).
152+
func (h *Header) EmptyBody() bool {
153+
return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
154+
}
155+
156+
// EmptyReceipts returns true if there are no receipts for this header/block, i.e. its ReceiptHash equals EmptyRootHash.
157+
func (h *Header) EmptyReceipts() bool {
158+
return h.ReceiptHash == EmptyRootHash
159+
}
160+
150161
// Body is a simple (mutable, non-safe) data container for storing and moving
151162
// a block's data contents (transactions and uncles) together.
152163
type Body struct {

eth/downloader/downloader.go

Lines changed: 55 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
219219
stateBloom: stateBloom,
220220
mux: mux,
221221
checkpoint: checkpoint,
222-
queue: newQueue(),
222+
queue: newQueue(blockCacheItems),
223223
peers: newPeerSet(),
224224
rttEstimate: uint64(rttMaxEstimate),
225225
rttConfidence: uint64(1000000),
@@ -370,7 +370,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
370370
d.stateBloom.Close()
371371
}
372372
// Reset the queue, peer set and wake channels to clean any internal leftover state
373-
d.queue.Reset()
373+
d.queue.Reset(blockCacheItems)
374374
d.peers.Reset()
375375

376376
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
@@ -597,6 +597,9 @@ func (d *Downloader) Terminate() {
597597
default:
598598
close(d.quitCh)
599599
}
600+
if d.stateBloom != nil {
601+
d.stateBloom.Close()
602+
}
600603
d.quitLock.Unlock()
601604

602605
// Cancel any pending download requests
@@ -629,7 +632,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
629632
// Make sure the peer actually gave something valid
630633
headers := packet.(*headerPack).headers
631634
if len(headers) != 1 {
632-
p.log.Debug("Multiple headers for single request", "headers", len(headers))
635+
p.log.Warn("Multiple headers for single request", "headers", len(headers))
633636
return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
634637
}
635638
head := headers[0]
@@ -866,7 +869,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
866869
// Make sure the peer actually gave something valid
867870
headers := packer.(*headerPack).headers
868871
if len(headers) != 1 {
869-
p.log.Debug("Multiple headers for single request", "headers", len(headers))
872+
p.log.Warn("Multiple headers for single request", "headers", len(headers))
870873
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
871874
}
872875
arrived = true
@@ -890,7 +893,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
890893
}
891894
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
892895
if header.Number.Uint64() != check {
893-
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
896+
p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
894897
return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
895898
}
896899
start = check
@@ -1106,17 +1109,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
11061109
pack := packet.(*headerPack)
11071110
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
11081111
}
1109-
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
1110-
throttle = func() bool { return false }
1111-
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
1112-
return d.queue.ReserveHeaders(p, count), false, nil
1112+
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
1113+
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
1114+
return d.queue.ReserveHeaders(p, count), false, false
11131115
}
11141116
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
11151117
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
1116-
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
1118+
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
1119+
p.SetHeadersIdle(accepted, deliveryTime)
1120+
}
11171121
)
11181122
err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
1119-
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
1123+
d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
11201124
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
11211125

11221126
log.Debug("Skeleton fill terminated", "err", err)
@@ -1139,10 +1143,10 @@ func (d *Downloader) fetchBodies(from uint64) error {
11391143
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
11401144
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
11411145
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
1142-
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
1146+
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
11431147
)
11441148
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
1145-
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
1149+
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
11461150
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
11471151

11481152
log.Debug("Block body download terminated", "err", err)
@@ -1163,10 +1167,12 @@ func (d *Downloader) fetchReceipts(from uint64) error {
11631167
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
11641168
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
11651169
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
1166-
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
1170+
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
1171+
p.SetReceiptsIdle(accepted, deliveryTime)
1172+
}
11671173
)
11681174
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
1169-
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
1175+
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
11701176
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
11711177

11721178
log.Debug("Transaction receipt download terminated", "err", err)
@@ -1199,9 +1205,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
11991205
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
12001206
// - kind: textual label of the type being downloaded to display in log messages
12011207
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1202-
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
1208+
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
12031209
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
1204-
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
1210+
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
12051211

12061212
// Create a ticker to detect expired retrieval tasks
12071213
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1217,6 +1223,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12171223
return errCanceled
12181224

12191225
case packet := <-deliveryCh:
1226+
deliveryTime := time.Now()
12201227
// If the peer was previously banned and failed to deliver its pack
12211228
// in a reasonable time frame, ignore its message.
12221229
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
@@ -1229,7 +1236,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12291236
// caused by a timed out request which came through in the end), set it to
12301237
// idle. If the delivery's stale, the peer should have already been idled.
12311238
if !errors.Is(err, errStaleDelivery) {
1232-
setIdle(peer, accepted)
1239+
setIdle(peer, accepted, deliveryTime)
12331240
}
12341241
// Issue a log to the user to see what's going on
12351242
switch {
@@ -1282,7 +1289,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12821289
// how response times react, so it always requests one more than the minimum (i.e. min 2).
12831290
if fails > 2 {
12841291
peer.log.Trace("Data delivery timed out", "type", kind)
1285-
setIdle(peer, 0)
1292+
setIdle(peer, 0, time.Now())
12861293
} else {
12871294
peer.log.Debug("Stalling delivery, dropping", "type", kind)
12881295

@@ -1317,27 +1324,27 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
13171324
// Send a download request to all idle peers, until throttled
13181325
progressed, throttled, running := false, false, inFlight()
13191326
idles, total := idle()
1320-
1327+
pendCount := pending()
13211328
for _, peer := range idles {
13221329
// Short circuit if throttling activated
1323-
if throttle() {
1324-
throttled = true
1330+
if throttled {
13251331
break
13261332
}
13271333
// Short circuit if there is no more available task.
1328-
if pending() == 0 {
1334+
if pendCount = pending(); pendCount == 0 {
13291335
break
13301336
}
13311337
// Reserve a chunk of fetches for a peer. A nil can mean either that
13321338
// no more headers are available, or that the peer is known not to
13331339
// have them.
1334-
request, progress, err := reserve(peer, capacity(peer))
1335-
if err != nil {
1336-
return err
1337-
}
1340+
request, progress, throttle := reserve(peer, capacity(peer))
13381341
if progress {
13391342
progressed = true
13401343
}
1344+
if throttle {
1345+
throttled = true
1346+
throttleCounter.Inc(1)
1347+
}
13411348
if request == nil {
13421349
continue
13431350
}
@@ -1362,7 +1369,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
13621369
}
13631370
// Make sure that we have peers available for fetching. If all peers have been tried
13641371
// and all failed throw an error
1365-
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
1372+
if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
13661373
return errPeersUnavailable
13671374
}
13681375
}
@@ -1374,8 +1381,11 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
13741381
// queue until the stream ends or a failure occurs.
13751382
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
13761383
// Keep a count of uncertain headers to roll back
1377-
var rollback []*types.Header
1378-
mode := d.getMode()
1384+
var (
1385+
rollback []*types.Header
1386+
rollbackErr error
1387+
mode = d.getMode()
1388+
)
13791389
defer func() {
13801390
if len(rollback) > 0 {
13811391
// Flatten the headers and roll them back
@@ -1397,7 +1407,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
13971407
log.Warn("Rolled back headers", "count", len(hashes),
13981408
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
13991409
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
1400-
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
1410+
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
14011411
}
14021412
}()
14031413

@@ -1407,6 +1417,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
14071417
for {
14081418
select {
14091419
case <-d.cancelCh:
1420+
rollbackErr = errCanceled
14101421
return errCanceled
14111422

14121423
case headers := <-d.headerProcCh:
@@ -1460,6 +1471,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
14601471
// Terminate if something failed in between processing chunks
14611472
select {
14621473
case <-d.cancelCh:
1474+
rollbackErr = errCanceled
14631475
return errCanceled
14641476
default:
14651477
}
@@ -1484,11 +1496,12 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
14841496
frequency = 1
14851497
}
14861498
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1499+
rollbackErr = err
14871500
// If some headers were inserted, add them too to the rollback list
14881501
if n > 0 {
14891502
rollback = append(rollback, chunk[:n]...)
14901503
}
1491-
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
1504+
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
14921505
return fmt.Errorf("%w: %v", errInvalidChain, err)
14931506
}
14941507
// All verifications passed, store newly found uncertain headers
@@ -1503,14 +1516,15 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
15031516
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
15041517
select {
15051518
case <-d.cancelCh:
1519+
rollbackErr = errCanceled
15061520
return errCanceled
15071521
case <-time.After(time.Second):
15081522
}
15091523
}
15101524
// Otherwise insert the headers for content retrieval
15111525
inserts := d.queue.Schedule(chunk, origin)
15121526
if len(inserts) != len(chunk) {
1513-
log.Debug("Stale headers")
1527+
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
15141528
return fmt.Errorf("%w: stale headers", errBadPeer)
15151529
}
15161530
}
@@ -1680,6 +1694,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
16801694
}
16811695

16821696
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
1697+
if len(results) == 0 {
1698+
return nil, nil, nil
1699+
}
1700+
if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
1701+
// the pivot is somewhere in the future
1702+
return nil, results, nil
1703+
}
1704+
// This can also be optimized, but only happens very seldom
16831705
for _, result := range results {
16841706
num := result.Header.Number.Uint64()
16851707
switch {

0 commit comments

Comments
 (0)