Skip to content

Commit a4c9b34

Browse files
authored
p2p/discover: add waitForNodes
This improves the latency of lookups in small networks and test setups. When the local node table runs empty, the lookupIterator will trigger a refresh to try to fill the table again. The behaviour of lookup in case of an empty table is changed: - Previously, lookup waited a fixed 1 second before trying to continue the lookup. - Now, lookup on an empty table returns immediately, and a better wait implementation is part of the lookupIterator. It reinitialises the table and continues the iterator as soon as a node becomes available.
2 parents 6a7f64e + de9fb97 commit a4c9b34

File tree

3 files changed

+129
-48
lines changed

3 files changed

+129
-48
lines changed

p2p/discover/lookup.go

Lines changed: 74 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
// lookup performs a network search for nodes close to the given target. It approaches the
2828
// target by querying nodes that are closer to it on each iteration. The given target does
2929
// not need to be an actual node identifier.
30+
// lookup on an empty table will return immediately with no nodes.
3031
type lookup struct {
3132
tab *Table
3233
queryfunc queryFunc
@@ -49,11 +50,15 @@ func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *l
4950
result: nodesByDistance{target: target},
5051
replyCh: make(chan []*enode.Node, alpha),
5152
cancelCh: ctx.Done(),
52-
queries: -1,
5353
}
5454
// Don't query further if we hit ourself.
5555
// Unlikely to happen often in practice.
5656
it.asked[tab.self().ID()] = true
57+
it.seen[tab.self().ID()] = true
58+
59+
// Initialize the lookup with nodes from table.
60+
closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
61+
it.addNodes(closest.entries)
5762
return it
5863
}
5964

@@ -64,22 +69,19 @@ func (it *lookup) run() []*enode.Node {
6469
return it.result.entries
6570
}
6671

