Skip to content

Commit 5f341e5

Browse files
committed
Merge pull request #1212 from fjl/p2p-eth-block-timeout
eth, p2p: improve write timeouts and behaviour under load
2 parents fda49f2 + 73c3555 commit 5f341e5

File tree

8 files changed

+341
-362
lines changed

8 files changed

+341
-362
lines changed

core/transaction_pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
247247
}
248248

249249
// GetTransactions returns all currently processable transactions.
250+
// The returned slice may be modified by the caller.
250251
func (self *TxPool) GetTransactions() (txs types.Transactions) {
251252
self.mu.Lock()
252253
defer self.mu.Unlock()

core/types/transaction.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ func (tx *Transaction) Hash() common.Hash {
6767
})
6868
}
6969

70+
// Size returns the encoded RLP size of tx.
71+
func (self *Transaction) Size() common.StorageSize {
72+
c := writeCounter(0)
73+
rlp.Encode(&c, self)
74+
return common.StorageSize(c)
75+
}
76+
7077
func (self *Transaction) Data() []byte {
7178
return self.Payload
7279
}

eth/downloader/downloader.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,23 +263,29 @@ func (d *Downloader) Cancel() bool {
263263

264264
// XXX Make synchronous
265265
func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
266-
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
267-
268-
start := time.Now()
269-
270-
// Add the hash to the queue first, and start hash retrieval
271-
d.queue.Insert([]common.Hash{h})
272-
p.getHashes(h)
273-
274266
var (
267+
start = time.Now()
275268
active = p // active peer will help determine the current active peer
276269
head = common.Hash{} // common and last hash
277270

278-
timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
271+
timeout = time.NewTimer(0) // timer to dump a non-responsive active peer
279272
attempted = make(map[string]bool) // attempted peers will help with retries
280273
crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
281274
)
282275
defer crossTicker.Stop()
276+
defer timeout.Stop()
277+
278+
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
279+
<-timeout.C // timeout channel should be initially empty.
280+
281+
getHashes := func(from common.Hash) {
282+
active.getHashes(from)
283+
timeout.Reset(hashTTL)
284+
}
285+
286+
// Add the hash to the queue, and start hash retrieval.
287+
d.queue.Insert([]common.Hash{h})
288+
getHashes(h)
283289

284290
attempted[p.id] = true
285291
for finished := false; !finished; {
@@ -293,7 +299,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
293299
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
294300
break
295301
}
296-
timeout.Reset(hashTTL)
302+
timeout.Stop()
297303

298304
// Make sure the peer actually gave something valid
299305
if len(hashPack.hashes) == 0 {
@@ -345,7 +351,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
345351
active.getBlocks([]common.Hash{origin})
346352

347353
// Also fetch a fresh
348-
active.getHashes(head)
354+
getHashes(head)
349355
continue
350356
}
351357
// We're done, prepare the download cache and proceed pulling the blocks
@@ -399,7 +405,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
399405
// set p to the active peer. this will invalidate any hashes that may be returned
400406
// by our previous (delayed) peer.
401407
active = p
402-
p.getHashes(head)
408+
getHashes(head)
403409
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
404410
}
405411
}

eth/handler.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ import (
1818
"github.com/ethereum/go-ethereum/rlp"
1919
)
2020

