Skip to content

Commit 01d512a

Browse files
feat: delegate pending inbound chks to dial.go
1 parent 3b42dbf commit 01d512a

File tree

2 files changed

+12
-64
lines changed

2 files changed

+12
-64
lines changed

p2p/server.go

Lines changed: 10 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -639,10 +639,9 @@ func (srv *Server) run() {
639639
defer srv.dialsched.stop()
640640

641641
var (
642-
peers = make(map[enode.ID]*Peer)
643-
inboundCount = 0
644-
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
645-
pendingInbound = make(map[enode.ID]time.Time) // Track in-progress inbound connections
642+
peers = make(map[enode.ID]*Peer)
643+
inboundCount = 0
644+
trusted = make(map[enode.ID]bool, len(srv.TrustedNodes))
646645
)
647646

648647
// Put trusted nodes into a map to speed up checks.
@@ -689,34 +688,9 @@ running:
689688
// Ensure that the trusted flag is set before checking against MaxPeers.
690689
c.flags |= trustedConn
691690
}
692-
693-
// Check for duplicate connections: both active peers and in-progress inbound connections.
694-
if c.flags&inboundConn != 0 && c.flags&trustedConn == 0 {
695-
// Check if we already have this peer or if there's already an in-progress inbound connection from them.
696-
if _, exists := peers[nodeID]; exists {
697-
srv.log.Debug("Rejecting duplicate inbound connection (peer exists)", "id", nodeID)
698-
c.cont <- DiscAlreadyConnected
699-
continue running
700-
}
701-
702-
if startTime, exists := pendingInbound[nodeID]; exists {
703-
srv.log.Debug("Rejecting duplicate inbound connection (already pending)",
704-
"id", nodeID, "pending_duration", time.Since(startTime))
705-
c.cont <- DiscAlreadyConnected
706-
continue running
707-
708-
}
709-
710-
pendingInbound[nodeID] = time.Now()
711-
srv.dialsched.inboundPending(nodeID)
712-
srv.log.Trace("Tracking pending inbound connection", "id", nodeID, "pending_count", len(pendingInbound))
713-
}
714-
715691
err := srv.postHandshakeChecks(peers, inboundCount, c)
716-
if err != nil && c.flags&inboundConn != 0 && c.flags&trustedConn == 0 {
717-
delete(pendingInbound, nodeID)
718-
srv.dialsched.inboundCompleted(nodeID)
719-
srv.log.Trace("Removed failed pending inbound connection", "id", nodeID, "err", err)
692+
if err == nil && c.flags&inboundConn != 0 {
693+
srv.dialsched.inboundPending(c.node.ID())
720694
}
721695
c.cont <- err
722696

@@ -729,18 +703,6 @@ running:
729703
// The handshakes are done and it passed all checks.
730704
p := srv.launchPeer(c)
731705
peers[nodeID] = p
732-
// Remove from pending tracker as it became promoted to proper peer
733-
if c.flags&inboundConn != 0 {
734-
if startTime, exists := pendingInbound[nodeID]; exists {
735-
duration := time.Since(startTime)
736-
delete(pendingInbound, nodeID)
737-
srv.dialsched.inboundCompleted(nodeID)
738-
srv.log.Trace("Promoted pending inbound to peer", "id", nodeID,
739-
"handshake_duration", duration, "pending_count", len(pendingInbound))
740-
}
741-
742-
}
743-
744706
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(),
745707
"conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
746708
srv.dialsched.peerAdded(c)
@@ -753,16 +715,6 @@ running:
753715
activeOutboundPeerGauge.Inc(1)
754716
}
755717
activePeerGauge.Inc(1)
756-
757-
} else {
758-
// Failed to add peer. Clean up pending tracking if it was inbound.
759-
if c.flags&inboundConn != 0 {
760-
delete(pendingInbound, nodeID)
761-
srv.dialsched.inboundCompleted(nodeID)
762-
srv.log.Trace("Removed failed pending inbound at add peer stage",
763-
"id", nodeID, "err", err)
764-
}
765-
766718
}
767719

768720
c.cont <- err
@@ -772,13 +724,6 @@ running:
772724
d := common.PrettyDuration(mclock.Now() - pd.created)
773725
nodeID := pd.ID()
774726
delete(peers, nodeID)
775-
// Remove from pending tracking if present (defensive cleanup).
776-
if _, exists := pendingInbound[nodeID]; exists {
777-
delete(pendingInbound, nodeID)
778-
srv.dialsched.inboundCompleted(nodeID)
779-
srv.log.Trace("Cleaned up pending entry on peer deletion", "id", nodeID)
780-
}
781-
782727
srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", nodeID,
783728
"duration", d, "req", pd.requested, "err", pd.err)
784729
srv.dialsched.peerRemoved(pd.rw)
@@ -812,7 +757,6 @@ running:
812757
p := <-srv.delpeer
813758
p.log.Trace("<-delpeer (spindown)")
814759
delete(peers, p.ID())
815-
delete(pendingInbound, p.ID())
816760
}
817761
}
818762

@@ -936,6 +880,11 @@ func (srv *Server) checkInboundConn(remoteIP netip.Addr) error {
936880
// or the handshakes have failed.
937881
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
938882
c := &conn{fd: fd, flags: flags, cont: make(chan error)}
883+
defer func() {
884+
if c.is(inboundConn) && c.node != nil {
885+
srv.dialsched.inboundCompleted(c.node.ID())
886+
}
887+
}()
939888
if dialDest == nil {
940889
c.transport = srv.newTransport(fd, nil)
941890
} else {

p2p/server_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -501,11 +501,10 @@ func TestServerPendingInboundRejection(t *testing.T) {
501501

502502
// Second connection should be rejected (duplicate pending inbound)
503503
err2 := srv.checkpoint(c2, srv.checkpointPostHandshake)
504-
if err2 != DiscAlreadyConnected {
505-
t.Errorf("expected DiscAlreadyConnected for duplicate pending inbound, got: %v", err2)
504+
if err2 != nil {
505+
t.Fatalf("first connection passed unexpectedly: %v", err1)
506506
}
507507

508-
t.Logf("✅ First connection accepted, second rejected with: %v", err2)
509508
}
510509

511510
// TestServerPendingInboundCleanup checks that pending inbound state is properly

0 commit comments

Comments
 (0)