Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
IPColocationWhitelist: colocationWhitelist,
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
EnableAutoNAT: cliCtx.Bool(cmd.EnableAutoNATFlag.Name),
StateNotifier: b,
DB: b.db,
StateGen: b.stateGen,
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/control:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/event:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand All @@ -101,10 +102,8 @@ go_library(
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/quic:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library",
"@com_github_libp2p_go_libp2p_mplex//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr//net:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
Expand Down Expand Up @@ -194,6 +193,7 @@ go_test(
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/event:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
type Config struct {
NoDiscovery bool
EnableUPnP bool
EnableAutoNAT bool
StaticPeerID bool
DisableLivenessCheck bool
StaticPeers []string
Expand Down
13 changes: 4 additions & 9 deletions beacon-chain/p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@ import (
"crypto/ecdsa"
"fmt"
"net"
"time"

"github.com/OffchainLabs/prysm/v7/config/features"
ecdsaprysm "github.com/OffchainLabs/prysm/v7/crypto/ecdsa"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/libp2p/go-libp2p"
mplex "github.com/libp2p/go-libp2p-mplex"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
gomplex "github.com/libp2p/go-mplex"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -110,7 +107,6 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
libp2p.ConnectionGater(s),
libp2p.Transport(libp2ptcp.NewTCPTransport),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Security(noise.ID, noise.New),
libp2p.Ping(false), // Disable Ping Service.
}
Expand Down Expand Up @@ -162,6 +158,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
options = append(options, libp2p.ResourceManager(&network.NullResourceManager{}))
}

if cfg.EnableAutoNAT {
options = append(options, libp2p.EnableAutoNATv2())
}

return options, nil
}

Expand Down Expand Up @@ -217,8 +217,3 @@ func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option {
return cfg.Apply(libp2p.Identity(ifaceKey))
}
}

// Configures stream timeouts on mplex.
func configureMplex() {
gomplex.ResetStreamTimeout = 5 * time.Second
}
25 changes: 24 additions & 1 deletion beacon-chain/p2p/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestDefaultMultiplexers(t *testing.T) {
assert.NoError(t, err)

assert.Equal(t, protocol.ID("/yamux/1.0.0"), cfg.Muxers[0].ID)
assert.Equal(t, protocol.ID("/mplex/6.7.0"), cfg.Muxers[1].ID)
}

func TestSetConnManagerOption(t *testing.T) {
Expand Down Expand Up @@ -188,6 +187,30 @@ func checkLimit(t *testing.T, cm connmgr.ConnManager, expected int) {
}
}

func TestBuildOptions_EnableAutoNAT(t *testing.T) {
params.SetupTestConfigCleanup(t)
p2pCfg := &Config{
UDPPort: 2000,
TCPPort: 3000,
QUICPort: 3000,
EnableAutoNAT: true,
StateNotifier: &mock.MockStateNotifier{},
}
svc := &Service{cfg: p2pCfg}
var err error
svc.privKey, err = privKey(svc.cfg)
require.NoError(t, err)
ipAddr := network.IPAddr()
opts, err := svc.buildOptions(ipAddr, svc.privKey)
require.NoError(t, err)

// Verify that options were built without error when EnableAutoNAT is true.
// The actual AutoNAT v2 behavior is tested by libp2p itself.
var cfg libp2p.Config
err = cfg.Apply(append(opts, libp2p.FallbackDefaults)...)
assert.NoError(t, err)
}

func TestMultiAddressBuilderWithID(t *testing.T) {
testCases := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/peers/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {

resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address, resAddress, "Unexpected address")
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")

resDirection, err := p.Direction(id)
require.NoError(t, err)
Expand All @@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {

resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2, resAddress2, "Unexpected address")
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")

resDirection2, err := p.Direction(id)
require.NoError(t, err)
Expand Down
45 changes: 43 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -154,8 +155,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
return nil, errors.Wrapf(err, "failed to build p2p options")
}

