Skip to content

Commit 9af1f71

Browse files
cskiralyfjl
andauthored
eth: stabilize tx relay peer selection (#31714)
When maxPeers was just above some perfect square, and a few peers dropped for some reason, we changed the peer selection function. When new peers were acquired, we changed again. This PR improves the selection function, in two ways. First, it will always select sqrt(peers) to broadcast to. Second, the selection now uses siphash with a secret key, to guard against information leaks about tx source. --------- Signed-off-by: Csaba Kiraly <[email protected]> Co-authored-by: Felix Lange <[email protected]>
1 parent 3a89051 commit 9af1f71

File tree

5 files changed

+197
-51
lines changed

5 files changed

+197
-51
lines changed

eth/handler.go

Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,22 @@
1717
package eth
1818

1919
import (
20+
"cmp"
21+
crand "crypto/rand"
2022
"errors"
2123
"maps"
2224
"math"
23-
"math/big"
2425
"slices"
2526
"sync"
2627
"sync/atomic"
2728
"time"
2829

30+
"github.com/dchest/siphash"
2931
"github.com/ethereum/go-ethereum/common"
3032
"github.com/ethereum/go-ethereum/core"
3133
"github.com/ethereum/go-ethereum/core/rawdb"
3234
"github.com/ethereum/go-ethereum/core/txpool"
3335
"github.com/ethereum/go-ethereum/core/types"
34-
"github.com/ethereum/go-ethereum/crypto"
3536
"github.com/ethereum/go-ethereum/eth/downloader"
3637
"github.com/ethereum/go-ethereum/eth/ethconfig"
3738
"github.com/ethereum/go-ethereum/eth/fetcher"
@@ -119,9 +120,10 @@ type handler struct {
119120
chain *core.BlockChain
120121
maxPeers int
121122

122-
downloader *downloader.Downloader
123-
txFetcher *fetcher.TxFetcher
124-
peers *peerSet
123+
downloader *downloader.Downloader
124+
txFetcher *fetcher.TxFetcher
125+
peers *peerSet
126+
txBroadcastKey [16]byte
125127

126128
eventMux *event.TypeMux
127129
txsCh chan core.NewTxsEvent
@@ -153,6 +155,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
153155
txpool: config.TxPool,
154156
chain: config.Chain,
155157
peers: newPeerSet(),
158+
txBroadcastKey: newBroadcastChoiceKey(),
156159
requiredBlocks: config.RequiredBlocks,
157160
quitSync: make(chan struct{}),
158161
handlerDoneCh: make(chan struct{}),
@@ -480,58 +483,40 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
480483

481484
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
482485
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
483-
)
484-
// Broadcast transactions to a batch of peers not knowing about it
485-
direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to
486-
if direct.BitLen() == 0 {
487-
direct = big.NewInt(1)
488-
}
489-
total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers
490486

491-
var (
492-
signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender
493-
hasher = crypto.NewKeccakState()
494-
hash = make([]byte, 32)
487+
signer = types.LatestSigner(h.chain.Config())
488+
choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey)
489+
peers = h.peers.all()
495490
)
491+
496492
for _, tx := range txs {
497-
var maybeDirect bool
493+
var directSet map[*ethPeer]struct{}
498494
switch {
499495
case tx.Type() == types.BlobTxType:
500496
blobTxs++
501497
case tx.Size() > txMaxBroadcastSize:
502498
largeTxs++
503499
default:
504-
maybeDirect = true
500+
// Get transaction sender address. Here we can ignore any error
501+
// since we're just interested in any value.
502+
txSender, _ := types.Sender(signer, tx)
503+
directSet = choice.choosePeers(peers, txSender)
505504
}
506-
// Send the transaction (if it's small enough) directly to a subset of
507-
// the peers that have not received it yet, ensuring that the flow of
508-
// transactions is grouped by account to (try and) avoid nonce gaps.
509-
//
510-
// To do this, we hash the local enode IW with together with a peer's
511-
// enode ID together with the transaction sender and broadcast if
512-
// `sha(self, peer, sender) mod peers < sqrt(peers)`.
513-
for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) {
514-
var broadcast bool
515-
if maybeDirect {
516-
hasher.Reset()
517-
hasher.Write(h.nodeID.Bytes())
518-
hasher.Write(peer.Node().ID().Bytes())
519-
520-
from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter
521-
hasher.Write(from.Bytes())
522-
523-
hasher.Read(hash)
524-
if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 {
525-
broadcast = true
526-
}
505+
506+
for _, peer := range peers {
507+
if peer.KnownTransaction(tx.Hash()) {
508+
continue
527509
}
528-
if broadcast {
510+
if _, ok := directSet[peer]; ok {
511+
// Send direct.
529512
txset[peer] = append(txset[peer], tx.Hash())
530513
} else {
514+
// Send announcement.
531515
annos[peer] = append(annos[peer], tx.Hash())
532516
}
533517
}
534518
}
519+
535520
for peer, hashes := range txset {
536521
directCount += len(hashes)
537522
peer.AsyncSendTransactions(hashes)
@@ -696,3 +681,62 @@ func (st *blockRangeState) stop() {
696681
func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket {
697682
return *st.next.Load()
698683
}
684+
685+
// broadcastChoice implements a deterministic random choice of peers. This is designed
686+
// specifically for choosing which peer receives a direct broadcast of a transaction.
687+
//
688+
// The choice is made based on the involved p2p node IDs and the transaction sender,
689+
// ensuring that the flow of transactions is grouped by account to (try and) avoid nonce
690+
// gaps.
691+
type broadcastChoice struct {
692+
self enode.ID
693+
key [16]byte
694+
buffer map[*ethPeer]struct{}
695+
tmp []broadcastPeer
696+
}
697+
698+
type broadcastPeer struct {
699+
p *ethPeer
700+
score uint64
701+
}
702+
703+
func newBroadcastChoiceKey() (k [16]byte) {
704+
crand.Read(k[:])
705+
return k
706+
}
707+
708+
func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice {
709+
return &broadcastChoice{
710+
self: self,
711+
key: key,
712+
buffer: make(map[*ethPeer]struct{}),
713+
}
714+
}
715+
716+
// choosePeers selects the peers that will receive a direct transaction broadcast message.
717+
// Note the return value will only stay valid until the next call to choosePeers.
718+
func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} {
719+
// Compute randomized scores.
720+
bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)]
721+
hash := siphash.New(bc.key[:])
722+
for i, peer := range peers {
723+
hash.Reset()
724+
hash.Write(bc.self[:])
725+
hash.Write(peer.Peer.Peer.ID().Bytes())
726+
hash.Write(txSender[:])
727+
bc.tmp[i] = broadcastPeer{peer, hash.Sum64()}
728+
}
729+
730+
// Sort by score.
731+
slices.SortFunc(bc.tmp, func(a, b broadcastPeer) int {
732+
return cmp.Compare(a.score, b.score)
733+
})
734+
735+
// Take top n.
736+
clear(bc.buffer)
737+
n := int(math.Ceil(math.Sqrt(float64(len(bc.tmp)))))
738+
for i := range n {
739+
bc.buffer[bc.tmp[i].p] = struct{}{}
740+
}
741+
return bc.buffer
742+
}

eth/handler_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
package eth
1818

1919
import (
20+
"maps"
2021
"math/big"
22+
"math/rand"
2123
"sort"
2224
"sync"
25+
"testing"
2326

2427
"github.com/ethereum/go-ethereum/common"
2528
"github.com/ethereum/go-ethereum/consensus/ethash"
@@ -29,8 +32,11 @@ import (
2932
"github.com/ethereum/go-ethereum/core/types"
3033
"github.com/ethereum/go-ethereum/crypto"
3134
"github.com/ethereum/go-ethereum/eth/ethconfig"
35+
"github.com/ethereum/go-ethereum/eth/protocols/eth"
3236
"github.com/ethereum/go-ethereum/ethdb"
3337
"github.com/ethereum/go-ethereum/event"
38+
"github.com/ethereum/go-ethereum/p2p"
39+
"github.com/ethereum/go-ethereum/p2p/enode"
3440
"github.com/ethereum/go-ethereum/params"
3541
"github.com/ethereum/go-ethereum/rlp"
3642
"github.com/holiman/uint256"
@@ -212,3 +218,102 @@ func (b *testHandler) close() {
212218
b.handler.Stop()
213219
b.chain.Stop()
214220
}
221+
222+
func TestBroadcastChoice(t *testing.T) {
223+
self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111")
224+
choice49 := newBroadcastChoice(self, [16]byte{1})
225+
choice50 := newBroadcastChoice(self, [16]byte{1})
226+
227+
// Create test peers and random tx sender addresses.
228+
rand := rand.New(rand.NewSource(33))
229+
txsenders := make([]common.Address, 400)
230+
for i := range txsenders {
231+
rand.Read(txsenders[i][:])
232+
}
233+
peers := createTestPeers(rand, 50)
234+
defer closePeers(peers)
235+
236+
// Evaluate choice49 first.
237+
expectedCount := 7 // sqrt(49)
238+
var chosen49 = make([]map[*ethPeer]struct{}, len(txsenders))
239+
for i, txSender := range txsenders {
240+
set := choice49.choosePeers(peers[:49], txSender)
241+
chosen49[i] = maps.Clone(set)
242+
243+
// Sanity check choices. Here we check that the function selects different peers
244+
// for different transaction senders.
245+
if len(set) != expectedCount {
246+
t.Fatalf("choice49 produced wrong count %d, want %d", len(set), expectedCount)
247+
}
248+
if i > 0 && maps.Equal(set, chosen49[i-1]) {
249+
t.Errorf("choice49 for tx %d is equal to tx %d", i, i-1)
250+
}
251+
}
252+
253+
// Evaluate choice50 for the same peers and transactions. It should always yield more
254+
// peers than choice49, and the chosen set should be a superset of choice49's.
255+
for i, txSender := range txsenders {
256+
set := choice50.choosePeers(peers[:50], txSender)
257+
if len(set) < len(chosen49[i]) {
258+
t.Errorf("for tx %d, choice50 has less peers than choice49", i)
259+
}
260+
for p := range chosen49[i] {
261+
if _, ok := set[p]; !ok {
262+
t.Errorf("for tx %d, choice50 did not choose peer %v, but choice49 did", i, p.ID())
263+
}
264+
}
265+
}
266+
}
267+
268+
func BenchmarkBroadcastChoice(b *testing.B) {
269+
b.Run("50", func(b *testing.B) {
270+
benchmarkBroadcastChoice(b, 50)
271+
})
272+
b.Run("200", func(b *testing.B) {
273+
benchmarkBroadcastChoice(b, 200)
274+
})
275+
b.Run("500", func(b *testing.B) {
276+
benchmarkBroadcastChoice(b, 500)
277+
})
278+
}
279+
280+
// This measures the overhead of sending one transaction to N peers.
281+
func benchmarkBroadcastChoice(b *testing.B, npeers int) {
282+
rand := rand.New(rand.NewSource(33))
283+
peers := createTestPeers(rand, npeers)
284+
defer closePeers(peers)
285+
286+
txsenders := make([]common.Address, b.N)
287+
for i := range txsenders {
288+
rand.Read(txsenders[i][:])
289+
}
290+
291+
self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111")
292+
choice := newBroadcastChoice(self, [16]byte{1})
293+
294+
b.ResetTimer()
295+
for i := range b.N {
296+
set := choice.choosePeers(peers, txsenders[i])
297+
if len(set) == 0 {
298+
b.Fatal("empty result")
299+
}
300+
}
301+
}
302+
303+
func createTestPeers(rand *rand.Rand, n int) []*ethPeer {
304+
peers := make([]*ethPeer, n)
305+
for i := range peers {
306+
var id enode.ID
307+
rand.Read(id[:])
308+
p2pPeer := p2p.NewPeer(id, "test", nil)
309+
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil)
310+
peers[i] = &ethPeer{Peer: ep}
311+
}
312+
return peers
313+
}
314+
315+
func closePeers(peers []*ethPeer) {
316+
for _, p := range peers {
317+
p.Close()
318+
}
319+
}

eth/peerset.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ package eth
1919
import (
2020
"errors"
2121
"fmt"
22+
"maps"
23+
"slices"
2224
"sync"
2325

24-
"github.com/ethereum/go-ethereum/common"
2526
"github.com/ethereum/go-ethereum/eth/protocols/eth"
2627
"github.com/ethereum/go-ethereum/eth/protocols/snap"
2728
"github.com/ethereum/go-ethereum/p2p"
@@ -191,19 +192,12 @@ func (ps *peerSet) peer(id string) *ethPeer {
191192
return ps.peers[id]
192193
}
193194

194-
// peersWithoutTransaction retrieves a list of peers that do not have a given
195-
// transaction in their set of known hashes.
196-
func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
195+
// all returns all current peers.
196+
func (ps *peerSet) all() []*ethPeer {
197197
ps.lock.RLock()
198198
defer ps.lock.RUnlock()
199199

200-
list := make([]*ethPeer, 0, len(ps.peers))
201-
for _, p := range ps.peers {
202-
if !p.KnownTransaction(hash) {
203-
list = append(list, p)
204-
}
205-
}
206-
return list
200+
return slices.Collect(maps.Values(ps.peers))
207201
}
208202

209203
// len returns if the current number of `eth` peers in the set. Since the `snap`

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/crate-crypto/go-eth-kzg v1.3.0
1818
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a
1919
github.com/davecgh/go-spew v1.1.1
20+
github.com/dchest/siphash v1.2.3
2021
github.com/deckarep/golang-set/v2 v2.6.0
2122
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1
2223
github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV
8787
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8888
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8989
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
90+
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
91+
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
9092
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
9193
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
9294
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=

0 commit comments

Comments
 (0)