Skip to content

Commit 858f52c

Browse files
fix(p2p): skip addressbook and reacher for unreachable addresses (#5370)
1 parent 4f09e38 commit 858f52c

File tree

3 files changed

+170
-14
lines changed

3 files changed

+170
-14
lines changed

pkg/p2p/libp2p/connections_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"reflect"
1414
"strings"
1515
"sync"
16+
"sync/atomic"
1617
"testing"
1718
"time"
1819

@@ -27,13 +28,15 @@ import (
2728
"github.com/ethersphere/bee/v2/pkg/statestore/mock"
2829
"github.com/ethersphere/bee/v2/pkg/swarm"
2930
"github.com/ethersphere/bee/v2/pkg/topology/lightnode"
31+
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
3032
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
3133

3234
libp2pm "github.com/libp2p/go-libp2p"
3335
"github.com/libp2p/go-libp2p/core/event"
3436
"github.com/libp2p/go-libp2p/core/host"
3537
"github.com/libp2p/go-libp2p/core/network"
3638
libp2ppeer "github.com/libp2p/go-libp2p/core/peer"
39+
"github.com/libp2p/go-libp2p/core/peerstore"
3740
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
3841
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
3942
ma "github.com/multiformats/go-multiaddr"
@@ -1485,3 +1488,122 @@ var (
14851488
noopReachability = func(p2p.ReachabilityStatus) {}
14861489
noopReachable = func(swarm.Address, p2p.ReachabilityStatus) {}
14871490
)
1491+
1492+
func TestPeerMultiaddrsNoFallback(t *testing.T) {
1493+
t.Parallel()
1494+
1495+
s1, _ := newService(t, 1, libp2pServiceOpts{})
1496+
1497+
privKey, _, err := libp2pcrypto.GenerateEd25519Key(rand.New(rand.NewSource(time.Now().UnixNano())))
1498+
if err != nil {
1499+
t.Fatal(err)
1500+
}
1501+
unknownPeerID, err := libp2ppeer.IDFromPrivateKey(privKey)
1502+
if err != nil {
1503+
t.Fatal(err)
1504+
}
1505+
1506+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
1507+
defer cancel()
1508+
1509+
addrs, err := s1.PeerMultiaddrs(ctx, unknownPeerID)
1510+
if err != nil {
1511+
t.Fatal(err)
1512+
}
1513+
if len(addrs) != 0 {
1514+
t.Fatalf("expected no addresses for unknown peer, got %v", addrs)
1515+
}
1516+
}
1517+
1518+
type emptyAddrsPeerstore struct {
1519+
peerstore.Peerstore
1520+
targetPeerID libp2ppeer.ID
1521+
mu sync.RWMutex
1522+
}
1523+
1524+
func (p *emptyAddrsPeerstore) setTarget(id libp2ppeer.ID) {
1525+
p.mu.Lock()
1526+
defer p.mu.Unlock()
1527+
p.targetPeerID = id
1528+
}
1529+
1530+
func (p *emptyAddrsPeerstore) isTarget(id libp2ppeer.ID) bool {
1531+
p.mu.RLock()
1532+
defer p.mu.RUnlock()
1533+
return p.targetPeerID == id
1534+
}
1535+
1536+
func (p *emptyAddrsPeerstore) Addrs(id libp2ppeer.ID) []ma.Multiaddr {
1537+
if p.isTarget(id) {
1538+
return nil
1539+
}
1540+
return p.Peerstore.Addrs(id)
1541+
}
1542+
1543+
func (p *emptyAddrsPeerstore) AddrStream(ctx context.Context, id libp2ppeer.ID) <-chan ma.Multiaddr {
1544+
if p.isTarget(id) {
1545+
ch := make(chan ma.Multiaddr)
1546+
go func() {
1547+
<-ctx.Done()
1548+
close(ch)
1549+
}()
1550+
return ch
1551+
}
1552+
return p.Peerstore.AddrStream(ctx, id)
1553+
}
1554+
1555+
type emptyAddrsHost struct {
1556+
host.Host
1557+
ps *emptyAddrsPeerstore
1558+
}
1559+
1560+
func (h *emptyAddrsHost) Peerstore() peerstore.Peerstore {
1561+
return h.ps
1562+
}
1563+
1564+
func TestConnectEmptyPeerstoreSkipsAddressbookAndReacher(t *testing.T) {
1565+
t.Parallel()
1566+
1567+
ab1 := addressbook.New(mock.NewStateStore())
1568+
1569+
s1, _ := newService(t, 1, libp2pServiceOpts{
1570+
Addressbook: ab1,
1571+
libp2pOpts: libp2p.Options{
1572+
FullNode: true,
1573+
},
1574+
})
1575+
1576+
psWrapper := &emptyAddrsPeerstore{Peerstore: s1.Host().Peerstore()}
1577+
1578+
var reachableCalled atomic.Bool
1579+
notifier1 := mockNotifier(noopCf, noopDf, true)
1580+
notifier1.(*notifiee).reachable = func(_ swarm.Address, _ p2p.ReachabilityStatus) {
1581+
reachableCalled.Store(true)
1582+
}
1583+
s1.SetPickyNotifier(notifier1)
1584+
1585+
s2, overlay2 := newService(t, 1, libp2pServiceOpts{
1586+
libp2pOpts: libp2p.Options{
1587+
FullNode: true,
1588+
},
1589+
})
1590+
1591+
psWrapper.setTarget(s2.Host().ID())
1592+
s1.SetHost(&emptyAddrsHost{Host: s1.Host(), ps: psWrapper})
1593+
1594+
_, err := s2.Connect(context.Background(), serviceUnderlayAddress(t, s1))
1595+
if err != nil {
1596+
t.Fatal(err)
1597+
}
1598+
1599+
expectPeersEventually(t, s1, overlay2)
1600+
1601+
_, err = ab1.Get(overlay2)
1602+
if err == nil {
1603+
t.Fatal("expected addressbook to have no entry for NAT peer, but found one")
1604+
}
1605+
1606+
if reachableCalled.Load() {
1607+
t.Fatal("expected reacher not to be notified for NAT peer")
1608+
}
1609+
}

pkg/p2p/libp2p/export_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ func (s *Service) Host() host.Host {
3232
return s.host
3333
}
3434

35+
func (s *Service) SetHost(h host.Host) {
36+
s.host = h
37+
}
38+
3539
type StaticAddressResolver = staticAddressResolver
3640

3741
var (
@@ -63,6 +67,10 @@ func (s *Service) FilterSupportedAddresses(addrs []ma.Multiaddr) []ma.Multiaddr
6367
return s.filterSupportedAddresses(addrs)
6468
}
6569

70+
func (s *Service) PeerMultiaddrs(ctx context.Context, peerID libp2ppeer.ID) ([]ma.Multiaddr, error) {
71+
return s.peerMultiaddrs(ctx, peerID)
72+
}
73+
6674
func (s *Service) SetTransportFlags(hasTCP, hasWS, hasWSS bool) {
6775
s.enabledTransports = map[bzz.TransportType]bool{
6876
bzz.TransportTCP: hasTCP,

pkg/p2p/libp2p/libp2p.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ func (s *Service) handleIncoming(stream network.Stream) {
584584
peerID := stream.Conn().RemotePeer()
585585
handshakeStream := newStream(stream, s.metrics)
586586

587-
peerMultiaddrs, err := s.peerMultiaddrs(s.ctx, stream.Conn().RemoteMultiaddr(), peerID)
587+
peerAddrs, err := s.peerMultiaddrs(s.ctx, peerID)
588588
if err != nil {
589589
s.logger.Debug("stream handler: handshake: build remote multiaddrs", "peer_id", peerID, "error", err)
590590
s.logger.Error(nil, "stream handler: handshake: build remote multiaddrs", "peer_id", peerID)
@@ -593,12 +593,28 @@ func (s *Service) handleIncoming(stream network.Stream) {
593593
return
594594
}
595595

596+
// For the handshake we always need an observed address (ObservedUnderlay).
597+
// If the peerstore had no addresses, fall back to RemoteMultiaddr for the
598+
// handshake only. This typically means the peer is behind NAT and its
599+
// address is not reachable from the outside.
600+
observedAddrs := peerAddrs
601+
if len(observedAddrs) == 0 {
602+
observedAddrs, err = buildFullMAs([]ma.Multiaddr{stream.Conn().RemoteMultiaddr()}, peerID)
603+
if err != nil {
604+
s.logger.Debug("stream handler: handshake: build remote multiaddrs fallback", "peer_id", peerID, "error", err)
605+
s.logger.Error(nil, "stream handler: handshake: build remote multiaddrs fallback", "peer_id", peerID)
606+
_ = handshakeStream.Reset()
607+
_ = s.host.Network().ClosePeer(peerID)
608+
return
609+
}
610+
}
611+
596612
bee260Compat := s.bee260BackwardCompatibility(peerID)
597613

598614
i, err := s.handshakeService.Handle(
599615
s.ctx,
600616
handshakeStream,
601-
peerMultiaddrs,
617+
observedAddrs,
602618
handshake.WithBee260Compatibility(bee260Compat),
603619
)
604620
if err != nil {
@@ -644,7 +660,8 @@ func (s *Service) handleIncoming(stream network.Stream) {
644660
return
645661
}
646662

647-
if i.FullNode {
663+
// Only persist in addressbook when we have real peerstore addresses.
664+
if i.FullNode && len(peerAddrs) > 0 {
648665
err = s.addressbook.Put(i.BzzAddress.Overlay, *i.BzzAddress)
649666
if err != nil {
650667
s.logger.Debug("stream handler: addressbook put error", "peer_id", peerID, "error", err)
@@ -727,7 +744,9 @@ func (s *Service) handleIncoming(stream network.Stream) {
727744
return
728745
}
729746

730-
s.notifyReacherConnected(overlay, peerMultiaddrs)
747+
if len(peerAddrs) > 0 {
748+
s.notifyReacherConnected(overlay, peerAddrs)
749+
}
731750

732751
peerUserAgent := appendSpace(s.peerUserAgent(s.ctx, peerID))
733752
s.networkStatus.Store(int32(p2p.NetworkStatusAvailable))
@@ -1029,19 +1048,29 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
10291048

10301049
handshakeStream := newStream(stream, s.metrics)
10311050

1032-
peerMultiaddrs, err := s.peerMultiaddrs(ctx, stream.Conn().RemoteMultiaddr(), peerID)
1051+
peerAddrs, err := s.peerMultiaddrs(ctx, peerID)
10331052
if err != nil {
10341053
_ = handshakeStream.Reset()
10351054
_ = s.host.Network().ClosePeer(peerID)
10361055
return nil, fmt.Errorf("build peer multiaddrs: %w", err)
10371056
}
10381057

1058+
observedAddrs := peerAddrs
1059+
if len(observedAddrs) == 0 {
1060+
observedAddrs, err = buildFullMAs([]ma.Multiaddr{stream.Conn().RemoteMultiaddr()}, peerID)
1061+
if err != nil {
1062+
_ = handshakeStream.Reset()
1063+
_ = s.host.Network().ClosePeer(peerID)
1064+
return nil, fmt.Errorf("build peer multiaddrs fallback: %w", err)
1065+
}
1066+
}
1067+
10391068
bee260Compat := s.bee260BackwardCompatibility(peerID)
10401069

10411070
i, err := s.handshakeService.Handshake(
10421071
s.ctx,
10431072
handshakeStream,
1044-
peerMultiaddrs,
1073+
observedAddrs,
10451074
handshake.WithBee260Compatibility(bee260Compat),
10461075
)
10471076
if err != nil {
@@ -1120,7 +1149,9 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
11201149

11211150
s.metrics.CreatedConnectionCount.Inc()
11221151

1123-
s.notifyReacherConnected(overlay, peerMultiaddrs)
1152+
if len(peerAddrs) > 0 {
1153+
s.notifyReacherConnected(overlay, peerAddrs)
1154+
}
11241155

11251156
peerUA := appendSpace(s.peerUserAgent(ctx, peerID))
11261157
loggerV1.Debug("successfully connected to peer (outbound)", "addresses", i.BzzAddress.ShortString(), "light", i.LightString(), "user_agent", peerUA)
@@ -1429,17 +1460,12 @@ func (s *Service) determineCurrentNetworkStatus(err error) error {
14291460
return err
14301461
}
14311462

1432-
// peerMultiaddrs builds full multiaddresses for a peer given information from
1433-
// the libp2p host peerstore. If the peerstore doesn't have addresses yet,
1434-
// it falls back to using the remote address from the active connection.
1435-
func (s *Service) peerMultiaddrs(ctx context.Context, remoteAddr ma.Multiaddr, peerID libp2ppeer.ID) ([]ma.Multiaddr, error) {
1463+
// peerMultiaddrs builds full multiaddresses for a peer using the peerstore.
1464+
func (s *Service) peerMultiaddrs(ctx context.Context, peerID libp2ppeer.ID) ([]ma.Multiaddr, error) {
14361465
waitPeersCtx, cancel := context.WithTimeout(ctx, peerstoreWaitAddrsTimeout)
14371466
defer cancel()
14381467

14391468
mas := waitPeerAddrs(waitPeersCtx, s.host.Peerstore(), peerID)
1440-
if len(mas) == 0 && remoteAddr != nil {
1441-
mas = []ma.Multiaddr{remoteAddr}
1442-
}
14431469

14441470
return buildFullMAs(mas, peerID)
14451471
}

0 commit comments

Comments
 (0)