72+
func (it *lookup) empty() bool {
73+
return len(it.replyBuffer) == 0
74+
}
75+
6776
// advance advances the lookup until any new nodes have been found.
6877
// It returns false when the lookup has ended.
6978
func (it *lookup) advance() bool {
7079
for it.startQueries() {
7180
select {
7281
case nodes := <-it.replyCh:
73-
it.replyBuffer = it.replyBuffer[:0]
74-
for _, n := range nodes {
75-
if n != nil && !it.seen[n.ID()] {
76-
it.seen[n.ID()] = true
77-
it.result.push(n, bucketSize)
78-
it.replyBuffer = append(it.replyBuffer, n)
79-
}
80-
}
8182
it.queries--
82-
if len(it.replyBuffer) > 0 {
83+
it.addNodes(nodes)
84+
if !it.empty() {
8385
return true
8486
}
8587
case <-it.cancelCh:
@@ -89,6 +91,17 @@ func (it *lookup) advance() bool {
8991
return false
9092
}
9193

94+
func (it *lookup) addNodes(nodes []*enode.Node) {
95+
it.replyBuffer = it.replyBuffer[:0]
96+
for _, n := range nodes {
97+
if n != nil && !it.seen[n.ID()] {
98+
it.seen[n.ID()] = true
99+
it.result.push(n, bucketSize)
100+
it.replyBuffer = append(it.replyBuffer, n)
101+
}
102+
}
103+
}
104+
92105
func (it *lookup) shutdown() {
93106
for it.queries > 0 {
94107
<-it.replyCh
@@ -103,20 +116,6 @@ func (it *lookup) startQueries() bool {
103116
return false
104117
}
105118

106-
// The first query returns nodes from the local table.
107-
if it.queries == -1 {
108-
closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
109-
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
110-
// for the table to fill in this case, but there is no good mechanism for that
111-
// yet.
112-
if len(closest.entries) == 0 {
113-
it.slowdown()
114-
}
115-
it.queries = 1
116-
it.replyCh <- closest.entries
117-
return true
118-
}
119-
120119
// Ask the closest nodes that we haven't asked yet.
121120
for i := 0; i < len(it.result.entries) && it.queries < alpha; i++ {
122121
n := it.result.entries[i]
@@ -130,15 +129,6 @@ func (it *lookup) startQueries() bool {
130129
return it.queries > 0
131130
}
132131

133-
func (it *lookup) slowdown() {
134-
sleep := time.NewTimer(1 * time.Second)
135-
defer sleep.Stop()
136-
select {
137-
case <-sleep.C:
138-
case <-it.tab.closeReq:
139-
}
140-
}
141-
142132
func (it *lookup) query(n *enode.Node, reply chan<- []*enode.Node) {
143133
r, err := it.queryfunc(n)
144134
if !errors.Is(err, errClosed) { // avoid recording failures on shutdown.
@@ -153,12 +143,16 @@ func (it *lookup) query(n *enode.Node, reply chan<- []*enode.Node) {
153143

154144
// lookupIterator performs lookup operations and iterates over all seen nodes.
155145
// When a lookup finishes, a new one is created through nextLookup.
146+
// lookupIterator waits for table initialization and triggers a table refresh
147+
// when necessary.
148+
156149
type lookupIterator struct {
157-
buffer []*enode.Node
158-
nextLookup lookupFunc
159-
ctx context.Context
160-
cancel func()
161-
lookup *lookup
150+
buffer []*enode.Node
151+
nextLookup lookupFunc
152+
ctx context.Context
153+
cancel func()
154+
lookup *lookup
155+
tabRefreshing <-chan struct{}
162156
}
163157

164158
type lookupFunc func(ctx context.Context) *lookup
@@ -182,6 +176,7 @@ func (it *lookupIterator) Next() bool {
182176
if len(it.buffer) > 0 {
183177
it.buffer = it.buffer[1:]
184178
}
179+
185180
// Advance the lookup to refill the buffer.
186181
for len(it.buffer) == 0 {
187182
if it.ctx.Err() != nil {
@@ -191,17 +186,55 @@ func (it *lookupIterator) Next() bool {
191186
}
192187
if it.lookup == nil {
193188
it.lookup = it.nextLookup(it.ctx)
189+
if it.lookup.empty() {
190+
// If the lookup is empty right after creation, it means the local table
191+
// is in a degraded state, and we need to wait for it to fill again.
192+
it.lookupFailed(it.lookup.tab, 1*time.Minute)
193+
it.lookup = nil
194+
continue
195+
}
196+
// Yield the initial nodes from the iterator before advancing the lookup.
197+
it.buffer = it.lookup.replyBuffer
194198
continue
195199
}
196-
if !it.lookup.advance() {
200+
201+
newNodes := it.lookup.advance()
202+
it.buffer = it.lookup.replyBuffer
203+
if !newNodes {
197204
it.lookup = nil
198-
continue
199205
}
200-
it.buffer = it.lookup.replyBuffer
201206
}
202207
return true
203208
}
204209

210+
// lookupFailed handles failed lookup attempts. This can be called when the table has
211+
// exited, or when it runs out of nodes.
212+
func (it *lookupIterator) lookupFailed(tab *Table, timeout time.Duration) {
213+
tout, cancel := context.WithTimeout(it.ctx, timeout)
214+
defer cancel()
215+
216+
// Wait for Table initialization to complete, in case it is still in progress.
217+
select {
218+
case <-tab.initDone:
219+
case <-tout.Done():
220+
return
221+
}
222+
223+
// Wait for ongoing refresh operation, or trigger one.
224+
if it.tabRefreshing == nil {
225+
it.tabRefreshing = tab.refresh()
226+
}
227+
select {
228+
case <-it.tabRefreshing:
229+
it.tabRefreshing = nil
230+
case <-tout.Done():
231+
return
232+
}
233+
234+
// Wait for the table to fill.
235+
tab.waitForNodes(tout, 1)
236+
}
237+
205238
// Close ends the iterator.
206239
func (it *lookupIterator) Close() {
207240
it.cancel()

p2p/discover/table.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
"github.com/ethereum/go-ethereum/common"
3434
"github.com/ethereum/go-ethereum/common/mclock"
35+
"github.com/ethereum/go-ethereum/event"
3536
"github.com/ethereum/go-ethereum/log"
3637
"github.com/ethereum/go-ethereum/metrics"
3738
"github.com/ethereum/go-ethereum/p2p/enode"
@@ -84,6 +85,7 @@ type Table struct {
8485
closeReq chan struct{}
8586
closed chan struct{}
8687

88+
nodeFeed event.FeedOf[*enode.Node]
8789
nodeAddedHook func(*bucket, *tableNode)
8890
nodeRemovedHook func(*bucket, *tableNode)
8991
}
@@ -567,6 +569,8 @@ func (tab *Table) nodeAdded(b *bucket, n *tableNode) {
567569
}
568570
n.addedToBucket = time.Now()
569571
tab.revalidation.nodeAdded(tab, n)
572+
573+
tab.nodeFeed.Send(n.Node)
570574
if tab.nodeAddedHook != nil {
571575
tab.nodeAddedHook(b, n)
572576
}
@@ -702,3 +706,38 @@ func (tab *Table) deleteNode(n *enode.Node) {
702706
b := tab.bucket(n.ID())
703707
tab.deleteInBucket(b, n.ID())
704708
}
709+
710+
// waitForNodes blocks until the table contains at least n nodes.
711+
func (tab *Table) waitForNodes(ctx context.Context, n int) error {
712+
getlength := func() (count int) {
713+
for _, b := range &tab.buckets {
714+
count += len(b.entries)
715+
}
716+
return count
717+
}
718+
719+
var ch chan *enode.Node
720+
for {
721+
tab.mutex.Lock()
722+
if getlength() >= n {
723+
tab.mutex.Unlock()
724+
return nil
725+
}
726+
if ch == nil {
727+
// Init subscription.
728+
ch = make(chan *enode.Node)
729+
sub := tab.nodeFeed.Subscribe(ch)
730+
defer sub.Unsubscribe()
731+
}
732+
tab.mutex.Unlock()
733+
734+
// Wait for a node add event.
735+
select {
736+
case <-ch:
737+
case <-ctx.Done():
738+
return ctx.Err()
739+
case <-tab.closeReq:
740+
return errClosed
741+
}
742+
}
743+
}

p2p/discover/v4_udp_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import (
2424
"errors"
2525
"fmt"
2626
"io"
27+
"maps"
2728
"math/rand"
2829
"net"
2930
"net/netip"
3031
"reflect"
32+
"slices"
3133
"sync"
3234
"testing"
3335
"time"
@@ -509,18 +511,26 @@ func TestUDPv4_smallNetConvergence(t *testing.T) {
509511
// they have all found each other.
510512
status := make(chan error, len(nodes))
511513
for i := range nodes {
512-
node := nodes[i]
514+
self := nodes[i]
513515
go func() {
514-
found := make(map[enode.ID]bool, len(nodes))
515-
it := node.RandomNodes()
516+
missing := make(map[enode.ID]bool, len(nodes))
517+
for _, n := range nodes {
518+
if n.Self().ID() == self.Self().ID() {
519+
continue // skip self
520+
}
521+
missing[n.Self().ID()] = true
522+
}
523+
524+
it := self.RandomNodes()
516525
for it.Next() {
517-
found[it.Node().ID()] = true
518-
if len(found) == len(nodes) {
526+
delete(missing, it.Node().ID())
527+
if len(missing) == 0 {
519528
status <- nil
520529
return
521530
}
522531
}
523-
status <- fmt.Errorf("node %s didn't find all nodes", node.Self().ID().TerminalString())
532+
missingIDs := slices.Collect(maps.Keys(missing))
533+
status <- fmt.Errorf("node %s didn't find all nodes, missing %v", self.Self().ID().TerminalString(), missingIDs)
524534
}()
525535
}
526536

@@ -537,7 +547,6 @@ func TestUDPv4_smallNetConvergence(t *testing.T) {
537547
received++
538548
if err != nil {
539549
t.Error("ERROR:", err)
540-
return
541550
}
542551
}
543552
}

0 commit comments

Comments
 (0)