Skip to content

Commit db9f977

Browse files
feat: updated dial scheduler to track pending inbound connections
1 parent 6a7e8bc commit db9f977

File tree

2 files changed

+56
-16
lines changed

2 files changed

+56
-16
lines changed

p2p/dial.go

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ var (
7676
errSelf = errors.New("is self")
7777
errAlreadyDialing = errors.New("already dialing")
7878
errAlreadyConnected = errors.New("already connected")
79+
errPendingInbound = errors.New("peer has pending inbound connection")
7980
errRecentlyDialed = errors.New("recently dialed")
8081
errNetRestrict = errors.New("not contained in netrestrict list")
8182
errNoPort = errors.New("node does not provide TCP port")
@@ -104,12 +105,15 @@ type dialScheduler struct {
104105
remStaticCh chan *enode.Node
105106
addPeerCh chan *conn
106107
remPeerCh chan *conn
108+
addPendingCh chan enode.ID
109+
remPendingCh chan enode.ID
107110

108111
// Everything below here belongs to loop and
109112
// should only be accessed by code on the loop goroutine.
110-
dialing map[enode.ID]*dialTask // active tasks
111-
peers map[enode.ID]struct{} // all connected peers
112-
dialPeers int // current number of dialed peers
113+
dialing map[enode.ID]*dialTask // active tasks
114+
peers map[enode.ID]struct{} // all connected peers
115+
pendingInbound map[enode.ID]struct{} // in-progress inbound connections
116+
dialPeers int // current number of dialed peers
113117

114118
// The static map tracks all static dial tasks. The subset of usable static dial tasks
115119
// (i.e. those passing checkDial) is kept in staticPool. The scheduler prefers
@@ -163,19 +167,22 @@ func (cfg dialConfig) withDefaults() dialConfig {
163167
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
164168
cfg := config.withDefaults()
165169
d := &dialScheduler{
166-
dialConfig: cfg,
167-
historyTimer: mclock.NewAlarm(cfg.clock),
168-
setupFunc: setupFunc,
169-
dnsLookupFunc: net.DefaultResolver.LookupNetIP,
170-
dialing: make(map[enode.ID]*dialTask),
171-
static: make(map[enode.ID]*dialTask),
172-
peers: make(map[enode.ID]struct{}),
173-
doneCh: make(chan *dialTask),
174-
nodesIn: make(chan *enode.Node),
175-
addStaticCh: make(chan *enode.Node),
176-
remStaticCh: make(chan *enode.Node),
177-
addPeerCh: make(chan *conn),
178-
remPeerCh: make(chan *conn),
170+
dialConfig: cfg,
171+
historyTimer: mclock.NewAlarm(cfg.clock),
172+
setupFunc: setupFunc,
173+
dnsLookupFunc: net.DefaultResolver.LookupNetIP,
174+
dialing: make(map[enode.ID]*dialTask),
175+
static: make(map[enode.ID]*dialTask),
176+
peers: make(map[enode.ID]struct{}),
177+
pendingInbound: make(map[enode.ID]struct{}),
178+
doneCh: make(chan *dialTask),
179+
nodesIn: make(chan *enode.Node),
180+
addStaticCh: make(chan *enode.Node),
181+
remStaticCh: make(chan *enode.Node),
182+
addPeerCh: make(chan *conn),
183+
remPeerCh: make(chan *conn),
184+
addPendingCh: make(chan enode.ID),
185+
remPendingCh: make(chan enode.ID),
179186
}
180187
d.lastStatsLog = d.clock.Now()
181188
d.ctx, d.cancel = context.WithCancel(context.Background())
@@ -223,6 +230,22 @@ func (d *dialScheduler) peerRemoved(c *conn) {
223230
}
224231
}
225232

233+
// inboundPending notifies the scheduler about a pending inbound connection.
234+
func (d *dialScheduler) inboundPending(id enode.ID) {
235+
select {
236+
case d.addPendingCh <- id:
237+
case <-d.ctx.Done():
238+
}
239+
}
240+
241+
// inboundCompleted notifies the scheduler that an inbound connection completed or failed.
242+
func (d *dialScheduler) inboundCompleted(id enode.ID) {
243+
select {
244+
case d.remPendingCh <- id:
245+
case <-d.ctx.Done():
246+
}
247+
}
248+
226249
// loop is the main loop of the dialer.
227250
func (d *dialScheduler) loop(it enode.Iterator) {
228251
var (
@@ -276,6 +299,15 @@ loop:
276299
delete(d.peers, c.node.ID())
277300
d.updateStaticPool(c.node.ID())
278301

302+
case id := <-d.addPendingCh:
303+
d.pendingInbound[id] = struct{}{}
304+
d.log.Trace("Marked node as pending inbound", "id", id)
305+
306+
case id := <-d.remPendingCh:
307+
delete(d.pendingInbound, id)
308+
d.updateStaticPool(id)
309+
d.log.Trace("Unmarked node as pending inbound", "id", id)
310+
279311
case node := <-d.addStaticCh:
280312
id := node.ID()
281313
_, exists := d.static[id]
@@ -390,6 +422,9 @@ func (d *dialScheduler) checkDial(n *enode.Node) error {
390422
if _, ok := d.peers[n.ID()]; ok {
391423
return errAlreadyConnected
392424
}
425+
if _, ok := d.pendingInbound[n.ID()]; ok {
426+
return errPendingInbound
427+
}
393428
if d.netRestrict != nil && !d.netRestrict.ContainsAddr(n.IPAddr()) {
394429
return errNetRestrict
395430
}

p2p/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,12 +708,14 @@ running:
708708
}
709709

710710
pendingInbound[nodeID] = time.Now()
711+
srv.dialsched.inboundPending(nodeID)
711712
srv.log.Trace("Tracking pending inbound connection", "id", nodeID, "pending_count", len(pendingInbound))
712713
}
713714

714715
err := srv.postHandshakeChecks(peers, inboundCount, c)
715716
if err != nil && c.flags&inboundConn != 0 && c.flags&trustedConn == 0 {
716717
delete(pendingInbound, nodeID)
718+
srv.dialsched.inboundCompleted(nodeID)
717719
srv.log.Trace("Removed failed pending inbound connection", "id", nodeID, "err", err)
718720
}
719721
c.cont <- err
@@ -732,6 +734,7 @@ running:
732734
if startTime, exists := pendingInbound[nodeID]; exists {
733735
duration := time.Since(startTime)
734736
delete(pendingInbound, nodeID)
737+
srv.dialsched.inboundCompleted(nodeID)
735738
srv.log.Trace("Promoted pending inbound to peer", "id", nodeID,
736739
"handshake_duration", duration, "pending_count", len(pendingInbound))
737740
}
@@ -755,6 +758,7 @@ running:
755758
// Failed to add peer. Clean up pending tracking if it was inbound.
756759
if c.flags&inboundConn != 0 {
757760
delete(pendingInbound, nodeID)
761+
srv.dialsched.inboundCompleted(nodeID)
758762
srv.log.Trace("Removed failed pending inbound at add peer stage",
759763
"id", nodeID, "err", err)
760764
}
@@ -771,6 +775,7 @@ running:
771775
// Remove from pending tracking if present (defensive cleanup).
772776
if _, exists := pendingInbound[nodeID]; exists {
773777
delete(pendingInbound, nodeID)
778+
srv.dialsched.inboundCompleted(nodeID)
774779
srv.log.Trace("Cleaned up pending entry on peer deletion", "id", nodeID)
775780
}
776781

0 commit comments

Comments
 (0)