Skip to content

Commit 6c73a59

Browse files
committed
eth: limit number of sent transactions based on message size
Nodes that are out of sync will queue many transactions, which causes the initial transactions message to grow very large. Larger transactions messages can make communication impossible if the message is too big to send. Big transactions messages also exhaust egress bandwidth, which degrades other peer connections. The new approach to combat these issues is to send transactions in smaller batches. This commit introduces a new goroutine that handles delivery of all initial transaction transfers. Size-limited packs of transactions are sent to one peer at a time, conserving precious egress bandwidth.
1 parent 41b2008 commit 6c73a59

File tree

2 files changed

+109
-9
lines changed

2 files changed

+109
-9
lines changed

eth/handler.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@ type ProtocolManager struct {
5353
txSub event.Subscription
5454
minedBlockSub event.Subscription
5555

56+
// channels for fetcher, syncer, txsyncLoop
5657
newPeerCh chan *peer
5758
newHashCh chan []*blockAnnounce
5859
newBlockCh chan chan []*types.Block
60+
txsyncCh chan *txsync
5961
quitSync chan struct{}
6062

6163
// wait group is used for graceful shutdowns during downloading
@@ -76,9 +78,9 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
7678
newPeerCh: make(chan *peer, 1),
7779
newHashCh: make(chan []*blockAnnounce, 1),
7880
newBlockCh: make(chan chan []*types.Block),
81+
txsyncCh: make(chan *txsync),
7982
quitSync: make(chan struct{}),
8083
}
81-
8284
manager.SubProtocol = p2p.Protocol{
8385
Name: "eth",
8486
Version: uint(protocolVersion),
@@ -118,13 +120,14 @@ func (pm *ProtocolManager) Start() {
118120
// broadcast transactions
119121
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
120122
go pm.txBroadcastLoop()
121-
122123
// broadcast mined blocks
123124
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
124125
go pm.minedBroadcastLoop()
125126

127+
// start sync handlers
126128
go pm.syncer()
127129
go pm.fetcher()
130+
go pm.txsyncLoop()
128131
}
129132

130133
func (pm *ProtocolManager) Stop() {
@@ -135,7 +138,7 @@ func (pm *ProtocolManager) Stop() {
135138
pm.quit = true
136139
pm.txSub.Unsubscribe() // quits txBroadcastLoop
137140
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
138-
close(pm.quitSync) // quits the sync handler
141+
close(pm.quitSync) // quits syncer, fetcher, txsyncLoop
139142

140143
// Wait for any process action
141144
pm.wg.Wait()
@@ -150,26 +153,29 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter
150153
}
151154

152155
func (pm *ProtocolManager) handle(p *peer) error {
153-
// Execute the Ethereum handshake, short circuit if fails
156+
// Execute the Ethereum handshake.
154157
if err := p.handleStatus(); err != nil {
155158
return err
156159
}
157-
// Register the peer locally and in the downloader too
160+
161+
// Register the peer locally.
158162
glog.V(logger.Detail).Infoln("Adding peer", p.id)
159163
if err := pm.peers.Register(p); err != nil {
160164
glog.V(logger.Error).Infoln("Addition failed:", err)
161165
return err
162166
}
163167
defer pm.removePeer(p.id)
164168

169+
// Register the peer in the downloader. If the downloader
170+
// considers it banned, we disconnect.
165171
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
166172
return err
167173
}
168-
// propagate existing transactions. new transactions appearing
174+
175+
// Propagate existing transactions. new transactions appearing
169176
// after this will be sent via broadcasts.
170-
if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
171-
return err
172-
}
177+
pm.syncTransactions(p)
178+
173179
// main loop. handle incoming messages.
174180
for {
175181
if err := pm.handleMsg(p); err != nil {

eth/sync.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package eth
22

33
import (
44
"math"
5+
"math/rand"
56
"sync/atomic"
67
"time"
78

@@ -10,6 +11,7 @@ import (
1011
"github.com/ethereum/go-ethereum/eth/downloader"
1112
"github.com/ethereum/go-ethereum/logger"
1213
"github.com/ethereum/go-ethereum/logger/glog"
14+
"github.com/ethereum/go-ethereum/p2p/discover"
1315
)
1416

1517
const (
@@ -20,6 +22,10 @@ const (
2022
notifyFetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block
2123
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
2224
blockProcAmount = 256
25+
26+
// This is the target size for the packs of transactions sent by txsyncLoop.
27+
// A pack can get larger than this if a single transaction exceeds this size.
28+
txsyncPackSize = 100 * 1024
2329
)
2430

2531
// blockAnnounce is the hash notification of the availability of a new block in
@@ -30,6 +36,94 @@ type blockAnnounce struct {
3036
time time.Time
3137
}
3238

39+
type txsync struct {
40+
p *peer
41+
txs []*types.Transaction
42+
}
43+
44+
// syncTransactions starts sending all currently pending transactions to the given peer.
45+
func (pm *ProtocolManager) syncTransactions(p *peer) {
46+
txs := pm.txpool.GetTransactions()
47+
if len(txs) == 0 {
48+
return
49+
}
50+
select {
51+
case pm.txsyncCh <- &txsync{p, txs}:
52+
case <-pm.quitSync:
53+
}
54+
}
55+
56+
// txsyncLoop takes care of the initial transaction sync for each new
57+
// connection. When a new peer appears, we relay all currently pending
58+
// transactions. In order to minimise egress bandwidth usage, we send
59+
// the transactions in small packs to one peer at a time.
60+
func (pm *ProtocolManager) txsyncLoop() {
61+
var (
62+
pending = make(map[discover.NodeID]*txsync)
63+
sending = false // whether a send is active
64+
pack = new(txsync) // the pack that is being sent
65+
done = make(chan error, 1) // result of the send
66+
)
67+
68+
// send starts sending a pack of transactions from the sync.
69+
send := func(s *txsync) {
70+
// Fill pack with transactions up to the target size.
71+
size := common.StorageSize(0)
72+
pack.p = s.p
73+
pack.txs = pack.txs[:0]
74+
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
75+
pack.txs = append(pack.txs, s.txs[i])
76+
size += s.txs[i].Size()
77+
}
78+
// Remove the transactions that will be sent.
79+
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
80+
if len(s.txs) == 0 {
81+
delete(pending, s.p.ID())
82+
}
83+
// Send the pack in the background.
84+
glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size)
85+
sending = true
86+
go func() { done <- pack.p.sendTransactions(pack.txs) }()
87+
}
88+
89+
// pick chooses the next pending sync.
90+
pick := func() *txsync {
91+
if len(pending) == 0 {
92+
return nil
93+
}
94+
n := rand.Intn(len(pending)) + 1
95+
for _, s := range pending {
96+
if n--; n == 0 {
97+
return s
98+
}
99+
}
100+
return nil
101+
}
102+
103+
for {
104+
select {
105+
case s := <-pm.txsyncCh:
106+
pending[s.p.ID()] = s
107+
if !sending {
108+
send(s)
109+
}
110+
case err := <-done:
111+
sending = false
112+
// Stop tracking peers that cause send failures.
113+
if err != nil {
114+
glog.V(logger.Debug).Infof("%v: tx send failed: %v", pack.p.Peer, err)
115+
delete(pending, pack.p.ID())
116+
}
117+
// Schedule the next send.
118+
if s := pick(); s != nil {
119+
send(s)
120+
}
121+
case <-pm.quitSync:
122+
return
123+
}
124+
}
125+
}
126+
33127
// fetcher is responsible for collecting hash notifications, and periodically
34128
// checking all unknown ones and individually fetching them.
35129
func (pm *ProtocolManager) fetcher() {

0 commit comments

Comments
 (0)