Skip to content

Commit 4918c82

Browse files
committed
[release/1.4.6] eth/downloader, trie: pull head state concurrently with chain
(cherry picked from commit 4f1d92b)
1 parent 5904d58 commit 4918c82

File tree

3 files changed

+41
-30
lines changed

3 files changed

+41
-30
lines changed

eth/downloader/downloader.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/ethereum/go-ethereum/logger"
3636
"github.com/ethereum/go-ethereum/logger/glog"
3737
"github.com/ethereum/go-ethereum/params"
38+
"github.com/ethereum/go-ethereum/trie"
3839
"github.com/rcrowley/go-metrics"
3940
)
4041

@@ -114,7 +115,6 @@ type Downloader struct {
114115
// Statistics
115116
syncStatsChainOrigin uint64 // Origin block number where syncing started at
116117
syncStatsChainHeight uint64 // Highest block number known when syncing started
117-
syncStatsStateTotal uint64 // Total number of node state entries known so far
118118
syncStatsStateDone uint64 // Number of state trie entries already pulled
119119
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
120120

@@ -321,12 +321,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
321321
empty = true
322322
}
323323
}
324-
// Reset any ephemeral sync statistics
325-
d.syncStatsLock.Lock()
326-
d.syncStatsStateTotal = 0
327-
d.syncStatsStateDone = 0
328-
d.syncStatsLock.Unlock()
329-
330324
// Create cancel channel for aborting mid-flight
331325
d.cancelLock.Lock()
332326
d.cancelCh = make(chan struct{})
@@ -382,7 +376,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
382376
d.syncStatsLock.Unlock()
383377

384378
// Initiate the sync using a concurrent hash and block retrieval algorithm
385-
d.queue.Prepare(origin+1, d.mode, 0)
379+
d.queue.Prepare(origin+1, d.mode, 0, nil)
386380
if d.syncInitHook != nil {
387381
d.syncInitHook(origin, latest)
388382
}
@@ -397,30 +391,32 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
397391
if err != nil {
398392
return err
399393
}
400-
origin, err := d.findAncestor(p, latest)
394+
height := latest.Number.Uint64()
395+
396+
origin, err := d.findAncestor(p, height)
401397
if err != nil {
402398
return err
403399
}
404400
d.syncStatsLock.Lock()
405401
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
406402
d.syncStatsChainOrigin = origin
407403
}
408-
d.syncStatsChainHeight = latest
404+
d.syncStatsChainHeight = height
409405
d.syncStatsLock.Unlock()
410406

