Skip to content

Commit fc7abd9

Browse files
committed
eth, eth/downloader: move block processing into the downlaoder
1 parent 0fc7187 commit fc7abd9

File tree

4 files changed

+253
-231
lines changed

4 files changed

+253
-231
lines changed

eth/downloader/downloader.go

Lines changed: 119 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package downloader
33
import (
44
"bytes"
55
"errors"
6+
"math"
67
"math/rand"
78
"sync"
89
"sync/atomic"
@@ -28,25 +29,27 @@ var (
2829
crossCheckCycle = time.Second // Period after which to check for expired cross checks
2930

3031
maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out
32+
maxBlockProcess = 256 // Number of blocks to import at once into the chain
3133
)
3234

3335
var (
34-
errBusy = errors.New("busy")
35-
errUnknownPeer = errors.New("peer is unknown or unhealthy")
36-
errBadPeer = errors.New("action from bad peer ignored")
37-
errStallingPeer = errors.New("peer is stalling")
38-
errBannedHead = errors.New("peer head hash already banned")
39-
errNoPeers = errors.New("no peers to keep download active")
40-
errPendingQueue = errors.New("pending items in queue")
41-
errTimeout = errors.New("timeout")
42-
errEmptyHashSet = errors.New("empty hash set by peer")
43-
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
44-
errAlreadyInPool = errors.New("hash already in pool")
45-
errInvalidChain = errors.New("retrieved hash chain is invalid")
46-
errCrossCheckFailed = errors.New("block cross-check failed")
47-
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
48-
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
49-
errNoSyncActive = errors.New("no sync active")
36+
errBusy = errors.New("busy")
37+
errUnknownPeer = errors.New("peer is unknown or unhealthy")
38+
errBadPeer = errors.New("action from bad peer ignored")
39+
errStallingPeer = errors.New("peer is stalling")
40+
errBannedHead = errors.New("peer head hash already banned")
41+
errNoPeers = errors.New("no peers to keep download active")
42+
errPendingQueue = errors.New("pending items in queue")
43+
errTimeout = errors.New("timeout")
44+
errEmptyHashSet = errors.New("empty hash set by peer")
45+
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
46+
errAlreadyInPool = errors.New("hash already in pool")
47+
errInvalidChain = errors.New("retrieved hash chain is invalid")
48+
errCrossCheckFailed = errors.New("block cross-check failed")
49+
errCancelHashFetch = errors.New("hash fetching canceled (requested)")
50+
errCancelBlockFetch = errors.New("block downloading canceled (requested)")
51+
errCancelChainImport = errors.New("chain importing canceled (requested)")
52+
errNoSyncActive = errors.New("no sync active")
5053
)
5154

5255
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
@@ -55,6 +58,9 @@ type hashCheckFn func(common.Hash) bool
5558
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
5659
type blockRetrievalFn func(common.Hash) *types.Block
5760

61+
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
62+
type chainInsertFn func(types.Blocks) (int, error)
63+
5864
// peerDropFn is a callback type for dropping a peer detected as malicious.
5965
type peerDropFn func(id string)
6066

@@ -88,13 +94,15 @@ type Downloader struct {
8894
importLock sync.Mutex
8995

9096
// Callbacks
91-
hasBlock hashCheckFn // Checks if a block is present in the chain
92-
getBlock blockRetrievalFn // Retrieves a block from the chain
93-
dropPeer peerDropFn // Retrieved the TD of our own chain
97+
hasBlock hashCheckFn // Checks if a block is present in the chain
98+
getBlock blockRetrievalFn // Retrieves a block from the chain
99+
insertChain chainInsertFn // Injects a batch of blocks into the chain
100+
dropPeer peerDropFn // Retrieved the TD of our own chain
94101

95102
// Status
96103
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
97104
synchronising int32
105+
processing int32
98106
notified int32
99107

100108
// Channels
@@ -113,18 +121,19 @@ type Block struct {
113121
}
114122

115123
// New creates a new downloader to fetch hashes and blocks from remote peers.
116-
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
124+
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
117125
// Create the base downloader
118126
downloader := &Downloader{
119-
mux: mux,
120-
queue: newQueue(),
121-
peers: newPeerSet(),
122-
hasBlock: hasBlock,
123-
getBlock: getBlock,
124-
dropPeer: dropPeer,
125-
newPeerCh: make(chan *peer, 1),
126-
hashCh: make(chan hashPack, 1),
127-
blockCh: make(chan blockPack, 1),
127+
mux: mux,
128+
queue: newQueue(),
129+
peers: newPeerSet(),
130+
hasBlock: hasBlock,
131+
getBlock: getBlock,
132+
insertChain: insertChain,
133+
dropPeer: dropPeer,
134+
newPeerCh: make(chan *peer, 1),
135+
hashCh: make(chan hashPack, 1),
136+
blockCh: make(chan blockPack, 1),
128137
}
129138
// Inject all the known bad hashes
130139
downloader.banned = set.New()
@@ -157,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t
157166
return
158167
}
159168

160-
// Synchronising returns the state of the downloader
169+
// Synchronising returns whether the downloader is currently retrieving blocks.
161170
func (d *Downloader) Synchronising() bool {
162171
return atomic.LoadInt32(&d.synchronising) > 0
163172
}
@@ -260,19 +269,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
260269
return d.syncWithPeer(p, hash)
261270
}
262271

