Skip to content

Commit 86ec742

Browse files
committed
p2p/discover: improve table addition code (#18974)
This change clears up confusion around the two ways in which nodes can be added to the table. When a neighbors packet is received as a reply to findnode, the nodes contained in the reply are added as 'seen' entries if sufficient space is available. When a ping is received and the endpoint verification has taken place, the remote node is added as a 'verified' entry or moved to the front of the bucket if present. This also updates the node's IP address and port if they have changed.
1 parent d9a07fb commit 86ec742

File tree

4 files changed

+175
-62
lines changed

4 files changed

+175
-62
lines changed

p2p/discover/table.go

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
328328
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
329329
// just remove those again during revalidation.
330330
for _, n := range r {
331-
tab.add(n)
331+
tab.addSeenNode(n)
332332
}
333333
reply <- r
334334
}
@@ -443,7 +443,7 @@ func (tab *Table) loadSeedNodes() {
443443
seed := seeds[i]
444444
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
445445
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
446-
tab.add(seed)
446+
tab.addSeenNode(seed)
447447
}
448448
}
449449

@@ -468,7 +468,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
468468
// The node responded, move it to the front.
469469
last.livenessChecks++
470470
log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
471-
b.bump(last)
471+
tab.bumpInBucket(b, last)
472472
return
473473
}
474474
// No reply received, pick a replacement or delete the node if there aren't
@@ -551,36 +551,81 @@ func (tab *Table) bucket(id enode.ID) *bucket {
551551
return tab.buckets[d-bucketMinDistance-1]
552552
}
553553