21+
// This is the target maximum size of returned blocks for the
22+
// getBlocks message. The reply message may exceed it
23+
// if a single block is larger than the limit.
24+
const maxBlockRespSize = 2 * 1024 * 1024
25+
2126
func errResp(code errCode, format string, v ...interface{}) error {
2227
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
2328
}
@@ -48,9 +53,11 @@ type ProtocolManager struct {
4853
txSub event.Subscription
4954
minedBlockSub event.Subscription
5055

56+
// channels for fetcher, syncer, txsyncLoop
5157
newPeerCh chan *peer
5258
newHashCh chan []*blockAnnounce
5359
newBlockCh chan chan []*types.Block
60+
txsyncCh chan *txsync
5461
quitSync chan struct{}
5562

5663
// wait group is used for graceful shutdowns during downloading
@@ -71,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
7178
newPeerCh: make(chan *peer, 1),
7279
newHashCh: make(chan []*blockAnnounce, 1),
7380
newBlockCh: make(chan chan []*types.Block),
81+
txsyncCh: make(chan *txsync),
7482
quitSync: make(chan struct{}),
7583
}
76-
7784
manager.SubProtocol = p2p.Protocol{
7885
Name: "eth",
7986
Version: uint(protocolVersion),
@@ -113,13 +120,14 @@ func (pm *ProtocolManager) Start() {
113120
// broadcast transactions
114121
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
115122
go pm.txBroadcastLoop()
116-
117123
// broadcast mined blocks
118124
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
119125
go pm.minedBroadcastLoop()
120126

127+
// start sync handlers
121128
go pm.syncer()
122129
go pm.fetcher()
130+
go pm.txsyncLoop()
123131
}
124132

125133
func (pm *ProtocolManager) Stop() {
@@ -130,7 +138,7 @@ func (pm *ProtocolManager) Stop() {
130138
pm.quit = true
131139
pm.txSub.Unsubscribe() // quits txBroadcastLoop
132140
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
133-
close(pm.quitSync) // quits the sync handler
141+
close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
134142

135143
// Wait for any process action
136144
pm.wg.Wait()
@@ -145,26 +153,29 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
145153
}
146154

147155
func (pm *ProtocolManager) handle(p *peer) error {
148-
// Execute the Ethereum handshake, short circuit if fails
156+
// Execute the Ethereum handshake.
149157
if err := p.handleStatus(); err != nil {
150158
return err
151159
}
152-
// Register the peer locally and in the downloader too
160+
161+
// Register the peer locally.
153162
glog.V(logger.Detail).Infoln("Adding peer", p.id)
154163
if err := pm.peers.Register(p); err != nil {
155164
glog.V(logger.Error).Infoln("Addition failed:", err)
156165
return err
157166
}
158167
defer pm.removePeer(p.id)
159168

169+
// Register the peer in the downloader. If the downloader
170+
// considers it banned, we disconnect.
160171
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
161172
return err
162173
}
163-
// propagate existing transactions. new transactions appearing
174+
175+
// Propagate existing transactions. new transactions appearing
164176
// after this will be sent via broadcasts.
165-
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
166-
return err
167-
}
177+
pm.syncTransactions(p)
178+
168179
// main loop. handle incoming messages.
169180
for {
170181
if err := pm.handleMsg(p); err != nil {
@@ -246,7 +257,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
246257
if _, err := msgStream.List(); err != nil {
247258
return err
248259
}
249-
var i int
260+
var (
261+
i int
262+
totalsize common.StorageSize
263+
)
250264
for {
251265
i++
252266
var hash common.Hash
@@ -260,8 +274,9 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
260274
block := self.chainman.GetBlock(hash)
261275
if block != nil {
262276
blocks = append(blocks, block)
277+
totalsize += block.Size()
263278
}
264-
if i == downloader.MaxBlockFetch {
279+
if i == downloader.MaxBlockFetch || totalsize > maxBlockRespSize {
265280
break
266281
}
267282
}

eth/protocol.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ var errorToString = map[int]string{
5757
ErrSuspendedPeer: "Suspended peer",
5858
}
5959

60-
// backend is the interface the ethereum protocol backend should implement
61-
// used as an argument to EthProtocol
6260
type txPool interface {
61+
// AddTransactions should add the given transactions to the pool.
6362
AddTransactions([]*types.Transaction)
63+
64+
// GetTransactions should return pending transactions.
65+
// The slice should be modifiable by the caller.
6466
GetTransactions() types.Transactions
6567
}
6668

0 commit comments

Comments
 (0)