Skip to content

Commit 6a7e8bc

Browse files
feat: tracked pending inbound in server.go
1 parent 5e6f737 commit 6a7e8bc

File tree

1 file changed

+71
-10
lines changed

1 file changed

+71
-10
lines changed

p2p/server.go

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -639,10 +639,12 @@ func (srv *Server) run() {
639639
defer srv.dialsched.stop()
640640

641641
var (
642-
peers = make(map[enode.ID]*Peer)
643-
inboundCount = 0
644-
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
642+
peers = make(map[enode.ID]*Peer)
643+
inboundCount = 0
644+
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
645+
pendingInbound = make(map[enode.ID]time.Time) // Track in-progress inbound connections
645646
)
647+
646648
// Put trusted nodes into a map to speed up checks.
647649
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
648650
for _, n := range srv.TrustedNodes {
@@ -682,22 +684,62 @@ running:
682684
case c := <-srv.checkpointPostHandshake:
683685
// A connection has passed the encryption handshake so
684686
// the remote identity is known (but hasn't been verified yet).
685-
if trusted[c.node.ID()] {
687+
nodeID := c.node.ID()
688+
if trusted[nodeID] {
686689
// Ensure that the trusted flag is set before checking against MaxPeers.
687690
c.flags |= trustedConn
688691
}
689-
// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
690-
c.cont <- srv.postHandshakeChecks(peers, inboundCount, c)
692+
693+
// Check for duplicate connections: both active peers and in-progress inbound connections.
694+
if c.flags&inboundConn != 0 && c.flags&trustedConn == 0 {
695+
// Check if we already have this peer or if there's already an in-progress inbound connection from them.
696+
if _, exists := peers[nodeID]; exists {
697+
srv.log.Debug("Rejecting duplicate inbound connection (peer exists)", "id", nodeID)
698+
c.cont <- DiscAlreadyConnected
699+
continue running
700+
}
701+
702+
if startTime, exists := pendingInbound[nodeID]; exists {
703+
srv.log.Debug("Rejecting duplicate inbound connection (already pending)",
704+
"id", nodeID, "pending_duration", time.Since(startTime))
705+
c.cont <- DiscAlreadyConnected
706+
continue running
707+
708+
}
709+
710+
pendingInbound[nodeID] = time.Now()
711+
srv.log.Trace("Tracking pending inbound connection", "id", nodeID, "pending_count", len(pendingInbound))
712+
}
713+
714+
err := srv.postHandshakeChecks(peers, inboundCount, c)
715+
if err != nil && c.flags&inboundConn != 0 && c.flags&trustedConn == 0 {
716+
delete(pendingInbound, nodeID)
717+
srv.log.Trace("Removed failed pending inbound connection", "id", nodeID, "err", err)
718+
}
719+
c.cont <- err
691720

692721
case c := <-srv.checkpointAddPeer:
693722
// At this point the connection is past the protocol handshake.
694723
// Its capabilities are known and the remote identity is verified.
724+
nodeID := c.node.ID()
695725
err := srv.addPeerChecks(peers, inboundCount, c)
696726
if err == nil {
697727
// The handshakes are done and it passed all checks.
698728
p := srv.launchPeer(c)
699-
peers[c.node.ID()] = p
700-
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
729+
peers[nodeID] = p
730+
// Remove from pending tracker as it became promoted to proper peer
731+
if c.flags&inboundConn != 0 {
732+
if startTime, exists := pendingInbound[nodeID]; exists {
733+
duration := time.Since(startTime)
734+
delete(pendingInbound, nodeID)
735+
srv.log.Trace("Promoted pending inbound to peer", "id", nodeID,
736+
"handshake_duration", duration, "pending_count", len(pendingInbound))
737+
}
738+
739+
}
740+
741+
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(),
742+
"conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
701743
srv.dialsched.peerAdded(c)
702744
if p.Inbound() {
703745
inboundCount++
@@ -708,14 +750,32 @@ running:
708750
activeOutboundPeerGauge.Inc(1)
709751
}
710752
activePeerGauge.Inc(1)
753+
754+
} else {
755+
// Failed to add peer. Clean up pending tracking if it was inbound.
756+
if c.flags&inboundConn != 0 {
757+
delete(pendingInbound, nodeID)
758+
srv.log.Trace("Removed failed pending inbound at add peer stage",
759+
"id", nodeID, "err", err)
760+
}
761+
711762
}
763+
712764
c.cont <- err
713765

714766
case pd := <-srv.delpeer:
715767
// A peer disconnected.
716768
d := common.PrettyDuration(mclock.Now() - pd.created)
717-
delete(peers, pd.ID())
718-
srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
769+
nodeID := pd.ID()
770+
delete(peers, nodeID)
771+
// Remove from pending tracking if present (defensive cleanup).
772+
if _, exists := pendingInbound[nodeID]; exists {
773+
delete(pendingInbound, nodeID)
774+
srv.log.Trace("Cleaned up pending entry on peer deletion", "id", nodeID)
775+
}
776+
777+
srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", nodeID,
778+
"duration", d, "req", pd.requested, "err", pd.err)
719779
srv.dialsched.peerRemoved(pd.rw)
720780
if pd.Inbound() {
721781
inboundCount--
@@ -747,6 +807,7 @@ running:
747807
p := <-srv.delpeer
748808
p.log.Trace("<-delpeer (spindown)")
749809
delete(peers, p.ID())
810+
delete(pendingInbound, p.ID())
750811
}
751812
}
752813

0 commit comments

Comments
 (0)