Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ metaData
# execution API authentication
jwt.hex

# local execution client data
execution/

# local documentation
CLAUDE.md

# manual testing
tmp

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
IPColocationWhitelist: colocationWhitelist,
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
EnableAutoNAT: cliCtx.Bool(cmd.EnableAutoNATFlag.Name),
StateNotifier: b,
DB: b.db,
StateGen: b.stateGen,
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/control:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/event:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand All @@ -101,10 +102,8 @@ go_library(
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/quic:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library",
"@com_github_libp2p_go_libp2p_mplex//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr//net:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
Expand Down Expand Up @@ -194,6 +193,7 @@ go_test(
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/event:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
type Config struct {
NoDiscovery bool
EnableUPnP bool
EnableAutoNAT bool
StaticPeerID bool
DisableLivenessCheck bool
StaticPeers []string
Expand Down
13 changes: 4 additions & 9 deletions beacon-chain/p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@ import (
"crypto/ecdsa"
"fmt"
"net"
"time"

"github.com/OffchainLabs/prysm/v7/config/features"
ecdsaprysm "github.com/OffchainLabs/prysm/v7/crypto/ecdsa"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/libp2p/go-libp2p"
mplex "github.com/libp2p/go-libp2p-mplex"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
gomplex "github.com/libp2p/go-mplex"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -110,7 +107,6 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
libp2p.ConnectionGater(s),
libp2p.Transport(libp2ptcp.NewTCPTransport),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Security(noise.ID, noise.New),
libp2p.Ping(false), // Disable Ping Service.
}
Expand Down Expand Up @@ -162,6 +158,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
options = append(options, libp2p.ResourceManager(&network.NullResourceManager{}))
}

if cfg.EnableAutoNAT {
options = append(options, libp2p.EnableAutoNATv2())
}

return options, nil
}

Expand Down Expand Up @@ -217,8 +217,3 @@ func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option {
return cfg.Apply(libp2p.Identity(ifaceKey))
}
}

// Configures stream timeouts on mplex.
func configureMplex() {
gomplex.ResetStreamTimeout = 5 * time.Second
}
25 changes: 24 additions & 1 deletion beacon-chain/p2p/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestDefaultMultiplexers(t *testing.T) {
assert.NoError(t, err)

assert.Equal(t, protocol.ID("/yamux/1.0.0"), cfg.Muxers[0].ID)
assert.Equal(t, protocol.ID("/mplex/6.7.0"), cfg.Muxers[1].ID)
}

func TestSetConnManagerOption(t *testing.T) {
Expand Down Expand Up @@ -188,6 +187,30 @@ func checkLimit(t *testing.T, cm connmgr.ConnManager, expected int) {
}
}

func TestBuildOptions_EnableAutoNAT(t *testing.T) {
params.SetupTestConfigCleanup(t)
p2pCfg := &Config{
UDPPort: 2000,
TCPPort: 3000,
QUICPort: 3000,
EnableAutoNAT: true,
StateNotifier: &mock.MockStateNotifier{},
}
svc := &Service{cfg: p2pCfg}
var err error
svc.privKey, err = privKey(svc.cfg)
require.NoError(t, err)
ipAddr := network.IPAddr()
opts, err := svc.buildOptions(ipAddr, svc.privKey)
require.NoError(t, err)

// Verify that options were built without error when EnableAutoNAT is true.
// The actual AutoNAT v2 behavior is tested by libp2p itself.
var cfg libp2p.Config
err = cfg.Apply(append(opts, libp2p.FallbackDefaults)...)
assert.NoError(t, err)
}

func TestMultiAddressBuilderWithID(t *testing.T) {
testCases := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/peers/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {

resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address, resAddress, "Unexpected address")
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")

resDirection, err := p.Direction(id)
require.NoError(t, err)
Expand All @@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {

resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2, resAddress2, "Unexpected address")
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")

resDirection2, err := p.Direction(id)
require.NoError(t, err)
Expand Down
47 changes: 45 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -154,8 +155,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
return nil, errors.Wrapf(err, "failed to build p2p options")
}