263-
// TakeBlocks takes blocks from the queue and yields them to the caller.
264-
func (d *Downloader) TakeBlocks() []*Block {
265-
blocks := d.queue.TakeBlocks()
266-
if len(blocks) > 0 {
267-
d.importLock.Lock()
268-
d.importStart = time.Now()
269-
d.importQueue = blocks
270-
d.importDone = 0
271-
d.importLock.Unlock()
272-
}
273-
return blocks
274-
}
275-
276272
// Has checks if the downloader knows about a particular hash, meaning that its
277273
// either already downloaded of pending retrieval.
278274
func (d *Downloader) Has(hash common.Hash) bool {
@@ -307,19 +303,16 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
307303

308304
// Cancel cancels all of the operations and resets the queue. It returns true
309305
// if the cancel operation was completed.
310-
func (d *Downloader) Cancel() bool {
311-
// If we're not syncing just return.
312-
hs, bs := d.queue.Size()
313-
if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
314-
return false
315-
}
306+
func (d *Downloader) Cancel() {
316307
// Close the current cancel channel
317308
d.cancelLock.Lock()
318-
select {
319-
case <-d.cancelCh:
320-
// Channel was already closed
321-
default:
322-
close(d.cancelCh)
309+
if d.cancelCh != nil {
310+
select {
311+
case <-d.cancelCh:
312+
// Channel was already closed
313+
default:
314+
close(d.cancelCh)
315+
}
323316
}
324317
d.cancelLock.Unlock()
325318

@@ -330,11 +323,11 @@ func (d *Downloader) Cancel() bool {
330323
d.importQueue = nil
331324
d.importDone = 0
332325
d.importLock.Unlock()
333-
334-
return true
335326
}
336327

337-
// XXX Make synchronous
328+
// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
329+
// up until it finds a common ancestor. If the source peer times out, alternative
330+
// ones are tried for continuation.
338331
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
339332
var (
340333
start = time.Now()
@@ -530,10 +523,13 @@ out:
530523
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
531524
break
532525
}
533-
// All was successful, promote the peer
526+
// All was successful, promote the peer and potentially start processing
534527
peer.Promote()
535528
peer.SetIdle()
536529
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
530+
if atomic.LoadInt32(&d.processing) == 0 {
531+
go d.process()
532+
}
537533

538534
case errInvalidChain:
539535
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -709,6 +705,71 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
709705
}
710706
}
711707

708+
// process takes blocks from the queue and tries to import them into the chain.
709+
func (d *Downloader) process() (err error) {
710+
// Make sure only one goroutine is ever allowed to process blocks at once
711+
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
712+
return
713+
}
714+
// If the processor just exited, but there are freshly pending items, try to
715+
// reenter. This is needed because the goroutine spinned up for processing
716+
// the fresh blocks might have been rejected entry to to this present thread
717+
// not yet releasing the `processing` state.
718+
defer func() {
719+
if err == nil && d.queue.GetHeadBlock() != nil {
720+
err = d.process()
721+
}
722+
}()
723+
// Release the lock upon exit (note, before checking for reentry!)
724+
defer atomic.StoreInt32(&d.processing, 0)
725+
726+
// Fetch the current cancel channel to allow termination
727+
d.cancelLock.RLock()
728+
cancel := d.cancelCh
729+
d.cancelLock.RUnlock()
730+
731+
// Repeat the processing as long as there are blocks to import
732+
for {
733+
// Fetch the next batch of blocks
734+
blocks := d.queue.TakeBlocks()
735+
if len(blocks) == 0 {
736+
return nil
737+
}
738+
// Reset the import statistics
739+
d.importLock.Lock()
740+
d.importStart = time.Now()
741+
d.importQueue = blocks
742+
d.importDone = 0
743+
d.importLock.Unlock()
744+
745+
// Actually import the blocks
746+
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
747+
for len(blocks) != 0 { // TODO: quit
748+
// Check for any termination requests
749+
select {
750+
case <-cancel:
751+
return errCancelChainImport
752+
default:
753+
}
754+
// Retrieve the first batch of blocks to insert
755+
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
756+
raw := make(types.Blocks, 0, max)
757+
for _, block := range blocks[:max] {
758+
raw = append(raw, block.RawBlock)
759+
}
760+
// Try to inset the blocks, drop the originating peer if there's an error
761+
index, err := d.insertChain(raw)
762+
if err != nil {
763+
glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err)
764+
d.dropPeer(blocks[index].OriginPeer)
765+
d.Cancel()
766+
return errCancelChainImport
767+
}
768+
blocks = blocks[max:]
769+
}
770+
}
771+
}
772+
712773
// DeliverBlocks injects a new batch of blocks received from a remote node.
713774
// This is usually invoked through the BlocksMsg by the protocol handler.
714775
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {

0 commit comments

Comments
 (0)