Skip to content

Commit ffb7104

Browse files
fix(p2p): update peer address on reconnect and optimize reacher (#5348)
1 parent b2f6762 commit ffb7104

File tree

3 files changed

+241
-35
lines changed

3 files changed

+241
-35
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2026 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package reacher
6+
7+
// peerHeap is a min-heap of peers ordered by retryAfter time.
8+
type peerHeap []*peer
9+
10+
// Len reports how many peers are currently stored in the heap.
func (h peerHeap) Len() int {
	return len(h)
}
11+
// Less reports whether the peer at position i must be retried strictly
// earlier than the peer at position j. Peers with equal retryAfter values
// are not ordered relative to each other.
func (h peerHeap) Less(i, j int) bool {
	first, second := h[i], h[j]
	return first.retryAfter.Before(second.retryAfter)
}
12+
func (h peerHeap) Swap(i, j int) {
13+
h[i], h[j] = h[j], h[i]
14+
h[i].index = i
15+
h[j].index = j
16+
}
17+
18+
func (h *peerHeap) Push(x any) {
19+
n := len(*h)
20+
p := x.(*peer)
21+
p.index = n
22+
*h = append(*h, p)
23+
}
24+
25+
func (h *peerHeap) Pop() any {
26+
old := *h
27+
n := len(old)
28+
p := old[n-1]
29+
old[n-1] = nil // avoid memory leak
30+
p.index = -1 // for safety
31+
*h = old[0 : n-1]
32+
return p
33+
}

pkg/p2p/libp2p/internal/reacher/reacher.go

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package reacher
88

99
import (
10+
"container/heap"
1011
"context"
1112
"sync"
1213
"time"
@@ -27,11 +28,13 @@ type peer struct {
2728
overlay swarm.Address
2829
addr ma.Multiaddr
2930
retryAfter time.Time
31+
index int // index in the heap
3032
}
3133

3234
type reacher struct {
33-
mu sync.Mutex
34-
peers map[string]*peer
35+
mu sync.Mutex
36+
peerHeap peerHeap // min-heap ordered by retryAfter
37+
peerIndex map[string]*peer // lookup by overlay for O(1) access
3538

3639
newPeer chan struct{}
3740
quit chan struct{}
@@ -53,12 +56,13 @@ type Options struct {
5356

5457
func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log log.Logger) *reacher {
5558
r := &reacher{
56-
newPeer: make(chan struct{}, 1),
57-
quit: make(chan struct{}),
58-
pinger: streamer,
59-
peers: make(map[string]*peer),
60-
notifier: notifier,
61-
logger: log.WithName("reacher").Register(),
59+
newPeer: make(chan struct{}, 1),
60+
quit: make(chan struct{}),
61+
pinger: streamer,
62+
peerHeap: make(peerHeap, 0),
63+
peerIndex: make(map[string]*peer),
64+
notifier: notifier,
65+
logger: log.WithName("reacher").Register(),
6266
}
6367

6468
if o == nil {
@@ -79,7 +83,7 @@ func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options, log lo
7983
func (r *reacher) manage() {
8084
defer r.wg.Done()
8185

82-
c := make(chan *peer)
86+
c := make(chan peer)
8387
defer close(c)
8488

8589
ctx, cancel := context.WithCancel(context.Background())
@@ -92,7 +96,7 @@ func (r *reacher) manage() {
9296

9397
for {
9498

95-
p, tryAfter := r.tryAcquirePeer()
99+
p, ok, tryAfter := r.tryAcquirePeer()
96100

97101
// if no peer is returned,
98102
// wait until either more work or the closest retry-after time.
@@ -110,7 +114,7 @@ func (r *reacher) manage() {
110114
}
111115

112116
// wait for work
113-
if p == nil {
117+
if !ok {
114118
select {
115119
case <-r.quit:
116120
return
@@ -128,7 +132,7 @@ func (r *reacher) manage() {
128132
}
129133
}
130134

131-
func (r *reacher) ping(c chan *peer, ctx context.Context) {
135+
func (r *reacher) ping(c chan peer, ctx context.Context) {
132136
defer r.wg.Done()
133137
for p := range c {
134138
func() {
@@ -146,38 +150,34 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
146150
}
147151
}
148152

149-
func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
153+
func (r *reacher) tryAcquirePeer() (peer, bool, time.Duration) {
150154
r.mu.Lock()
151155
defer r.mu.Unlock()
152156

153-
var (
154-
now = time.Now()
155-
nextClosest time.Time
156-
)
157+
if len(r.peerHeap) == 0 {
158+
return peer{}, false, 0
159+
}
157160

158-
for _, p := range r.peers {
161+
now := time.Now()
159162

160-
// retry after has expired, retry
161-
if now.After(p.retryAfter) {
162-
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
163-
return p, 0
164-
}
163+
// Peek at the peer with the earliest retryAfter
164+
p := r.peerHeap[0]
165165

166-
// here, we find the peer with the earliest retry after
167-
if nextClosest.IsZero() || p.retryAfter.Before(nextClosest) {
168-
nextClosest = p.retryAfter
169-
}
166+
// If retryAfter has not expired, return time to wait
167+
if now.Before(p.retryAfter) {
168+
return peer{}, false, time.Until(p.retryAfter)
170169
}
171170

172-
if nextClosest.IsZero() {
173-
return nil, 0
174-
}
171+
// Update retryAfter and fix heap position
172+
p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
173+
heap.Fix(&r.peerHeap, p.index)
175174

176-
// return the time to wait until the closest retry after
177-
return nil, time.Until(nextClosest)
175+
// Return a copy so callers can read fields without holding the lock.
176+
return *p, true, 0
178177
}
179178

180179
// Connected adds a new peer to the queue for testing reachability.
180+
// If the peer already exists, its address is updated and its retry time is
// reset so that it is re-pinged immediately.
181181
func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
182182
if addr == nil {
183183
return
@@ -186,8 +186,15 @@ func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
186186
r.mu.Lock()
187187
defer r.mu.Unlock()
188188

189-
if _, ok := r.peers[overlay.ByteString()]; !ok {
190-
r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr}
189+
key := overlay.ByteString()
190+
if existing, ok := r.peerIndex[key]; ok {
191+
existing.addr = addr // Update address for reconnecting peer
192+
existing.retryAfter = time.Time{} // Reset to trigger immediate re-ping
193+
heap.Fix(&r.peerHeap, existing.index)
194+
} else {
195+
p := &peer{overlay: overlay, addr: addr}
196+
r.peerIndex[key] = p
197+
heap.Push(&r.peerHeap, p)
191198
}
192199

193200
select {
@@ -201,7 +208,11 @@ func (r *reacher) Disconnected(overlay swarm.Address) {
201208
r.mu.Lock()
202209
defer r.mu.Unlock()
203210

204-
delete(r.peers, overlay.ByteString())
211+
key := overlay.ByteString()
212+
if p, ok := r.peerIndex[key]; ok {
213+
heap.Remove(&r.peerHeap, p.index)
214+
delete(r.peerIndex, key)
215+
}
205216
}
206217

207218
// Close stops the worker. Must be called once.

pkg/p2p/libp2p/internal/reacher/reacher_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package reacher_test
77
import (
88
"context"
99
"errors"
10+
"sync"
1011
"testing"
1112
"time"
1213

@@ -127,6 +128,167 @@ func TestDisconnected(t *testing.T) {
127128
})
128129
}
129130

131+
func TestAddressUpdateOnReconnect(t *testing.T) {
132+
t.Parallel()
133+
134+
synctest.Test(t, func(t *testing.T) {
135+
// Use 1 worker and a known retry duration to make timing deterministic.
136+
options := reacher.Options{
137+
PingTimeout: time.Second * 5,
138+
Workers: 1,
139+
RetryAfterDuration: time.Minute,
140+
}
141+
142+
overlay := swarm.RandAddress(t)
143+
oldAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb")
144+
newAddr, _ := ma.NewMultiaddr("/ip4/192.168.1.1/tcp/7072/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb")
145+
146+
var pingsMu sync.Mutex
147+
var pings []ma.Multiaddr
148+
pinged := make(chan struct{}, 8)
149+
150+
pingFunc := func(_ context.Context, a ma.Multiaddr) (time.Duration, error) {
151+
pingsMu.Lock()
152+
pings = append(pings, a)
153+
pingsMu.Unlock()
154+
pinged <- struct{}{}
155+
return 0, nil
156+
}
157+
158+
reachableFunc := func(addr swarm.Address, status p2p.ReachabilityStatus) {}
159+
160+
mock := newMock(pingFunc, reachableFunc)
161+
162+
r := reacher.New(mock, mock, &options, log.Noop)
163+
testutil.CleanupCloser(t, r)
164+
165+
// First connection with old address – triggers initial ping.
166+
r.Connected(overlay, oldAddr)
167+
168+
select {
169+
case <-time.After(time.Second * 10):
170+
t.Fatal("timed out waiting for initial ping")
171+
case <-pinged:
172+
}
173+
174+
// Verify old address was pinged first.
175+
pingsMu.Lock()
176+
if len(pings) != 1 {
177+
t.Fatalf("expected 1 ping after initial connect, got %d", len(pings))
178+
}
179+
if !pings[0].Equal(oldAddr) {
180+
t.Fatalf("first ping should use old address, got %s", pings[0])
181+
}
182+
pingsMu.Unlock()
183+
184+
// Reconnect with a new address — should trigger immediate re-ping.
185+
r.Connected(overlay, newAddr)
186+
187+
select {
188+
case <-time.After(time.Second * 10):
189+
t.Fatal("timed out waiting for reconnect ping")
190+
case <-pinged:
191+
}
192+
193+
// Verify the reconnect pinged the new address.
194+
pingsMu.Lock()
195+
if len(pings) != 2 {
196+
t.Fatalf("expected 2 pings after reconnect, got %d", len(pings))
197+
}
198+
if !pings[1].Equal(newAddr) {
199+
t.Fatalf("reconnect ping should use new address, got %s", pings[1])
200+
}
201+
pingsMu.Unlock()
202+
203+
// Advance time past the retry duration — should trigger a scheduled re-ping.
204+
time.Sleep(options.RetryAfterDuration + time.Second)
205+
206+
select {
207+
case <-time.After(time.Second * 10):
208+
t.Fatal("timed out waiting for scheduled re-ping")
209+
case <-pinged:
210+
}
211+
212+
// Verify the scheduled re-ping used the new address.
213+
pingsMu.Lock()
214+
if len(pings) != 3 {
215+
t.Fatalf("expected 3 pings after retry duration, got %d", len(pings))
216+
}
217+
if !pings[2].Equal(newAddr) {
218+
t.Fatalf("scheduled re-ping should use new address, got %s", pings[2])
219+
}
220+
pingsMu.Unlock()
221+
})
222+
}
223+
224+
func TestHeapOrdering(t *testing.T) {
225+
t.Parallel()
226+
227+
synctest.Test(t, func(t *testing.T) {
228+
// Use single worker to ensure sequential processing
229+
options := reacher.Options{
230+
PingTimeout: time.Second * 5,
231+
Workers: 1,
232+
RetryAfterDuration: time.Second * 10,
233+
}
234+
235+
var pingOrder []swarm.Address
236+
var pingOrderMu sync.Mutex
237+
allPinged := make(chan struct{})
238+
239+
overlay1 := swarm.RandAddress(t)
240+
overlay2 := swarm.RandAddress(t)
241+
overlay3 := swarm.RandAddress(t)
242+
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7071/p2p/16Uiu2HAmTBuJT9LvNmBiQiNoTsxE5mtNy6YG3paw79m94CRa9sRb")
243+
244+
pingFunc := func(_ context.Context, _ ma.Multiaddr) (time.Duration, error) {
245+
return 0, nil
246+
}
247+
248+
reachableFunc := func(overlay swarm.Address, status p2p.ReachabilityStatus) {
249+
pingOrderMu.Lock()
250+
pingOrder = append(pingOrder, overlay)
251+
if len(pingOrder) == 3 {
252+
close(allPinged)
253+
}
254+
pingOrderMu.Unlock()
255+
}
256+
257+
mock := newMock(pingFunc, reachableFunc)
258+
259+
r := reacher.New(mock, mock, &options, log.Noop)
260+
testutil.CleanupCloser(t, r)
261+
262+
// Add peers - they should all be pinged since retryAfter starts at zero
263+
r.Connected(overlay1, addr)
264+
r.Connected(overlay2, addr)
265+
r.Connected(overlay3, addr)
266+
267+
select {
268+
case <-time.After(time.Second * 5):
269+
t.Fatalf("test timed out, only %d peers pinged", len(pingOrder))
270+
case <-allPinged:
271+
}
272+
273+
// Verify all three peers were pinged
274+
pingOrderMu.Lock()
275+
defer pingOrderMu.Unlock()
276+
277+
if len(pingOrder) != 3 {
278+
t.Fatalf("expected 3 peers pinged, got %d", len(pingOrder))
279+
}
280+
281+
// Verify all overlays are present (order may vary due to heap with same retryAfter)
282+
seen := make(map[string]bool)
283+
for _, o := range pingOrder {
284+
seen[o.String()] = true
285+
}
286+
if !seen[overlay1.String()] || !seen[overlay2.String()] || !seen[overlay3.String()] {
287+
t.Fatalf("not all peers were pinged")
288+
}
289+
})
290+
}
291+
130292
type mock struct {
131293
pingFunc func(context.Context, ma.Multiaddr) (time.Duration, error)
132294
reachableFunc func(swarm.Address, p2p.ReachabilityStatus)

0 commit comments

Comments
 (0)