
Commit 7b5107b

timcooijmans and fjl authored
p2p/discover: avoid dropping unverified nodes when table is almost empty (#21396)
This change improves discovery behavior in small networks. Very small networks would often fail to bootstrap because all member nodes were dropping table content due to findnode failure. The check is now changed to avoid dropping nodes on findnode failure when their bucket is almost empty.

It also relaxes the liveness check requirement for FINDNODE/v4 response nodes, returning unverified nodes as results when there aren't any verified nodes yet.

The "findnode failed" log now reports whether the node was dropped instead of the number of results, since the number of results was always zero by definition.

Co-authored-by: Felix Lange <[email protected]>
1 parent bdde616 commit 7b5107b
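
To make the first change concrete, here is a small, self-contained Go sketch of the new drop rule that lookup.go adopts in this commit. It is an illustration only: shouldDrop is a hypothetical helper, and the constants are assumed to mirror the package's usual values (bucketSize = 16, maxFindnodeFailures = 5).

package main

import "fmt"

// Assumed to mirror the p2p/discover constants; treat them as illustrative.
const (
    bucketSize          = 16 // k-bucket capacity
    maxFindnodeFailures = 5  // consecutive findnode failures tolerated
)

// shouldDrop is a hypothetical stand-in for the check in lookup.go: a peer is
// removed only after repeated findnode failures AND when its bucket already
// holds at least half a bucket's worth of other candidates.
func shouldDrop(fails, bucketLen int) bool {
    return fails >= maxFindnodeFailures && bucketLen >= bucketSize/2
}

func main() {
    fmt.Println(shouldDrop(5, 2))  // false: bucket nearly empty, keep the node
    fmt.Println(shouldDrop(5, 10)) // true: enough alternatives, safe to drop
}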

File tree

5 files changed: +140 -25 lines

p2p/discover/lookup.go
p2p/discover/table.go
p2p/discover/table_test.go
p2p/discover/v4_udp.go
p2p/discover/v4_udp_test.go

p2p/discover/lookup.go

Lines changed: 7 additions & 6 deletions
@@ -104,9 +104,7 @@ func (it *lookup) startQueries() bool {
 
     // The first query returns nodes from the local table.
     if it.queries == -1 {
-        it.tab.mutex.Lock()
-        closest := it.tab.closest(it.result.target, bucketSize, false)
-        it.tab.mutex.Unlock()
+        closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
         // Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
         // for the table to fill in this case, but there is no good mechanism for that
         // yet.
@@ -150,11 +148,14 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
     } else if len(r) == 0 {
         fails++
         it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
-        it.tab.log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "results", len(r), "err", err)
-        if fails >= maxFindnodeFailures {
-            it.tab.log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
+        // Remove the node from the local table if it fails to return anything useful too
+        // many times, but only if there are enough other nodes in the bucket.
+        dropped := false
+        if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
+            dropped = true
             it.tab.delete(n)
         }
+        it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
     } else if fails > 0 {
         // Reset failure counter because it counts _consecutive_ failures.
         it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
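
With bucketSize at 16 in this package, the new bucketLen guard means a failing peer is only deleted once its bucket already holds at least 8 entries. In a network of only a handful of nodes that threshold is never reached, so repeated findnode failures no longer empty the table during bootstrap.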

p2p/discover/table.go

Lines changed: 32 additions & 11 deletions
@@ -392,22 +392,35 @@ func (tab *Table) copyLiveNodes() {
     }
 }
 
-// closest returns the n nodes in the table that are closest to the
-// given id. The caller must hold tab.mutex.
-func (tab *Table) closest(target enode.ID, nresults int, checklive bool) *nodesByDistance {
-    // This is a very wasteful way to find the closest nodes but
-    // obviously correct. I believe that tree-based buckets would make
-    // this easier to implement efficiently.
-    close := &nodesByDistance{target: target}
+// findnodeByID returns the n nodes in the table that are closest to the given id.
+// This is used by the FINDNODE/v4 handler.
+//
+// The preferLive parameter says whether the caller wants liveness-checked results. If
+// preferLive is true and the table contains any verified nodes, the result will not
+// contain unverified nodes. However, if there are no verified nodes at all, the result
+// will contain unverified nodes.
+func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance {
+    tab.mutex.Lock()
+    defer tab.mutex.Unlock()
+
+    // Scan all buckets. There might be a better way to do this, but there aren't that many
+    // buckets, so this solution should be fine. The worst-case complexity of this loop
+    // is O(tab.len() * nresults).
+    nodes := &nodesByDistance{target: target}
+    liveNodes := &nodesByDistance{target: target}
     for _, b := range &tab.buckets {
         for _, n := range b.entries {
-            if checklive && n.livenessChecks == 0 {
-                continue
+            nodes.push(n, nresults)
+            if preferLive && n.livenessChecks > 0 {
+                liveNodes.push(n, nresults)
             }
-            close.push(n, nresults)
         }
     }
-    return close
+
+    if preferLive && len(liveNodes.entries) > 0 {
+        return liveNodes
+    }
+    return nodes
 }
 
 // len returns the number of nodes in the table.
@@ -421,6 +434,14 @@ func (tab *Table) len() (n int) {
     return n
 }
 
+// bucketLen returns the number of nodes in the bucket for the given ID.
+func (tab *Table) bucketLen(id enode.ID) int {
+    tab.mutex.Lock()
+    defer tab.mutex.Unlock()
+
+    return len(tab.bucket(id).entries)
+}
+
 // bucket returns the bucket for the given node ID hash.
 func (tab *Table) bucket(id enode.ID) *bucket {
     d := enode.LogDist(tab.self().ID(), id)
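
The preferLive fallback above is what keeps bootstrapping nodes answering FINDNODE before any peer has passed a liveness check. The standalone sketch below restates that selection rule with hypothetical types; the real code works on *node and nodesByDistance and also orders results by XOR distance to the target, which is omitted here.

package main

import "fmt"

// candidate is a hypothetical stand-in for a table entry.
type candidate struct {
    id       string
    verified bool // corresponds to livenessChecks > 0 in the real table
}

// selectNodes mirrors findnodeByID's fallback: collect everything, track the
// verified subset, and return unverified entries only when no verified ones exist.
func selectNodes(all []candidate, preferLive bool) []candidate {
    var nodes, liveNodes []candidate
    for _, c := range all {
        nodes = append(nodes, c)
        if preferLive && c.verified {
            liveNodes = append(liveNodes, c)
        }
    }
    if preferLive && len(liveNodes) > 0 {
        return liveNodes
    }
    return nodes
}

func main() {
    // Freshly bootstrapped table: nothing verified yet, so unverified nodes
    // are still returned instead of an empty answer.
    fmt.Println(selectNodes([]candidate{{"a", false}, {"b", false}}, true))

    // Once any entry is verified, unverified entries are filtered out again.
    fmt.Println(selectNodes([]candidate{{"a", false}, {"b", true}}, true))
}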

p2p/discover/table_test.go

Lines changed: 2 additions & 2 deletions
@@ -190,7 +190,7 @@ func checkIPLimitInvariant(t *testing.T, tab *Table) {
     }
 }
 
-func TestTable_closest(t *testing.T) {
+func TestTable_findnodeByID(t *testing.T) {
     t.Parallel()
 
     test := func(test *closeTest) bool {
@@ -202,7 +202,7 @@ func TestTable_closest(t *testing.T) {
         fillTable(tab, test.All)
 
         // check that closest(Target, N) returns nodes
-        result := tab.closest(test.Target, test.N, false).entries
+        result := tab.findnodeByID(test.Target, test.N, false).entries
         if hasDuplicates(result) {
             t.Errorf("result contains duplicates")
             return false

p2p/discover/v4_udp.go

Lines changed: 12 additions & 5 deletions
@@ -324,7 +324,16 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
         Target:     target,
         Expiration: uint64(time.Now().Add(expiration).Unix()),
     })
-    return nodes, <-rm.errc
+    // Ensure that callers don't see a timeout if the node actually responded. Since
+    // findnode can receive more than one neighbors response, the reply matcher will be
+    // active until the remote node sends enough nodes. If the remote end doesn't have
+    // enough nodes the reply matcher will time out waiting for the second reply, but
+    // there's no need for an error in that case.
+    err := <-rm.errc
+    if err == errTimeout && rm.reply != nil {
+        err = nil
+    }
+    return nodes, err
 }
 
 // RequestENR sends enrRequest to the given node and waits for a response.
@@ -453,9 +462,9 @@ func (t *UDPv4) loop() {
                 if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
                     ok, requestDone := p.callback(r.data)
                     matched = matched || ok
+                    p.reply = r.data
                     // Remove the matcher if callback indicates that all replies have been received.
                     if requestDone {
-                        p.reply = r.data
                         p.errc <- nil
                         plist.Remove(el)
                     }
@@ -715,9 +724,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno
 
     // Determine closest nodes.
     target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
-    t.tab.mutex.Lock()
-    closest := t.tab.closest(target, bucketSize, true).entries
-    t.tab.mutex.Unlock()
+    closest := t.tab.findnodeByID(target, bucketSize, true).entries
 
     // Send neighbors in chunks with at most maxNeighbors per packet
     // to stay below the packet size limit.
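
Taken together, the two v4_udp.go changes mean a partial answer is no longer treated as a failure: loop() now records p.reply as soon as any matching neighbors packet arrives, and findnode clears the subsequent errTimeout when rm.reply is set, so a peer that simply has fewer nodes to share than requested does not count against it.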

p2p/discover/v4_udp_test.go

Lines changed: 87 additions & 1 deletion
@@ -22,6 +22,7 @@ import (
     crand "crypto/rand"
     "encoding/binary"
     "errors"
+    "fmt"
     "io"
     "math/rand"
     "net"
@@ -277,7 +278,7 @@ func TestUDPv4_findnode(t *testing.T) {
     test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())
 
     // check that closest neighbors are returned.
-    expected := test.table.closest(testTarget.ID(), bucketSize, true)
+    expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
     test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
     waitNeighbors := func(want []*node) {
         test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
@@ -493,6 +494,91 @@ func TestUDPv4_EIP868(t *testing.T) {
     })
 }
 
+// This test verifies that a small network of nodes can boot up into a healthy state.
+func TestUDPv4_smallNetConvergence(t *testing.T) {
+    t.Parallel()
+
+    // Start the network.
+    nodes := make([]*UDPv4, 4)
+    for i := range nodes {
+        var cfg Config
+        if i > 0 {
+            bn := nodes[0].Self()
+            cfg.Bootnodes = []*enode.Node{bn}
+        }
+        nodes[i] = startLocalhostV4(t, cfg)
+        defer nodes[i].Close()
+    }
+
+    // Run through the iterator on all nodes until
+    // they have all found each other.
+    status := make(chan error, len(nodes))
+    for i := range nodes {
+        node := nodes[i]
+        go func() {
+            found := make(map[enode.ID]bool, len(nodes))
+            it := node.RandomNodes()
+            for it.Next() {
+                found[it.Node().ID()] = true
+                if len(found) == len(nodes) {
+                    status <- nil
+                    return
+                }
+            }
+            status <- fmt.Errorf("node %s didn't find all nodes", node.Self().ID().TerminalString())
+        }()
+    }
+
+    // Wait for all status reports.
+    timeout := time.NewTimer(30 * time.Second)
+    defer timeout.Stop()
+    for received := 0; received < len(nodes); {
+        select {
+        case <-timeout.C:
+            for _, node := range nodes {
+                node.Close()
+            }
+        case err := <-status:
+            received++
+            if err != nil {
+                t.Error("ERROR:", err)
+                return
+            }
+        }
+    }
+}
+
+func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
+    t.Helper()
+
+    cfg.PrivateKey = newkey()
+    db, _ := enode.OpenDB("")
+    ln := enode.NewLocalNode(db, cfg.PrivateKey)
+
+    // Prefix logs with node ID.
+    lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
+    lfmt := log.TerminalFormat(false)
+    cfg.Log = testlog.Logger(t, log.LvlTrace)
+    cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error {
+        t.Logf("%s %s", lprefix, lfmt.Format(r))
+        return nil
+    }))
+
+    // Listen.
+    socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
+    if err != nil {
+        t.Fatal(err)
+    }
+    realaddr := socket.LocalAddr().(*net.UDPAddr)
+    ln.SetStaticIP(realaddr.IP)
+    ln.SetFallbackUDP(realaddr.Port)
+    udp, err := ListenV4(socket, ln, cfg)
+    if err != nil {
+        t.Fatal(err)
+    }
+    return udp
+}
+
 // dgramPipe is a fake UDP socket. It queues all sent datagrams.
 type dgramPipe struct {
     mu *sync.Mutex