// Sets mplex timeouts
configureMplex()
h, err := libp2p.New(opts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to create p2p host")
Expand Down Expand Up @@ -259,6 +258,11 @@ func (s *Service) Start() {
// current epoch.
s.RefreshPersistentSubnets()

if s.cfg.EnableAutoNAT {
s.subscribeReachabilityEvents()
log.Info("AutoNAT v2 enabled for address reachability detection")
}

// Periodic functions.
async.RunEvery(s.ctx, params.BeaconConfig().TtfbTimeoutDuration(), func() {
ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...)
Expand Down Expand Up @@ -557,3 +561,40 @@ func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

func (s *Service) subscribeReachabilityEvents() {
sub, err := s.host.EventBus().Subscribe(new(event.EvtHostReachableAddrsChanged))
if err != nil {
log.WithError(err).Error("Failed to subscribe to reachability events")
return
}

go func() {
defer func() {
if err := sub.Close(); err != nil {
log.WithError(err).Debug("Failed to close reachability event subscription")
}
}()
for {
select {
case <-s.ctx.Done():
return
case ev := <-sub.Out():
e := ev.(event.EvtHostReachableAddrsChanged)
log.WithFields(logrus.Fields{
"reachable": multiaddrsToStrings(e.Reachable),
"unreachable": multiaddrsToStrings(e.Unreachable),
"unknown": multiaddrsToStrings(e.Unknown),
}).Info("Address reachability changed")
}
}
}()
}

func multiaddrsToStrings(addrs []multiaddr.Multiaddr) []string {
strs := make([]string, len(addrs))
for i, a := range addrs {
strs[i] = a.String()
}
return strs
}
56 changes: 56 additions & 0 deletions beacon-chain/p2p/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/OffchainLabs/prysm/v7/testing/require"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
Expand Down Expand Up @@ -400,3 +401,58 @@ func TestService_connectWithPeer(t *testing.T) {
})
}
}

func TestService_SubscribeReachabilityEvents(t *testing.T) {
hook := logTest.NewGlobal()
ctx := t.Context()

h, _, _ := createHost(t, 0)
defer func() {
if err := h.Close(); err != nil {
t.Fatal(err)
}
}()

// Create service with the host
s := &Service{
ctx: ctx,
host: h,
cfg: &Config{EnableAutoNAT: true},
}

// Get an emitter for the reachability event
emitter, err := h.EventBus().Emitter(new(event.EvtHostReachableAddrsChanged))
require.NoError(t, err)
defer func() {
if err := emitter.Close(); err != nil {
t.Fatal(err)
}
}()
// Subscribe to reachability events
s.subscribeReachabilityEvents()

// Create test multiaddrs for each reachability state
reachableAddr, err := multiaddr.NewMultiaddr("/ip4/192.168.1.1/tcp/9000")
require.NoError(t, err)
unreachableAddr, err := multiaddr.NewMultiaddr("/ip4/10.0.0.1/tcp/9001")
require.NoError(t, err)
unknownAddr, err := multiaddr.NewMultiaddr("/ip4/172.16.0.1/tcp/9002")
require.NoError(t, err)

// Emit a reachability event with all address types
err = emitter.Emit(event.EvtHostReachableAddrsChanged{
Reachable: []multiaddr.Multiaddr{reachableAddr},
Unreachable: []multiaddr.Multiaddr{unreachableAddr},
Unknown: []multiaddr.Multiaddr{unknownAddr},
})
require.NoError(t, err)

// Wait for the event to be processed
time.Sleep(100 * time.Millisecond)

// Verify the log message contains all addresses
require.LogsContain(t, hook, "Address reachability changed")
require.LogsContain(t, hook, "/ip4/192.168.1.1/tcp/9000")
require.LogsContain(t, hook, "/ip4/10.0.0.1/tcp/9001")
require.LogsContain(t, hook, "/ip4/172.16.0.1/tcp/9002")
}
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
multiplex "github.com/libp2p/go-mplex"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -148,7 +147,7 @@ func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
}