554-
// add attempts to add the given node to its corresponding bucket. If the bucket has space
555-
// available, adding the node succeeds immediately. Otherwise, the node is added if the
556-
// least recently active node in the bucket does not respond to a ping packet.
554+
// addSeenNode adds a node which may or may not be live to the end of a bucket. If the
555+
// bucket has space available, adding the node succeeds immediately. Otherwise, the node is
556+
// added to the replacements list.
557557
//
558558
// The caller must not hold tab.mutex.
559-
func (tab *Table) add(n *node) {
559+
func (tab *Table) addSeenNode(n *node) {
560560
if n.ID() == tab.self().ID() {
561561
return
562562
}
563563

564564
tab.mutex.Lock()
565565
defer tab.mutex.Unlock()
566566
b := tab.bucket(n.ID())
567-
if !tab.bumpOrAdd(b, n) {
568-
// Node is not in table. Add it to the replacement list.
567+
if contains(b.entries, n.ID()) {
568+
// Already in bucket, don't add.
569+
return
570+
}
571+
if len(b.entries) >= bucketSize {
572+
// Bucket full, maybe add as replacement.
569573
tab.addReplacement(b, n)
574+
return
575+
}
576+
if !tab.addIP(b, n.IP()) {
577+
// Can't add: IP limit reached.
578+
return
579+
}
580+
// Add to end of bucket:
581+
b.entries = append(b.entries, n)
582+
b.replacements = deleteNode(b.replacements, n)
583+
n.addedAt = time.Now()
584+
if tab.nodeAddedHook != nil {
585+
tab.nodeAddedHook(n)
570586
}
571587
}
572588

573-
// addThroughPing adds the given node to the table. Compared to plain
574-
// 'add' there is an additional safety measure: if the table is still
575-
// initializing the node is not added. This prevents an attack where the
576-
// table could be filled by just sending ping repeatedly.
589+
// addVerifiedNode adds a node whose existence has been verified recently to the front of a
590+
// bucket. If the node is already in the bucket, it is moved to the front. If the bucket
591+
// has no space, the node is added to the replacements list.
592+
//
593+
// There is an additional safety measure: if the table is still initializing the node
594+
// is not added. This prevents an attack where the table could be filled by just sending
595+
// ping repeatedly.
577596
//
578597
// The caller must not hold tab.mutex.
579-
func (tab *Table) addThroughPing(n *node) {
598+
func (tab *Table) addVerifiedNode(n *node) {
580599
if !tab.isInitDone() {
581600
return
582601
}
583-
tab.add(n)
602+
if n.ID() == tab.self().ID() {
603+
return
604+
}
605+
606+
tab.mutex.Lock()
607+
defer tab.mutex.Unlock()
608+
b := tab.bucket(n.ID())
609+
if tab.bumpInBucket(b, n) {
610+
// Already in bucket, moved to front.
611+
return
612+
}
613+
if len(b.entries) >= bucketSize {
614+
// Bucket full, maybe add as replacement.
615+
tab.addReplacement(b, n)
616+
return
617+
}
618+
if !tab.addIP(b, n.IP()) {
619+
// Can't add: IP limit reached.
620+
return
621+
}
622+
// Add to front of bucket.
623+
b.entries, _ = pushNode(b.entries, n, bucketSize)
624+
b.replacements = deleteNode(b.replacements, n)
625+
n.addedAt = time.Now()
626+
if tab.nodeAddedHook != nil {
627+
tab.nodeAddedHook(n)
628+
}
584629
}
585630

586631
// delete removes an entry from the node table. It is used to evacuate dead nodes.
@@ -651,12 +696,21 @@ func (tab *Table) replace(b *bucket, last *node) *node {
651696
return r
652697
}
653698

654-
// bump moves the given node to the front of the bucket entry list
699+
// bumpInBucket moves the given node to the front of the bucket entry list
655700
// if it is contained in that list.
656-
func (b *bucket) bump(n *node) bool {
701+
func (tab *Table) bumpInBucket(b *bucket, n *node) bool {
657702
for i := range b.entries {
658703
if b.entries[i].ID() == n.ID() {
659-
// move it to the front
704+
if !n.IP().Equal(b.entries[i].IP()) {
705+
// Endpoint has changed, ensure that the new IP fits into table limits.
706+
tab.removeIP(b, b.entries[i].IP())
707+
if !tab.addIP(b, n.IP()) {
708+
// It doesn't, put the previous one back.
709+
tab.addIP(b, b.entries[i].IP())
710+
return false
711+
}
712+
}
713+
// Move it to the front.
660714
copy(b.entries[1:], b.entries[:i])
661715
b.entries[0] = n
662716
return true
@@ -665,29 +719,20 @@ func (b *bucket) bump(n *node) bool {
665719
return false
666720
}
667721

668-
// bumpOrAdd moves n to the front of the bucket entry list or adds it if the list isn't
669-
// full. The return value is true if n is in the bucket.
670-
func (tab *Table) bumpOrAdd(b *bucket, n *node) bool {
671-
if b.bump(n) {
672-
return true
673-
}
674-
if len(b.entries) >= bucketSize || !tab.addIP(b, n.IP()) {
675-
return false
676-
}
677-
b.entries, _ = pushNode(b.entries, n, bucketSize)
678-
b.replacements = deleteNode(b.replacements, n)
679-
n.addedAt = time.Now()
680-
if tab.nodeAddedHook != nil {
681-
tab.nodeAddedHook(n)
682-
}
683-
return true
684-
}
685-
686722
func (tab *Table) deleteInBucket(b *bucket, n *node) {
687723
b.entries = deleteNode(b.entries, n)
688724
tab.removeIP(b, n.IP())
689725
}
690726

727+
func contains(ns []*node, id enode.ID) bool {
728+
for _, n := range ns {
729+
if n.ID() == id {
730+
return true
731+
}
732+
}
733+
return false
734+
}
735+
691736
// pushNode adds n to the front of list, keeping at most max items.
692737
func pushNode(list []*node, n *node, max int) ([]*node, *node) {
693738
if len(list) < max {

p2p/discover/table_test.go

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/ethereum/go-ethereum/crypto"
3131
"github.com/ethereum/go-ethereum/p2p/enode"
3232
"github.com/ethereum/go-ethereum/p2p/enr"
33+
"github.com/ethereum/go-ethereum/p2p/netutil"
3334
)
3435

3536
func TestTable_pingReplace(t *testing.T) {
@@ -64,7 +65,7 @@ func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding
6465
// its bucket if it is unresponsive. Revalidate again to ensure that
6566
transport.dead[last.ID()] = !lastInBucketIsResponding
6667
transport.dead[pingSender.ID()] = !newNodeIsResponding
67-
tab.add(pingSender)
68+
tab.addSeenNode(pingSender)
6869
tab.doRevalidate(make(chan struct{}, 1))
6970
tab.doRevalidate(make(chan struct{}, 1))
7071

@@ -114,10 +115,14 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
114115
}
115116

116117
prop := func(nodes []*node, bumps []int) (ok bool) {
118+
tab, db := newTestTable(newPingRecorder())
119+
defer db.Close()
120+
defer tab.Close()
121+
117122
b := &bucket{entries: make([]*node, len(nodes))}
118123
copy(b.entries, nodes)
119124
for i, pos := range bumps {
120-
b.bump(b.entries[pos])
125+
tab.bumpInBucket(b, b.entries[pos])
121126
if hasDuplicates(b.entries) {
122127
t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps))
123128
for _, n := range b.entries {
@@ -126,6 +131,7 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
126131
return false
127132
}
128133
}
134+
checkIPLimitInvariant(t, tab)
129135
return true
130136
}
131137
if err := quick.Check(prop, cfg); err != nil {
@@ -142,11 +148,12 @@ func TestTable_IPLimit(t *testing.T) {
142148

143149
for i := 0; i < tableIPLimit+1; i++ {
144150
n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
145-
tab.add(n)
151+
tab.addSeenNode(n)
146152
}
147153
if tab.len() > tableIPLimit {
148154
t.Errorf("too many nodes in table")
149155
}
156+
checkIPLimitInvariant(t, tab)
150157
}
151158

152159
// This checks that the per-bucket IP limit is applied correctly.
@@ -159,11 +166,28 @@ func TestTable_BucketIPLimit(t *testing.T) {
159166
d := 3
160167
for i := 0; i < bucketIPLimit+1; i++ {
161168
n := nodeAtDistance(tab.self().ID(), d, net.IP{172, 0, 1, byte(i)})
162-
tab.add(n)
169+
tab.addSeenNode(n)
163170
}
164171
if tab.len() > bucketIPLimit {
165172
t.Errorf("too many nodes in table")
166173
}
174+
checkIPLimitInvariant(t, tab)
175+
}
176+
177+
// checkIPLimitInvariant checks that ip limit sets contain an entry for every
178+
// node in the table and no extra entries.
179+
func checkIPLimitInvariant(t *testing.T, tab *Table) {
180+
t.Helper()
181+
182+
tabset := netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}
183+
for _, b := range tab.buckets {
184+
for _, n := range b.entries {
185+
tabset.Add(n.IP())
186+
}
187+
}
188+
if tabset.String() != tab.ips.String() {
189+
t.Errorf("table IP set is incorrect:\nhave: %v\nwant: %v", tab.ips, tabset)
190+
}
167191
}
168192

169193
func TestTable_closest(t *testing.T) {
@@ -281,6 +305,69 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
281305
return reflect.ValueOf(t)
282306
}
283307

308+
func TestTable_addVerifiedNode(t *testing.T) {
309+
tab, db := newTestTable(newPingRecorder())
310+
<-tab.initDone
311+
defer db.Close()
312+
defer tab.Close()
313+
314+
// Insert two nodes.
315+
n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
316+
n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
317+
tab.addSeenNode(n1)
318+
tab.addSeenNode(n2)
319+
320+
// Verify bucket content:
321+
bcontent := []*node{n1, n2}
322+
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
323+
t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
324+
}
325+
326+
// Add a changed version of n2.
327+
newrec := n2.Record()
328+
newrec.Set(enr.IP{99, 99, 99, 99})
329+
newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
330+
tab.addVerifiedNode(newn2)
331+
332+
// Check that bucket is updated correctly.
333+
newBcontent := []*node{newn2, n1}
334+
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, newBcontent) {
335+
t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
336+
}
337+
checkIPLimitInvariant(t, tab)
338+
}
339+
340+
func TestTable_addSeenNode(t *testing.T) {
341+
tab, db := newTestTable(newPingRecorder())
342+
<-tab.initDone
343+
defer db.Close()
344+
defer tab.Close()
345+
346+
// Insert two nodes.
347+
n1 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 1})
348+
n2 := nodeAtDistance(tab.self().ID(), 256, net.IP{88, 77, 66, 2})
349+
tab.addSeenNode(n1)
350+
tab.addSeenNode(n2)
351+
352+
// Verify bucket content:
353+
bcontent := []*node{n1, n2}
354+
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
355+
t.Fatalf("wrong bucket content: %v", tab.bucket(n1.ID()).entries)
356+
}
357+
358+
// Add a changed version of n2.
359+
newrec := n2.Record()
360+
newrec.Set(enr.IP{99, 99, 99, 99})
361+
newn2 := wrapNode(enode.SignNull(newrec, n2.ID()))
362+
tab.addSeenNode(newn2)
363+
364+
// Check that bucket content is unchanged.
365+
if !reflect.DeepEqual(tab.bucket(n1.ID()).entries, bcontent) {
366+
t.Fatalf("wrong bucket content after update: %v", tab.bucket(n1.ID()).entries)
367+
}
368+
checkIPLimitInvariant(t, tab)
369+
}
370+
284371
func TestTable_Lookup(t *testing.T) {
285372
tab, db := newTestTable(lookupTestnet)
286373
defer db.Close()
@@ -535,7 +622,6 @@ func (tn *preminedTestnet) findnode(toid enode.ID, toaddr *net.UDPAddr, target e
535622
}
536623

537624
func (*preminedTestnet) close() {}
538-
func (*preminedTestnet) waitping(from enode.ID) error { return nil }
539625
func (*preminedTestnet) ping(toid enode.ID, toaddr *net.UDPAddr) error { return nil }
540626

541627
// mine generates a testnet struct literal with nodes at

p2p/discover/table_util_test.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,8 @@ func fillBucket(tab *Table, n *node) (last *node) {
8686
// fillTable adds nodes the table to the end of their corresponding bucket
8787
// if the bucket is not full. The caller must not hold tab.mutex.
8888
func fillTable(tab *Table, nodes []*node) {
89-
tab.mutex.Lock()
90-
defer tab.mutex.Unlock()
91-
9289
for _, n := range nodes {
93-
if n.ID() == tab.self().ID() {
94-
continue // don't add self
95-
}
96-
b := tab.bucket(n.ID())
97-
if len(b.entries) < bucketSize {
98-
tab.bumpOrAdd(b, n)
99-
}
90+
tab.addSeenNode(n)
10091
}
10192
}
10293

@@ -154,15 +145,6 @@ func hasDuplicates(slice []*node) bool {
154145
return false
155146
}
156147

157-
func contains(ns []*node, id enode.ID) bool {
158-
for _, n := range ns {
159-
if n.ID() == id {
160-
return true
161-
}
162-
}
163-
return false
164-
}
165-
166148
func sortedByDistanceTo(distbase enode.ID, slice []*node) bool {
167149
var last enode.ID
168150
for i, e := range slice {

p2p/discover/udp.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,10 +661,10 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte)
661661
n := wrapNode(enode.NewV4(req.senderKey, from.IP, int(req.From.TCP), from.Port))
662662
if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
663663
t.sendPing(fromID, from, func() {
664-
t.tab.addThroughPing(n)
664+
t.tab.addVerifiedNode(n)
665665
})
666666
} else {
667-
t.tab.addThroughPing(n)
667+
t.tab.addVerifiedNode(n)
668668
}
669669

670670
// Update node database and endpoint predictor.

0 commit comments

Comments
 (0)