Skip to content

Commit 0de13b0

Browse files
committed
Merge pull request #1102 from karalabe/maintain-block-origins
eth, eth/downloader: surface downloaded block origin, drop on error
2 parents 5044eb4 + eafdc1f commit 0de13b0

File tree

5 files changed

+39
-23
lines changed

5 files changed

+39
-23
lines changed

eth/downloader/downloader.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ type Downloader struct {
9393
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
9494
}
9595

96+
// Block is an origin-tagged blockchain block.
97+
type Block struct {
98+
RawBlock *types.Block
99+
OriginPeer string
100+
}
101+
96102
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
97103
downloader := &Downloader{
98104
mux: mux,
@@ -177,7 +183,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
177183
}
178184

179185
// TakeBlocks takes blocks from the queue and yields them to the caller.
180-
func (d *Downloader) TakeBlocks() types.Blocks {
186+
func (d *Downloader) TakeBlocks() []*Block {
181187
return d.queue.TakeBlocks()
182188
}
183189

eth/downloader/downloader_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ func (dl *downloadTester) sync(peerId string, head common.Hash) error {
8888
// syncTake is starts synchronising with a remote peer, but concurrently it also
8989
// starts fetching blocks that the downloader retrieved. IT blocks until both go
9090
// routines terminate.
91-
func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) {
91+
func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) {
9292
// Start a block collector to take blocks as they become available
9393
done := make(chan struct{})
94-
took := []*types.Block{}
94+
took := []*Block{}
9595
go func() {
9696
for running := true; running; {
9797
select {
@@ -349,7 +349,7 @@ func TestNonExistingParentAttack(t *testing.T) {
349349
if len(bs) != 1 {
350350
t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
351351
}
352-
if tester.hasBlock(bs[0].ParentHash()) {
352+
if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
353353
t.Fatalf("tester knows about the unknown hash")
354354
}
355355
tester.downloader.Cancel()
@@ -364,7 +364,7 @@ func TestNonExistingParentAttack(t *testing.T) {
364364
if len(bs) != 1 {
365365
t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
366366
}
367-
if !tester.hasBlock(bs[0].ParentHash()) {
367+
if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
368368
t.Fatalf("tester doesn't know about the origin hash")
369369
}
370370
}

eth/downloader/queue.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type queue struct {
3636
pendPool map[string]*fetchRequest // Currently pending block retrieval operations
3737

3838
blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
39-
blockCache []*types.Block // Downloaded but not yet delivered blocks
39+
blockCache []*Block // Downloaded but not yet delivered blocks
4040
blockOffset int // Offset of the first cached block in the block-chain
4141

4242
lock sync.RWMutex
@@ -148,7 +148,7 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash {
148148

149149
// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
150150
// been downloaded yet (or simply non existent).
151-
func (q *queue) GetHeadBlock() *types.Block {
151+
func (q *queue) GetHeadBlock() *Block {
152152
q.lock.RLock()
153153
defer q.lock.RUnlock()
154154

@@ -159,7 +159,7 @@ func (q *queue) GetHeadBlock() *types.Block {
159159
}
160160

161161
// GetBlock retrieves a downloaded block, or nil if non-existent.
162-
func (q *queue) GetBlock(hash common.Hash) *types.Block {
162+
func (q *queue) GetBlock(hash common.Hash) *Block {
163163
q.lock.RLock()
164164
defer q.lock.RUnlock()
165165

@@ -176,18 +176,18 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block {
176176
}
177177

178178
// TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
179-
func (q *queue) TakeBlocks() types.Blocks {
179+
func (q *queue) TakeBlocks() []*Block {
180180
q.lock.Lock()
181181
defer q.lock.Unlock()
182182

183183
// Accumulate all available blocks
184-
var blocks types.Blocks
184+
blocks := []*Block{}
185185
for _, block := range q.blockCache {
186186
if block == nil {
187187
break
188188
}
189189
blocks = append(blocks, block)
190-
delete(q.blockPool, block.Hash())
190+
delete(q.blockPool, block.RawBlock.Hash())
191191
}
192192
// Delete the blocks from the slice and let them be garbage collected
193193
// without this slice trick the blocks would stay in memory until nil
@@ -312,8 +312,10 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
312312
return ErrInvalidChain
313313
}
314314
// Otherwise merge the block and mark the hash block
315-
q.blockCache[index] = block
316-
315+
q.blockCache[index] = &Block{
316+
RawBlock: block,
317+
OriginPeer: id,
318+
}
317319
delete(request.Hashes, hash)
318320
delete(q.hashPool, hash)
319321
q.blockPool[hash] = int(block.NumberU64())
@@ -342,6 +344,6 @@ func (q *queue) Alloc(offset int) {
342344
size = blockCacheLimit
343345
}
344346
if len(q.blockCache) < size {
345-
q.blockCache = append(q.blockCache, make([]*types.Block, size-len(q.blockCache))...)
347+
q.blockCache = append(q.blockCache, make([]*Block, size-len(q.blockCache))...)
346348
}
347349
}

eth/handler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,13 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
9292
return manager
9393
}
9494

95-
func (pm *ProtocolManager) removePeer(peer *peer) {
95+
func (pm *ProtocolManager) removePeer(id string) {
9696
// Unregister the peer from the downloader
97-
pm.downloader.UnregisterPeer(peer.id)
97+
pm.downloader.UnregisterPeer(id)
9898

9999
// Remove the peer from the Ethereum peer set too
100-
glog.V(logger.Detail).Infoln("Removing peer", peer.id)
101-
if err := pm.peers.Unregister(peer.id); err != nil {
100+
glog.V(logger.Detail).Infoln("Removing peer", id)
101+
if err := pm.peers.Unregister(id); err != nil {
102102
glog.V(logger.Error).Infoln("Removal failed:", err)
103103
}
104104
}
@@ -148,7 +148,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
148148
glog.V(logger.Error).Infoln("Addition failed:", err)
149149
return err
150150
}
151-
defer pm.removePeer(p)
151+
defer pm.removePeer(p.id)
152152

153153
if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil {
154154
return err
@@ -315,7 +315,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
315315
if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
316316
glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
317317

318-
self.removePeer(p)
318+
self.removePeer(p.id)
319319

320320
return nil
321321
}

eth/sync.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync/atomic"
66
"time"
77

8+
"github.com/ethereum/go-ethereum/core/types"
89
"github.com/ethereum/go-ethereum/eth/downloader"
910
"github.com/ethereum/go-ethereum/logger"
1011
"github.com/ethereum/go-ethereum/logger/glog"
@@ -57,13 +58,20 @@ func (pm *ProtocolManager) processBlocks() error {
5758
if len(blocks) == 0 {
5859
return nil
5960
}
60-
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
61+
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
6162

6263
for len(blocks) != 0 && !pm.quit {
64+
// Retrieve the first batch of blocks to insert
6365
max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
64-
_, err := pm.chainman.InsertChain(blocks[:max])
66+
raw := make(types.Blocks, 0, max)
67+
for _, block := range blocks[:max] {
68+
raw = append(raw, block.RawBlock)
69+
}
70+
// Try to inset the blocks, drop the originating peer if there's an error
71+
index, err := pm.chainman.InsertChain(raw)
6572
if err != nil {
6673
glog.V(logger.Warn).Infof("Block insertion failed: %v", err)
74+
pm.removePeer(blocks[index].OriginPeer)
6775
pm.downloader.Cancel()
6876
return err
6977
}
@@ -105,7 +113,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
105113

106114
case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
107115
glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
108-
pm.removePeer(peer)
116+
pm.removePeer(peer.id)
109117

110118
case downloader.ErrPendingQueue:
111119
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)

0 commit comments

Comments
 (0)