func isUnwantedError(err error) bool {
for _, e := range []error{network.ErrReset, multiplex.ErrShutdown, io.EOF, types.ErrIODeadline} {
for _, e := range []error{network.ErrReset, io.EOF, types.ErrIODeadline} {
if errors.Is(err, e) || err.Error() == e.Error() {
return true
}
Expand Down
3 changes: 3 additions & 0 deletions changelog/aarsh_libp2p_autonatv2_per_address.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Add support for detecting and logging per address reachability via libp2p AutoNAT v2.
1 change: 1 addition & 0 deletions cmd/beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ var appFlags = []cli.Flag{
debug.MutexProfileFractionFlag,
cmd.LogFileName,
cmd.EnableUPnPFlag,
cmd.EnableAutoNATFlag,
cmd.ConfigFileFlag,
cmd.ChainConfigFileFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/beacon-chain/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var appHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
cmd.BootstrapNode,
cmd.EnableUPnPFlag,
cmd.EnableAutoNATFlag,
cmd.NoDiscovery,
cmd.P2PAllowList,
cmd.P2PDenyList,
Expand Down
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ var (
Name: "enable-upnp",
Usage: "Enable the service (Beacon chain or Validator) to use UPnP when possible.",
}
// EnableAutoNATFlag enables AutoNAT v2 service for per-address reachability detection.
EnableAutoNATFlag = &cli.BoolFlag{
Name: "enable-autonat",
Usage: "Enable AutoNAT v2 service for per-address reachability detection. Helps diagnose connectivity issues behind NAT/firewalls.",
}
// ConfigFileFlag specifies the filepath to load flag values.
ConfigFileFlag = &cli.StringFlag{
Name: "config-file",
Expand Down
18 changes: 12 additions & 6 deletions deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1974,8 +1974,8 @@ def prysm_deps():
],
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p",
sum = "h1:1Ur6rPCf3GR+g8jkrnaQaM0ha2IGespsnNlCqJLLALE=",
version = "v0.39.1",
sum = "h1:A8foZk+ZEhZTv0Jb++7xUFlrFhBDv4j2Vh/uq4YX+KE=",
version = "v0.42.0",
)
go_repository(
name = "com_github_libp2p_go_libp2p_asn_util",
Expand Down Expand Up @@ -2038,6 +2038,12 @@ def prysm_deps():
sum = "h1:nrLh89LN/LEiqcFiqdKDRHjGstN300C1269K/EX0CPU=",
version = "v4.0.2",
)
go_repository(
name = "com_github_libp2p_go_yamux_v5",
importpath = "github.com/libp2p/go-yamux/v5",
sum = "h1:f0WoX/bEF2E8SbE4c/k1Mo+/9z0O4oC/hWEA+nfYRSg=",
version = "v5.0.1",
)
go_repository(
name = "com_github_libp2p_zeroconf_v2",
importpath = "github.com/libp2p/zeroconf/v2",
Expand Down Expand Up @@ -2323,8 +2329,8 @@ def prysm_deps():
go_repository(
name = "com_github_multiformats_go_multiaddr",
importpath = "github.com/multiformats/go-multiaddr",
sum = "h1:bfrHrJhrRuh/NXH5mCnemjpbGjzRw/b+tJFOD41g2tU=",
version = "v0.14.0",
sum = "h1:oGWEVKioVQcdIOBlYM8BH1rZDWOGJSqr9/BKl6zQ4qc=",
version = "v0.16.0",
)
go_repository(
name = "com_github_multiformats_go_multiaddr_dns",
Expand Down Expand Up @@ -2915,8 +2921,8 @@ def prysm_deps():
"gazelle:exclude tools.go",
],
importpath = "github.com/quic-go/quic-go",
sum = "h1:x09Agz4ATTMEP3qb5P0MRxNZfd6O9wAyK3qwwqQZVQc=",
version = "v0.49.1-0.20250925085836-275c172fec2b",
sum = "h1:/SlHrCRElyaU6MaEPKqKr9z83sBg2v4FLLvWM+Z47pA=",
version = "v0.52.0",
)
go_repository(
name = "com_github_quic_go_webtransport_go",
Expand Down
Loading
Loading