Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ metaData
# execution API authentication
jwt.hex

# local execution client data
execution/

# local documentation
CLAUDE.md

# manual testing
tmp

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DenyListCIDR: slice.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
IPColocationWhitelist: colocationWhitelist,
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
EnableAutoNAT: cliCtx.Bool(cmd.EnableAutoNATFlag.Name),
StateNotifier: b,
DB: b.db,
StateGen: b.stateGen,
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/control:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/event:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand All @@ -101,10 +102,8 @@ go_library(
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/quic:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library",
"@com_github_libp2p_go_libp2p_mplex//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_multiformats_go_multiaddr//net:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
Expand Down Expand Up @@ -194,6 +193,7 @@ go_test(
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/event:go_default_library",
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
type Config struct {
NoDiscovery bool
EnableUPnP bool
EnableAutoNAT bool
StaticPeerID bool
DisableLivenessCheck bool
StaticPeers []string
Expand Down
13 changes: 4 additions & 9 deletions beacon-chain/p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@ import (
"crypto/ecdsa"
"fmt"
"net"
"time"

"github.com/OffchainLabs/prysm/v7/config/features"
ecdsaprysm "github.com/OffchainLabs/prysm/v7/crypto/ecdsa"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/libp2p/go-libp2p"
mplex "github.com/libp2p/go-libp2p-mplex"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
gomplex "github.com/libp2p/go-mplex"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -110,7 +107,6 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
libp2p.ConnectionGater(s),
libp2p.Transport(libp2ptcp.NewTCPTransport),
libp2p.DefaultMuxers,
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Security(noise.ID, noise.New),
libp2p.Ping(false), // Disable Ping Service.
}
Expand Down Expand Up @@ -162,6 +158,10 @@ func (s *Service) buildOptions(ip net.IP, priKey *ecdsa.PrivateKey) ([]libp2p.Op
options = append(options, libp2p.ResourceManager(&network.NullResourceManager{}))
}

if cfg.EnableAutoNAT {
options = append(options, libp2p.EnableAutoNATv2())
}

return options, nil
}

Expand Down Expand Up @@ -217,8 +217,3 @@ func privKeyOption(privkey *ecdsa.PrivateKey) libp2p.Option {
return cfg.Apply(libp2p.Identity(ifaceKey))
}
}

// Configures stream timeouts on mplex.
func configureMplex() {
gomplex.ResetStreamTimeout = 5 * time.Second
}
25 changes: 24 additions & 1 deletion beacon-chain/p2p/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestDefaultMultiplexers(t *testing.T) {
assert.NoError(t, err)

assert.Equal(t, protocol.ID("/yamux/1.0.0"), cfg.Muxers[0].ID)
assert.Equal(t, protocol.ID("/mplex/6.7.0"), cfg.Muxers[1].ID)
}

func TestSetConnManagerOption(t *testing.T) {
Expand Down Expand Up @@ -188,6 +187,30 @@ func checkLimit(t *testing.T, cm connmgr.ConnManager, expected int) {
}
}

func TestBuildOptions_EnableAutoNAT(t *testing.T) {
params.SetupTestConfigCleanup(t)
p2pCfg := &Config{
UDPPort: 2000,
TCPPort: 3000,
QUICPort: 3000,
EnableAutoNAT: true,
StateNotifier: &mock.MockStateNotifier{},
}
svc := &Service{cfg: p2pCfg}
var err error
svc.privKey, err = privKey(svc.cfg)
require.NoError(t, err)
ipAddr := network.IPAddr()
opts, err := svc.buildOptions(ipAddr, svc.privKey)
require.NoError(t, err)

// Verify that options were built without error when EnableAutoNAT is true.
// The actual AutoNAT v2 behavior is tested by libp2p itself.
var cfg libp2p.Config
err = cfg.Apply(append(opts, libp2p.FallbackDefaults)...)
assert.NoError(t, err)
}

func TestMultiAddressBuilderWithID(t *testing.T) {
testCases := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/peers/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestPeerExplicitAdd(t *testing.T) {

resAddress, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address, resAddress, "Unexpected address")
assert.Equal(t, address.Equal(resAddress), true, "Unexpected address")

resDirection, err := p.Direction(id)
require.NoError(t, err)
Expand All @@ -72,7 +72,7 @@ func TestPeerExplicitAdd(t *testing.T) {

resAddress2, err := p.Address(id)
require.NoError(t, err)
assert.Equal(t, address2, resAddress2, "Unexpected address")
assert.Equal(t, address2.Equal(resAddress2), true, "Unexpected address")

resDirection2, err := p.Direction(id)
require.NoError(t, err)
Expand Down
50 changes: 48 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package p2p
import (
"context"
"crypto/ecdsa"
"fmt"
"sync"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -154,8 +156,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
return nil, errors.Wrapf(err, "failed to build p2p options")
}

// Sets mplex timeouts
configureMplex()
h, err := libp2p.New(opts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to create p2p host")
Expand Down Expand Up @@ -259,6 +259,14 @@ func (s *Service) Start() {
// current epoch.
s.RefreshPersistentSubnets()

if s.cfg.EnableAutoNAT {
if err := s.subscribeReachabilityEvents(); err != nil {
log.WithError(err).Error("Failed to subscribe to AutoNAT v2 reachability events")
} else {
log.Info("AutoNAT v2 enabled for address reachability detection")
}
}

// Periodic functions.
async.RunEvery(s.ctx, params.BeaconConfig().TtfbTimeoutDuration(), func() {
ensurePeerConnections(s.ctx, s.host, s.peers, relayNodes...)
Expand Down Expand Up @@ -557,3 +565,41 @@ func (s *Service) downscorePeer(peerID peer.ID, reason string) {
newScore := s.Peers().Scorers().BadResponsesScorer().Increment(peerID)
log.WithFields(logrus.Fields{"peerID": peerID, "reason": reason, "newScore": newScore}).Debug("Downscore peer")
}

func (s *Service) subscribeReachabilityEvents() error {
sub, err := s.host.EventBus().Subscribe(new(event.EvtHostReachableAddrsChanged))
if err != nil {
return fmt.Errorf("subscribing to reachability events: %w", err)
}

go func() {
defer func() {
if err := sub.Close(); err != nil {
log.WithError(err).Debug("Failed to close reachability event subscription")
}
}()
for {
select {
case <-s.ctx.Done():
return
case ev := <-sub.Out():
if event, ok := ev.(event.EvtHostReachableAddrsChanged); ok {
log.WithFields(logrus.Fields{
"reachable": multiaddrsToStrings(event.Reachable),
"unreachable": multiaddrsToStrings(event.Unreachable),
"unknown": multiaddrsToStrings(event.Unknown),
}).Info("Address reachability changed")
}
}
}
}()
return nil
}

func multiaddrsToStrings(addrs []multiaddr.Multiaddr) []string {
strs := make([]string, len(addrs))
for i, a := range addrs {
strs[i] = a.String()
}
return strs
}
56 changes: 56 additions & 0 deletions beacon-chain/p2p/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/OffchainLabs/prysm/v7/testing/require"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
Expand Down Expand Up @@ -400,3 +401,58 @@ func TestService_connectWithPeer(t *testing.T) {
})
}
}

func TestService_SubscribeReachabilityEvents(t *testing.T) {
hook := logTest.NewGlobal()
ctx := t.Context()

h, _, _ := createHost(t, 0)
defer func() {
if err := h.Close(); err != nil {
t.Fatal(err)
}
}()

// Create service with the host
s := &Service{
ctx: ctx,
host: h,
cfg: &Config{EnableAutoNAT: true},
}

// Get an emitter for the reachability event
emitter, err := h.EventBus().Emitter(new(event.EvtHostReachableAddrsChanged))
require.NoError(t, err)
defer func() {
if err := emitter.Close(); err != nil {
t.Fatal(err)
}
}()
// Subscribe to reachability events
require.NoError(t, s.subscribeReachabilityEvents())

// Create test multiaddrs for each reachability state
reachableAddr, err := multiaddr.NewMultiaddr("/ip4/192.168.1.1/tcp/9000")
require.NoError(t, err)
unreachableAddr, err := multiaddr.NewMultiaddr("/ip4/10.0.0.1/tcp/9001")
require.NoError(t, err)
unknownAddr, err := multiaddr.NewMultiaddr("/ip4/172.16.0.1/tcp/9002")
require.NoError(t, err)

// Emit a reachability event with all address types
err = emitter.Emit(event.EvtHostReachableAddrsChanged{
Reachable: []multiaddr.Multiaddr{reachableAddr},
Unreachable: []multiaddr.Multiaddr{unreachableAddr},
Unknown: []multiaddr.Multiaddr{unknownAddr},
})
require.NoError(t, err)

// Wait for the event to be processed
time.Sleep(100 * time.Millisecond)

// Verify the log message contains all addresses
require.LogsContain(t, hook, "Address reachability changed")
require.LogsContain(t, hook, "/ip4/192.168.1.1/tcp/9000")
require.LogsContain(t, hook, "/ip4/10.0.0.1/tcp/9001")
require.LogsContain(t, hook, "/ip4/172.16.0.1/tcp/9002")
}
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_mplex//:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
multiplex "github.com/libp2p/go-mplex"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -148,7 +147,7 @@ func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
}

func isUnwantedError(err error) bool {
for _, e := range []error{network.ErrReset, multiplex.ErrShutdown, io.EOF, types.ErrIODeadline} {
for _, e := range []error{network.ErrReset, io.EOF, types.ErrIODeadline} {
if errors.Is(err, e) || err.Error() == e.Error() {
return true
}
Expand Down
3 changes: 3 additions & 0 deletions changelog/aarsh_libp2p_autonatv2_per_address.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Add support for detecting and logging per address reachability via libp2p AutoNAT v2.
1 change: 1 addition & 0 deletions cmd/beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ var appFlags = []cli.Flag{
debug.MutexProfileFractionFlag,
cmd.LogFileName,
cmd.EnableUPnPFlag,
cmd.EnableAutoNATFlag,
cmd.ConfigFileFlag,
cmd.ChainConfigFileFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/beacon-chain/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var appHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
cmd.BootstrapNode,
cmd.EnableUPnPFlag,
cmd.EnableAutoNATFlag,
cmd.NoDiscovery,
cmd.P2PAllowList,
cmd.P2PDenyList,
Expand Down
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ var (
Name: "enable-upnp",
Usage: "Enable the service (Beacon chain or Validator) to use UPnP when possible.",
}
// EnableAutoNATFlag enables AutoNAT v2 service for per-address reachability detection.
EnableAutoNATFlag = &cli.BoolFlag{
Name: "enable-autonat",
Usage: "Enable AutoNAT v2 service for per-address reachability detection. Helps diagnose connectivity issues behind NAT/firewalls.",
}
// ConfigFileFlag specifies the filepath to load flag values.
ConfigFileFlag = &cli.StringFlag{
Name: "config-file",
Expand Down
Loading
Loading