Skip to content

Commit 415969f

Browse files
authored
Merge pull request #16769 from karalabe/async-broadcasts
eth: propagate blocks and transactions async
2 parents ab6bdbd + d9cee2c commit 415969f

File tree

2 files changed

+117
-12
lines changed

2 files changed

+117
-12
lines changed

eth/handler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -698,15 +698,15 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
698698
// Send the block to a subset of our peers
699699
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
700700
for _, peer := range transfer {
701-
peer.SendNewBlock(block, td)
701+
peer.AsyncSendNewBlock(block, td)
702702
}
703703
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
704704
return
705705
}
706706
// Otherwise if the block is indeed in out own chain, announce it
707707
if pm.blockchain.HasBlock(hash, block.NumberU64()) {
708708
for _, peer := range peers {
709-
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
709+
peer.AsyncSendNewBlockHash(block)
710710
}
711711
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
712712
}
@@ -727,7 +727,7 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
727727
}
728728
// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
729729
for peer, txs := range txset {
730-
peer.SendTransactions(txs)
730+
peer.AsyncSendTransactions(txs)
731731
}
732732
}
733733

eth/peer.go

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,24 @@ var (
3737
)
3838

3939
const (
40-
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
41-
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
40+
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
41+
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
42+
43+
// maxQueuedTxs is the maximum number of transaction lists to queue up before
44+
// dropping broadcasts. This is a sensitive number as a transaction list might
45+
// contain a single transaction, or thousands.
46+
maxQueuedTxs = 128
47+
48+
// maxQueuedProps is the maximum number of block propagations to queue up before
49+
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
50+
// that might cover uncles should be enough.
51+
maxQueuedProps = 4
52+
53+
// maxQueuedAnns is the maximum number of block announcements to queue up before
54+
// dropping broadcasts. Similarly to block propagations, there's no point to queue
55+
// above some healthy uncle limit, so use that.
56+
maxQueuedAnns = 4
57+
4258
handshakeTimeout = 5 * time.Second
4359
)
4460

@@ -50,6 +66,12 @@ type PeerInfo struct {
5066
Head string `json:"head"` // SHA3 hash of the peer's best owned block
5167
}
5268

69+
// propEvent is a block propagation, waiting for its turn in the broadcast queue.
70+
type propEvent struct {
71+
block *types.Block
72+
td *big.Int
73+
}
74+
5375
type peer struct {
5476
id string
5577

@@ -63,23 +85,64 @@ type peer struct {
6385
td *big.Int
6486
lock sync.RWMutex
6587

66-
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
67-
knownBlocks *set.Set // Set of block hashes known to be known by this peer
88+
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
89+
knownBlocks *set.Set // Set of block hashes known to be known by this peer
90+
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
91+
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
92+
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
93+
term chan struct{} // Termination channel to stop the broadcaster
6894
}
6995

7096
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
71-
id := p.ID()
72-
7397
return &peer{
7498
Peer: p,
7599
rw: rw,
76100
version: version,
77-
id: fmt.Sprintf("%x", id[:8]),
101+
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
78102
knownTxs: set.New(),
79103
knownBlocks: set.New(),
104+
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
105+
queuedProps: make(chan *propEvent, maxQueuedProps),
106+
queuedAnns: make(chan *types.Block, maxQueuedAnns),
107+
term: make(chan struct{}),
108+
}
109+
}
110+
111+
// broadcast is a write loop that multiplexes block propagations, announcements
112+
// and transaction broadcasts into the remote peer. The goal is to have an async
113+
// writer that does not lock up node internals.
114+
func (p *peer) broadcast() {
115+
for {
116+
select {
117+
case txs := <-p.queuedTxs:
118+
if err := p.SendTransactions(txs); err != nil {
119+
return
120+
}
121+
p.Log().Trace("Broadcast transactions", "count", len(txs))
122+
123+
case prop := <-p.queuedProps:
124+
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
125+
return
126+
}
127+
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
128+
129+
case block := <-p.queuedAnns:
130+
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
131+
return
132+
}
133+
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
134+
135+
case <-p.term:
136+
return
137+
}
80138
}
81139
}
82140

141+
// close signals the broadcast goroutine to terminate.
142+
func (p *peer) close() {
143+
close(p.term)
144+
}
145+
83146
// Info gathers and returns a collection of metadata known about a peer.
84147
func (p *peer) Info() *PeerInfo {
85148
hash, td := p.Head()
@@ -139,6 +202,19 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
139202
return p2p.Send(p.rw, TxMsg, txs)
140203
}
141204

205+
// AsyncSendTransactions queues list of transactions propagation to a remote
206+
// peer. If the peer's broadcast queue is full, the event is silently dropped.
207+
func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
208+
select {
209+
case p.queuedTxs <- txs:
210+
for _, tx := range txs {
211+
p.knownTxs.Add(tx.Hash())
212+
}
213+
default:
214+
p.Log().Debug("Dropping transaction propagation", "count", len(txs))
215+
}
216+
}
217+
142218
// SendNewBlockHashes announces the availability of a number of blocks through
143219
// a hash notification.
144220
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
@@ -153,12 +229,35 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
153229
return p2p.Send(p.rw, NewBlockHashesMsg, request)
154230
}
155231

232+
// AsyncSendNewBlockHash queues the availability of a block for propagation to a
233+
// remote peer. If the peer's broadcast queue is full, the event is silently
234+
// dropped.
235+
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
236+
select {
237+
case p.queuedAnns <- block:
238+
p.knownBlocks.Add(block.Hash())
239+
default:
240+
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
241+
}
242+
}
243+
156244
// SendNewBlock propagates an entire block to a remote peer.
157245
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
158246
p.knownBlocks.Add(block.Hash())
159247
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
160248
}
161249

250+
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
251+
// the peer's broadcast queue is full, the event is silently dropped.
252+
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
253+
select {
254+
case p.queuedProps <- &propEvent{block: block, td: td}:
255+
p.knownBlocks.Add(block.Hash())
256+
default:
257+
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
258+
}
259+
}
260+
162261
// SendBlockHeaders sends a batch of block headers to the remote peer.
163262
func (p *peer) SendBlockHeaders(headers []*types.Header) error {
164263
return p2p.Send(p.rw, BlockHeadersMsg, headers)
@@ -313,7 +412,8 @@ func newPeerSet() *peerSet {
313412
}
314413

315414
// Register injects a new peer into the working set, or returns an error if the
316-
// peer is already known.
415+
// peer is already known. If a new peer it registered, its broadcast loop is also
416+
// started.
317417
func (ps *peerSet) Register(p *peer) error {
318418
ps.lock.Lock()
319419
defer ps.lock.Unlock()
@@ -325,6 +425,8 @@ func (ps *peerSet) Register(p *peer) error {
325425
return errAlreadyRegistered
326426
}
327427
ps.peers[p.id] = p
428+
go p.broadcast()
429+
328430
return nil
329431
}
330432

@@ -334,10 +436,13 @@ func (ps *peerSet) Unregister(id string) error {
334436
ps.lock.Lock()
335437
defer ps.lock.Unlock()
336438

337-
if _, ok := ps.peers[id]; !ok {
439+
p, ok := ps.peers[id]
440+
if !ok {
338441
return errNotRegistered
339442
}
340443
delete(ps.peers, id)
444+
p.close()
445+
341446
return nil
342447
}
343448

0 commit comments

Comments
 (0)