Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,12 @@ loop:

case op := <-tab.addNodeCh:
tab.mutex.Lock()
ok := tab.handleAddNode(op)
ok, addedNode := tab.handleAddNode(op)
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if addedNode != nil {
tab.nodeFeed.Send(addedNode)
}
tab.addNodeHandled <- ok

case op := <-tab.trackRequestCh:
Expand Down Expand Up @@ -455,8 +459,13 @@ func (tab *Table) loadSeedNodes() {
tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age)
}
tab.mutex.Lock()
tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
_, addedNode := tab.handleAddNode(addNodeOp{node: seed, isInbound: false})
tab.mutex.Unlock()

// Send notification outside of mutex to avoid deadlock.
if addedNode != nil {
tab.nodeFeed.Send(addedNode)
}
}
}

Expand Down Expand Up @@ -507,30 +516,34 @@ func (tab *Table) removeIP(b *bucket, ip netip.Addr) {

// handleAddNode adds the node in the request to the table, if there is space.
// The caller must hold tab.mutex.
func (tab *Table) handleAddNode(req addNodeOp) bool {
//
// Returns (true, addedNode) if the node was added, (false, nil) otherwise.
// The caller should call nodeFeed.Send(addedNode) AFTER releasing the mutex
// to notify subscribers about the new node.
func (tab *Table) handleAddNode(req addNodeOp) (bool, *enode.Node) {
if req.node.ID() == tab.self().ID() {
return false
return false, nil
}
// For nodes from inbound contact, there is an additional safety measure: if the table
// is still initializing the node is not added.
if req.isInbound && !tab.isInitDone() {
return false
return false, nil
}

b := tab.bucket(req.node.ID())
n, _ := tab.bumpInBucket(b, req.node, req.isInbound)
if n != nil {
// Already in bucket.
return false
return false, nil
}
if len(b.entries) >= bucketSize {
// Bucket full, maybe add as replacement.
tab.addReplacement(b, req.node)
return false
return false, nil
}
if !tab.addIP(b, req.node.IPAddr()) {
// Can't add: IP limit reached.
return false
return false, nil
}

// Add to bucket.
Expand All @@ -541,8 +554,8 @@ func (tab *Table) handleAddNode(req addNodeOp) bool {
}
b.entries = append(b.entries, wn)
b.replacements = deleteNode(b.replacements, wn.ID())
tab.nodeAdded(b, wn)
return true
addedNode := tab.nodeAdded(b, wn)
return true, addedNode
}

// addReplacement adds n to the replacement cache of bucket b.
Expand All @@ -563,20 +576,28 @@ func (tab *Table) addReplacement(b *bucket, n *enode.Node) {
}
}

func (tab *Table) nodeAdded(b *bucket, n *tableNode) {
// nodeAdded is called when a node is added to a bucket.
// It returns the added node for the caller to send a notification
// via nodeFeed.Send() AFTER releasing the mutex.
//
// The caller must hold tab.mutex.
func (tab *Table) nodeAdded(b *bucket, n *tableNode) *enode.Node {
if n.addedToTable.IsZero() {
n.addedToTable = time.Now()
}
n.addedToBucket = time.Now()
tab.revalidation.nodeAdded(tab, n)

tab.nodeFeed.Send(n.Node)
// NOTE: nodeFeed.Send() is NOT called here to avoid deadlock.
// The caller must send the notification after releasing the mutex.

if tab.nodeAddedHook != nil {
tab.nodeAddedHook(b, n)
}
if metrics.Enabled() {
bucketsCounter[b.index].Inc(1)
}
return n.Node
}

func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {
Expand All @@ -591,11 +612,18 @@ func (tab *Table) nodeRemoved(b *bucket, n *tableNode) {

// deleteInBucket removes node n from the table.
// If there are replacement nodes in the bucket, the node is replaced.
func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode {
//
// Returns (replacementNode, nodeToNotify) where:
// - replacementNode is the tableNode that replaced the deleted node (or nil)
// - nodeToNotify is the enode.Node that should be sent via nodeFeed.Send() AFTER
// releasing the mutex (or nil if no notification needed)
//
// The caller must hold tab.mutex.
func (tab *Table) deleteInBucket(b *bucket, id enode.ID) (*tableNode, *enode.Node) {
index := slices.IndexFunc(b.entries, func(e *tableNode) bool { return e.ID() == id })
if index == -1 {
// Entry has been removed already.
return nil
return nil, nil
}

// Remove the node.
Expand All @@ -607,15 +635,15 @@ func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode {
// Add replacement.
if len(b.replacements) == 0 {
tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr())
return nil
return nil, nil
}
rindex := tab.rand.Intn(len(b.replacements))
rep := b.replacements[rindex]
b.replacements = slices.Delete(b.replacements, rindex, rindex+1)
b.entries = append(b.entries, rep)
tab.nodeAdded(b, rep)
nodeToNotify := tab.nodeAdded(b, rep)
tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr(), "r", rep.ID(), "rip", rep.IPAddr())
return rep
return rep, nodeToNotify
}

// bumpInBucket updates a node record if it exists in the bucket.
Expand Down Expand Up @@ -670,21 +698,36 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {
tab.db.UpdateFindFails(op.node.ID(), op.node.IPAddr(), fails)
}

// Collect nodes to notify after releasing mutex.
var nodesToNotify []*enode.Node

tab.mutex.Lock()
defer tab.mutex.Unlock()

b := tab.bucket(op.node.ID())
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket. This latter
// condition specifically exists to make bootstrapping in smaller test networks more
// reliable.
if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 {
tab.deleteInBucket(b, op.node.ID())
_, nodeToNotify := tab.deleteInBucket(b, op.node.ID())
if nodeToNotify != nil {
nodesToNotify = append(nodesToNotify, nodeToNotify)
}
}

// Add found nodes.
for _, n := range op.foundNodes {
tab.handleAddNode(addNodeOp{n, false, false})
_, addedNode := tab.handleAddNode(addNodeOp{n, false, false})
if addedNode != nil {
nodesToNotify = append(nodesToNotify, addedNode)
}
}

tab.mutex.Unlock()

// Send notifications outside of mutex to avoid deadlock.
for _, n := range nodesToNotify {
tab.nodeFeed.Send(n)
}
}

Expand All @@ -702,9 +745,14 @@ func pushNode(list []*tableNode, n *tableNode, max int) ([]*tableNode, *tableNod
// deleteNode removes a node from the table.
//
// The bucket is modified while holding tab.mutex. The lock is released
// explicitly (not via defer) so that the notification for a node returned by
// deleteInBucket can be sent on tab.nodeFeed without the mutex held.
func (tab *Table) deleteNode(n *enode.Node) {
	tab.mutex.Lock()
	b := tab.bucket(n.ID())
	_, nodeToNotify := tab.deleteInBucket(b, n.ID())
	tab.mutex.Unlock()

	// Send notification outside of mutex to avoid deadlock.
	if nodeToNotify != nil {
		tab.nodeFeed.Send(nodeToNotify)
	}
}

// waitForNodes blocks until the table contains at least n nodes.
Expand Down
12 changes: 10 additions & 2 deletions p2p/discover/table_reval.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,23 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
}()

// Remaining logic needs access to Table internals.
var nodeToNotify *enode.Node

tab.mutex.Lock()
defer tab.mutex.Unlock()

if !resp.didRespond {
n.livenessChecks /= 3
if n.livenessChecks <= 0 {
tab.deleteInBucket(b, n.ID())
_, nodeToNotify = tab.deleteInBucket(b, n.ID())
} else {
tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name)
tr.moveToList(&tr.fast, n, now, &tab.rand)
}
tab.mutex.Unlock()
// Send notification outside of mutex to avoid deadlock.
if nodeToNotify != nil {
tab.nodeFeed.Send(nodeToNotify)
}
return
}

Expand All @@ -181,6 +187,8 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
if !endpointChanged {
tr.moveToList(&tr.slow, n, now, &tab.rand)
}

tab.mutex.Unlock()
}

// moveToList ensures n is in the 'dest' list.
Expand Down