// Sets mplex timeouts
configureMplex()
h, err := libp2p.New(opts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to create p2p host")
Expand Down Expand Up @@ -259,6 +258,11 @@ func (s *Service) Start() {
// current epoch.
s.RefreshPersistentSubnets()

if s.cfg.EnableAutoNAT {
s.subscribeReachabilityEvents()
log.Info("AutoNAT v2 enabled for address reachability detection")
}

// Periodic functions.
async.RunEvery(s.ctx, params.BeaconConfig().TtfbTimeoutDuration(), func() {
ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...)
Expand Down Expand Up @@ -557,3 +561,42 @@ func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

func (s *Service) subscribeReachabilityEvents() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please return an error an let the caller decide what to do instead of logging an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

sub, err := s.host.EventBus().Subscribe(new(event.EvtHostReachableAddrsChanged))
if err != nil {
log.WithError(err).Error("Failed to subscribe to reachability events")
return
}

go func() {
defer func() {
if err := sub.Close(); err != nil {
log.WithError(err).Debug("Failed to close reachability event subscription")
}
}()
for {
select {
case <-s.ctx.Done():
return
case ev := <-sub.Out():

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why an empty line here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if event, ok := ev.(event.EvtHostReachableAddrsChanged); ok {
log.WithFields(logrus.Fields{
"reachable": multiaddrsToStrings(event.Reachable),
"unreachable": multiaddrsToStrings(event.Unreachable),
"unknown": multiaddrsToStrings(event.Unknown),
}).Info("Address reachability changed")
}
}
}
}()
}

func multiaddrsToStrings(addrs []multiaddr.Multiaddr) []string {
strs := make([]string, len(addrs))
for i, a := range addrs {
strs[i] = a.String()
}
return strs
}
56 changes: 56 additions & 0 deletions beacon-chain/p2p/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/OffchainLabs/prysm/v7/testing/require"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
Expand Down Expand Up @@ -400,3 +401,58 @@ func TestService_connectWithPeer(t *testing.T) {
})
}
}

func TestService_SubscribeReachabilityEvents(t *testing.T) {
hook := logTest.NewGlobal()
ctx := t.Context()

h, _, _ := createHost(t, 0)
defer func() {
if err := h.Close(); err != nil {
t.Fatal(err)
}
}()

// Create service with the host
s := &Service{
ctx: ctx,
host: h,
cfg: &Config{EnableAutoNAT: true},
}

// Get an emitter for the reachability event
emitter, err := h.EventBus().Emitter(new(event.EvtHostReachableAddrsChanged))
require.NoError(t, err)
defer func() {
if err := emitter.Close(); err != nil {
t.Fatal(err)
}
}()
// Subscribe to reachability events
s.subscribeReachabilityEvents()

// Create test multiaddrs for each reachability state
reachableAddr, err := multiaddr.NewMultiaddr("/ip4/192.168.1.1/tcp/9000")
require.NoError(t, err)
unreachableAddr, err := multiaddr.NewMultiaddr("/ip4/10.0.0.1/tcp/9001")
require.NoError(t, err)
unknownAddr, err := multiaddr.NewMultiaddr("/ip4/172.16.0.1/tcp/9002")
require.NoError(t, err)

// Emit a reachability event with all address types
err = emitter.Emit(event.EvtHostReachableAddrsChanged{
Reachable: []multiaddr.Multiaddr{reachableAddr},
Unreachable: []multiaddr.Multiaddr{unreachableAddr},
Unknown: []multiaddr.Multiaddr{unknownAddr},
})
require.NoError(t, err)

// Wait for the event to be processed
time.Sleep(100 * time.Millisecond)

// Verify the log message contains all addresses
require.LogsContain(t, hook, "Address reachability changed")
require.LogsContain(t, hook, "/ip4/192.168.1.1/tcp/9000")
require.LogsContain(t, hook, "/ip4/10.0.0.1/tcp/9001")
require.LogsContain(t, hook, "/ip4/172.16.0.1/tcp/9002")
}
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
multiplex "github.com/libp2p/go-mplex"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -148,7 +147,7 @@ func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
}

func isUnwantedError(err error) bool {
for _, e := range []error{network.ErrReset, multiplex.ErrShutdown, io.EOF, types.ErrIODeadline} {
for _, e := range []error{network.ErrReset, io.EOF, types.ErrIODeadline} {
if errors.Is(err, e) || err.Error() == e.Error() {
return true
}
Expand Down
3 changes: 3 additions & 0 deletions changelog/aarsh_libp2p_autonatv2_per_address.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Add support for detecting and logging per address reachability via libp2p AutoNAT v2.
1 change: 1 addition & 0 deletions cmd/beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ var appFlags = []cli.Flag{
debug.MutexProfileFractionFlag,
cmd.LogFileName,
cmd.EnableUPnPFlag,
cmd.EnableAutoNATFlag,
cmd.ConfigFileFlag,
cmd.ChainConfigFileFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/beacon-chain/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var appHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
cmd.BootstrapNode,
cmd.EnableUPnPFlag,
cmd.EnableAutoNATFlag,
cmd.NoDiscovery,
cmd.P2PAllowList,
cmd.P2PDenyList,
Expand Down
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ var (
Name: "enable-upnp",
Usage: "Enable the service (Beacon chain or Validator) to use UPnP when possible.",
}
// EnableAutoNATFlag enables AutoNAT v2 service for per-address reachability detection.
EnableAutoNATFlag = &cli.BoolFlag{
Name: "enable-autonat",
Usage: "Enable AutoNAT v2 service for per-address reachability detection. Helps diagnose connectivity issues behind NAT/firewalls.",
}
// ConfigFileFlag specifies the filepath to load flag values.
ConfigFileFlag = &cli.StringFlag{
Name: "config-file",
Expand Down
Loading
Loading