411407
// Initiate the sync using a concurrent header and content retrieval algorithm
412408
pivot := uint64(0)
413409
switch d.mode {
414410
case LightSync:
415-
pivot = latest
411+
pivot = height
416412
case FastSync:
417413
// Calculate the new fast/slow sync pivot point
418414
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
419415
if err != nil {
420416
panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
421417
}
422-
if latest > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
423-
pivot = latest - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
418+
if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
419+
pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
424420
}
425421
// If the point is below the origin, move origin back to ensure state download
426422
if pivot < origin {
@@ -432,9 +428,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
432428
}
433429
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
434430
}
435-
d.queue.Prepare(origin+1, d.mode, pivot)
431+
d.queue.Prepare(origin+1, d.mode, pivot, latest)
436432
if d.syncInitHook != nil {
437-
d.syncInitHook(origin, latest)
433+
d.syncInitHook(origin, height)
438434
}
439435
return d.spawnSync(origin+1,
440436
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
@@ -952,7 +948,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
952948

953949
// fetchHeight retrieves the head header of the remote peer to aid in estimating
954950
// the total time a pending synchronisation would take.
955-
func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
951+
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
956952
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
957953

958954
// Request the advertised remote head block and wait for the response
@@ -962,7 +958,7 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
962958
for {
963959
select {
964960
case <-d.cancelCh:
965-
return 0, errCancelBlockFetch
961+
return nil, errCancelBlockFetch
966962

967963
case packet := <-d.headerCh:
968964
// Discard anything not from the origin peer
@@ -974,13 +970,13 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
974970
headers := packet.(*headerPack).headers
975971
if len(headers) != 1 {
976972
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
977-
return 0, errBadPeer
973+
return nil, errBadPeer
978974
}
979-
return headers[0].Number.Uint64(), nil
975+
return headers[0], nil
980976

981977
case <-timeout:
982978
glog.V(logger.Debug).Infof("%v: head header timeout", p)
983-
return 0, errTimeout
979+
return nil, errTimeout
984980

985981
case <-d.bodyCh:
986982
case <-d.stateCh:
@@ -1369,10 +1365,10 @@ func (d *Downloader) fetchNodeData() error {
13691365
deliver = func(packet dataPack) (int, error) {
13701366
start := time.Now()
13711367
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
1372-
// If the peer gave us nothing, stalling fast sync, drop
1373-
if delivered == 0 {
1374-
glog.V(logger.Debug).Infof("peer %s: stalling state delivery, dropping", packet.PeerId())
1375-
d.dropPeer(packet.PeerId())
1368+
// If the peer returned old-requested data, forgive
1369+
if err == trie.ErrNotRequested {
1370+
glog.V(logger.Info).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
1371+
return
13761372
}
13771373
if err != nil {
13781374
// If the node data processing failed, the root hash is very wrong, abort
@@ -1381,17 +1377,21 @@ func (d *Downloader) fetchNodeData() error {
13811377
return
13821378
}
13831379
// Processing succeeded, notify state fetcher of continuation
1384-
if d.queue.PendingNodeData() > 0 {
1380+
pending := d.queue.PendingNodeData()
1381+
if pending > 0 {
13851382
select {
13861383
case d.stateWakeCh <- true:
13871384
default:
13881385
}
13891386
}
1390-
// Log a message to the user and return
13911387
d.syncStatsLock.Lock()
1392-
defer d.syncStatsLock.Unlock()
13931388
d.syncStatsStateDone += uint64(delivered)
1394-
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
1389+
d.syncStatsLock.Unlock()
1390+
1391+
// Log a message to the user and return
1392+
if delivered > 0 {
1393+
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d, pending at least %d", delivered, time.Since(start), d.syncStatsStateDone, pending)
1394+
}
13951395
})
13961396
}
13971397
expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }

eth/downloader/queue.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1262,13 +1262,19 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error,
12621262

12631263
// Prepare configures the result cache to allow accepting and caching inbound
12641264
// fetch results.
1265-
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) {
1265+
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
12661266
q.lock.Lock()
12671267
defer q.lock.Unlock()
12681268

1269+
// Prepare the queue for sync results
12691270
if q.resultOffset < offset {
12701271
q.resultOffset = offset
12711272
}
12721273
q.fastSyncPivot = pivot
12731274
q.mode = mode
1275+
1276+
// If long running fast sync, also start up a head stateretrieval immediately
1277+
if mode == FastSync && pivot > 0 {
1278+
q.stateScheduler = state.NewStateSync(head.Root, q.stateDatabase)
1279+
}
12741280
}

trie/sync.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
package trie
1818

1919
import (
20+
"errors"
2021
"fmt"
2122

2223
"github.com/ethereum/go-ethereum/common"
2324
"github.com/ethereum/go-ethereum/ethdb"
2425
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
2526
)
2627

28+
// ErrNotRequested is returned by the trie sync when it's requested to process a
29+
// node it did not request.
30+
var ErrNotRequested = errors.New("not requested")
31+
2732
// request represents a scheduled or already in-flight state retrieval request.
2833
type request struct {
2934
hash common.Hash // Hash of the node data content to retrieve
@@ -144,7 +149,7 @@ func (s *TrieSync) Process(results []SyncResult) (int, error) {
144149
// If the item was not requested, bail out
145150
request := s.requests[item.Hash]
146151
if request == nil {
147-
return i, fmt.Errorf("not requested: %x", item.Hash)
152+
return i, ErrNotRequested
148153
}
149154
// If the item is a raw entry request, commit directly
150155
if request.object == nil {

0 commit comments

Comments
 (0)