diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 6a1c7494ee..307e8e6574 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -379,8 +379,12 @@ loop: case op := <-tab.addNodeCh: tab.mutex.Lock() - ok := tab.handleAddNode(op) + ok, addedNode := tab.handleAddNode(op) tab.mutex.Unlock() + // Send notification outside of mutex to avoid deadlock. + if addedNode != nil { + tab.nodeFeed.Send(addedNode) + } tab.addNodeHandled <- ok case op := <-tab.trackRequestCh: @@ -455,8 +459,13 @@ func (tab *Table) loadSeedNodes() { tab.log.Trace("Found seed node in database", "id", seed.ID(), "addr", addr, "age", age) } tab.mutex.Lock() - tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) + _, addedNode := tab.handleAddNode(addNodeOp{node: seed, isInbound: false}) tab.mutex.Unlock() + + // Send notification outside of mutex to avoid deadlock. + if addedNode != nil { + tab.nodeFeed.Send(addedNode) + } } } @@ -507,30 +516,34 @@ func (tab *Table) removeIP(b *bucket, ip netip.Addr) { // handleAddNode adds the node in the request to the table, if there is space. // The caller must hold tab.mutex. -func (tab *Table) handleAddNode(req addNodeOp) bool { +// +// Returns (true, addedNode) if the node was added, (false, nil) otherwise. +// The caller should call nodeFeed.Send(addedNode) AFTER releasing the mutex +// to notify subscribers about the new node. +func (tab *Table) handleAddNode(req addNodeOp) (bool, *enode.Node) { if req.node.ID() == tab.self().ID() { - return false + return false, nil } // For nodes from inbound contact, there is an additional safety measure: if the table // is still initializing the node is not added. if req.isInbound && !tab.isInitDone() { - return false + return false, nil } b := tab.bucket(req.node.ID()) n, _ := tab.bumpInBucket(b, req.node, req.isInbound) if n != nil { // Already in bucket. - return false + return false, nil } if len(b.entries) >= bucketSize { // Bucket full, maybe add as replacement. tab.addReplacement(b, req.node) - return false + return false, nil } if !tab.addIP(b, req.node.IPAddr()) { // Can't add: IP limit reached. - return false + return false, nil } // Add to bucket. @@ -541,8 +554,8 @@ func (tab *Table) handleAddNode(req addNodeOp) bool { } b.entries = append(b.entries, wn) b.replacements = deleteNode(b.replacements, wn.ID()) - tab.nodeAdded(b, wn) - return true + addedNode := tab.nodeAdded(b, wn) + return true, addedNode } // addReplacement adds n to the replacement cache of bucket b. @@ -563,20 +576,28 @@ func (tab *Table) addReplacement(b *bucket, n *enode.Node) { } } -func (tab *Table) nodeAdded(b *bucket, n *tableNode) { +// nodeAdded is called when a node is added to a bucket. +// It returns the added node for the caller to send a notification +// via nodeFeed.Send() AFTER releasing the mutex. +// +// The caller must hold tab.mutex. +func (tab *Table) nodeAdded(b *bucket, n *tableNode) *enode.Node { if n.addedToTable.IsZero() { n.addedToTable = time.Now() } n.addedToBucket = time.Now() tab.revalidation.nodeAdded(tab, n) - tab.nodeFeed.Send(n.Node) + // NOTE: nodeFeed.Send() is NOT called here to avoid deadlock. + // The caller must send the notification after releasing the mutex. + if tab.nodeAddedHook != nil { tab.nodeAddedHook(b, n) } if metrics.Enabled() { bucketsCounter[b.index].Inc(1) } + return n.Node } func (tab *Table) nodeRemoved(b *bucket, n *tableNode) { @@ -591,11 +612,18 @@ func (tab *Table) nodeRemoved(b *bucket, n *tableNode) { // deleteInBucket removes node n from the table. // If there are replacement nodes in the bucket, the node is replaced. -func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode { +// +// Returns (replacementNode, nodeToNotify) where: +// - replacementNode is the tableNode that replaced the deleted node (or nil) +// - nodeToNotify is the enode.Node that should be sent via nodeFeed.Send() AFTER +// releasing the mutex (or nil if no notification needed) +// +// The caller must hold tab.mutex. +func (tab *Table) deleteInBucket(b *bucket, id enode.ID) (*tableNode, *enode.Node) { index := slices.IndexFunc(b.entries, func(e *tableNode) bool { return e.ID() == id }) if index == -1 { // Entry has been removed already. - return nil + return nil, nil } // Remove the node. @@ -607,15 +635,15 @@ func (tab *Table) deleteInBucket(b *bucket, id enode.ID) *tableNode { // Add replacement. if len(b.replacements) == 0 { tab.log.Debug("Removed dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr()) - return nil + return nil, nil } rindex := tab.rand.Intn(len(b.replacements)) rep := b.replacements[rindex] b.replacements = slices.Delete(b.replacements, rindex, rindex+1) b.entries = append(b.entries, rep) - tab.nodeAdded(b, rep) + nodeToNotify := tab.nodeAdded(b, rep) tab.log.Debug("Replaced dead node", "b", b.index, "id", n.ID(), "ip", n.IPAddr(), "r", rep.ID(), "rip", rep.IPAddr()) - return rep + return rep, nodeToNotify } // bumpInBucket updates a node record if it exists in the bucket. @@ -670,8 +698,10 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) { tab.db.UpdateFindFails(op.node.ID(), op.node.IPAddr(), fails) } + // Collect nodes to notify after releasing mutex. + var nodesToNotify []*enode.Node + tab.mutex.Lock() - defer tab.mutex.Unlock() b := tab.bucket(op.node.ID()) // Remove the node from the local table if it fails to return anything useful too @@ -679,12 +709,25 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) { // condition specifically exists to make bootstrapping in smaller test networks more // reliable. if fails >= maxFindnodeFailures && len(b.entries) >= bucketSize/4 { - tab.deleteInBucket(b, op.node.ID()) + _, nodeToNotify := tab.deleteInBucket(b, op.node.ID()) + if nodeToNotify != nil { + nodesToNotify = append(nodesToNotify, nodeToNotify) + } } // Add found nodes. for _, n := range op.foundNodes { - tab.handleAddNode(addNodeOp{n, false, false}) + _, addedNode := tab.handleAddNode(addNodeOp{n, false, false}) + if addedNode != nil { + nodesToNotify = append(nodesToNotify, addedNode) + } + } + + tab.mutex.Unlock() + + // Send notifications outside of mutex to avoid deadlock. + for _, n := range nodesToNotify { + tab.nodeFeed.Send(n) } } @@ -702,9 +745,14 @@ func pushNode(list []*tableNode, n *tableNode, max int) ([]*tableNode, *tableNod // deleteNode removes a node from the table. func (tab *Table) deleteNode(n *enode.Node) { tab.mutex.Lock() - defer tab.mutex.Unlock() b := tab.bucket(n.ID()) - tab.deleteInBucket(b, n.ID()) + _, nodeToNotify := tab.deleteInBucket(b, n.ID()) + tab.mutex.Unlock() + + // Send notification outside of mutex to avoid deadlock. + if nodeToNotify != nil { + tab.nodeFeed.Send(nodeToNotify) + } } // waitForNodes blocks until the table contains at least n nodes. diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go index 1519313d19..56cac62855 100644 --- a/p2p/discover/table_reval.go +++ b/p2p/discover/table_reval.go @@ -154,17 +154,23 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons }() // Remaining logic needs access to Table internals. + var nodeToNotify *enode.Node + tab.mutex.Lock() - defer tab.mutex.Unlock() if !resp.didRespond { n.livenessChecks /= 3 if n.livenessChecks <= 0 { - tab.deleteInBucket(b, n.ID()) + _, nodeToNotify = tab.deleteInBucket(b, n.ID()) } else { tab.log.Debug("Node revalidation failed", "b", b.index, "id", n.ID(), "checks", n.livenessChecks, "q", n.revalList.name) tr.moveToList(&tr.fast, n, now, &tab.rand) } + tab.mutex.Unlock() + // Send notification outside of mutex to avoid deadlock. + if nodeToNotify != nil { + tab.nodeFeed.Send(nodeToNotify) + } return } @@ -181,6 +187,8 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons if !endpointChanged { tr.moveToList(&tr.slow, n, now, &tab.rand) } + + tab.mutex.Unlock() } // moveToList ensures n is in the 'dest' list.