From f1a869a8f100ec105e4f11e13e42f7815988310a Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 9 Dec 2025 12:55:14 +0100 Subject: [PATCH 1/9] fix(libp2p): consolidate AddrsFactory logic for all transports --- go.mod | 2 +- go.sum | 4 ++-- pkg/p2p/libp2p/libp2p.go | 28 +++++++++++++++++----------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index e629193a4e4..0dd9e214275 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ethereum/go-ethereum v1.15.11 github.com/ethersphere/batch-archive v0.0.4 github.com/ethersphere/go-price-oracle-abi v0.6.9 - github.com/ethersphere/go-storage-incentives-abi v0.9.4 + github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 github.com/ethersphere/go-sw3-abi v0.6.9 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.11.1 diff --git a/go.sum b/go.sum index b2918edd460..ce118f3ca08 100644 --- a/go.sum +++ b/go.sum @@ -239,8 +239,8 @@ github.com/ethersphere/batch-archive v0.0.4 h1:PHmwQfmUEyDJgoX2IqI/R0alQ63+aLPXf github.com/ethersphere/batch-archive v0.0.4/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= -github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ= github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 0a26f7b287d..a8137040c78 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -391,13 +391,20 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay if o.EnableWSS { wsOpt := ws.WithTLSConfig(certManager.TLSConfig()) transports = append(transports, libp2p.Transport(ws.New, wsOpt)) - // AddrsFactory takes the multiaddrs we're listening on and sets the multiaddrs to advertise to the network. - // We use the AutoTLS address factory so that the `*` in the AutoTLS address string is replaced with the - // actual IP address of the host once detected - certManagerAddressFactory := certManager.AddressFactory() - opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger) + } + + if o.EnableWS { + transports = append(transports, libp2p.Transport(ws.New)) + } + + opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { + addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger) + if o.EnableWSS { + // AddrsFactory takes the multiaddrs we're listening on and sets the multiaddrs to advertise to the network. + // We use the AutoTLS address factory so that the `*` in the AutoTLS address string is replaced with the + // actual IP address of the host once detected + certManagerAddressFactory := certManager.AddressFactory() addrs = certManagerAddressFactory(addrs) slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { @@ -413,11 +420,10 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay } return 0 }) - return addrs - })) - } else if o.EnableWS { - transports = append(transports, libp2p.Transport(ws.New)) - } + } + + return addrs + })) opts = append(opts, transports...) From 2f10842a3536abfa5795e774e5f2610c5efef436 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 9 Dec 2025 17:48:52 +0100 Subject: [PATCH 2/9] fix(libp2p): increase ping timeout to 5s --- pkg/p2p/libp2p/libp2p.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index a8137040c78..a98af640761 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -388,37 +388,34 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay wssResolver = r } + // Add WebSocket transport(s) based on configuration if o.EnableWSS { wsOpt := ws.WithTLSConfig(certManager.TLSConfig()) transports = append(transports, libp2p.Transport(ws.New, wsOpt)) - } - - if o.EnableWS { + } else if o.EnableWS { transports = append(transports, libp2p.Transport(ws.New)) } opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { + // Always include NAT-resolved addresses (both cases use the same resolver logic addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger) + // Only apply cert manager address rewriting when WSS is enabled if o.EnableWSS { - // AddrsFactory takes the multiaddrs we're listening on and sets the multiaddrs to advertise to the network. - // We use the AutoTLS address factory so that the `*` in the AutoTLS address string is replaced with the - // actual IP address of the host once detected certManagerAddressFactory := certManager.AddressFactory() addrs = certManagerAddressFactory(addrs) + // Sort to prioritize public addresses (only meaningful with WSS, but harmless otherwise) slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { aPub := manet.IsPublicAddr(a) bPub := manet.IsPublicAddr(b) - switch { - case aPub == bPub: + if aPub == bPub { return 0 - case aPub: + } + if aPub { return -1 - case bPub: - return 1 } - return 0 + return 1 }) } @@ -485,6 +482,7 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay return nil, fmt.Errorf("handshake service: %w", err) } + // TODO: check if it needs to have own isolated peerstore with dedicated resource manager? // Create a new dialer for libp2p ping protocol. This ensures that the protocol // uses a different set of keys to do ping. It prevents inconsistencies in peerstore as // the addresses used are not dialable and hence should be cleaned up. We should create @@ -1082,9 +1080,10 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return nil, fmt.Errorf("connect full close %w", err) } + // TODO: do we need to ping here? the handshake already verifies liveness? var pingErr error for _, addr := range addrs { - pingCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) _, err := s.Ping(pingCtx, addr) cancel() // Cancel immediately after use if err == nil { From c5fbf5a9d4d19d19188645b6c3723140b404685b Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Wed, 10 Dec 2025 11:58:41 +0100 Subject: [PATCH 3/9] fix(libp2p): remove ping after handshake --- pkg/p2p/libp2p/libp2p.go | 32 -------------------------------- 1 file changed, 32 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index a98af640761..3166f629ed5 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -388,7 +388,6 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay wssResolver = r } - // Add WebSocket transport(s) based on configuration if o.EnableWSS { wsOpt := ws.WithTLSConfig(certManager.TLSConfig()) transports = append(transports, libp2p.Transport(ws.New, wsOpt)) @@ -404,19 +403,6 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay if o.EnableWSS { certManagerAddressFactory := certManager.AddressFactory() addrs = certManagerAddressFactory(addrs) - - // Sort to prioritize public addresses (only meaningful with WSS, but harmless otherwise) - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - aPub := manet.IsPublicAddr(a) - bPub := manet.IsPublicAddr(b) - if aPub == bPub { - return 0 - } - if aPub { - return -1 - } - return 1 - }) } return addrs @@ -1080,24 +1066,6 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return nil, fmt.Errorf("connect full close %w", err) } - // TODO: do we need to ping here? the handshake already verifies liveness? - var pingErr error - for _, addr := range addrs { - pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - _, err := s.Ping(pingCtx, addr) - cancel() // Cancel immediately after use - if err == nil { - pingErr = nil - break - } - pingErr = err - } - - if pingErr != nil { - _ = s.Disconnect(overlay, "peer disconnected immediately after handshake") - return nil, p2p.ErrPeerNotFound - } - if !s.peers.Exists(overlay) { return nil, p2p.ErrPeerNotFound } From a519468a0e3fbc09d3abcefcde0b63d04aaffb27 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Wed, 10 Dec 2025 12:05:03 +0100 Subject: [PATCH 4/9] fix(hive): remove unused context --- pkg/hive/hive.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 4293aaf8d9d..d05cbbd873a 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -35,12 +35,11 @@ import ( const loggerName = "hive" const ( - protocolName = "hive" - protocolVersion = "1.1.0" - peersStreamName = "peers" - messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written. - maxBatchSize = 30 - batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation + protocolName = "hive" + protocolVersion = "1.1.0" + peersStreamName = "peers" + messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written. + maxBatchSize = 30 ) var ( @@ -261,16 +260,14 @@ func (s *Service) startCheckPeersHandler() { return case newPeers := <-s.peersChan: s.wg.Go(func() { - cctx, cancel := context.WithTimeout(ctx, batchValidationTimeout) - defer cancel() - s.checkAndAddPeers(cctx, newPeers) + s.checkAndAddPeers(newPeers) }) } } }) } -func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { +func (s *Service) checkAndAddPeers(peers pb.Peers) { var peersToAdd []swarm.Address for _, p := range peers.Peers { From 5d4ee00bd088f838e694580644bea419a6378581 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Wed, 10 Dec 2025 21:44:43 +0100 Subject: [PATCH 5/9] chore: revert go-storage-incentives-abi to v0.9.4 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0dd9e214275..e629193a4e4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ethereum/go-ethereum v1.15.11 github.com/ethersphere/batch-archive v0.0.4 github.com/ethersphere/go-price-oracle-abi v0.6.9 - github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 + github.com/ethersphere/go-storage-incentives-abi v0.9.4 github.com/ethersphere/go-sw3-abi v0.6.9 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.11.1 diff --git a/go.sum b/go.sum index ce118f3ca08..b2918edd460 100644 --- a/go.sum +++ b/go.sum @@ -239,8 +239,8 @@ github.com/ethersphere/batch-archive v0.0.4 h1:PHmwQfmUEyDJgoX2IqI/R0alQ63+aLPXf github.com/ethersphere/batch-archive v0.0.4/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s= -github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= +github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ= github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= From 5a0524d86bcdde3a957d6addaaf937d6a51a0614 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Wed, 10 Dec 2025 23:55:13 +0100 Subject: [PATCH 6/9] fix(hive): preallocate peersToAdd slice --- pkg/hive/hive.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index d05cbbd873a..8bc07f42791 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -268,7 +268,7 @@ func (s *Service) startCheckPeersHandler() { } func (s *Service) checkAndAddPeers(peers pb.Peers) { - var peersToAdd []swarm.Address + peersToAdd := make([]swarm.Address, 0, len(peers.Peers)) for _, p := range peers.Peers { multiUnderlays, err := bzz.DeserializeUnderlays(p.Underlay) From b27bd1ba6225a019bb10ebf2db9b38fc22c12c46 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 11 Dec 2025 11:38:21 +0100 Subject: [PATCH 7/9] fix(pushsync): return after FullClose in handler --- pkg/pushsync/pushsync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 78720473e6f..05c4aef3d41 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -188,6 +188,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) if !attemptedWrite { if writeErr := w.WriteMsgWithContext(ctx, &pb.Receipt{Err: err.Error()}); writeErr == nil { _ = stream.FullClose() + return } } _ = stream.Reset() From 805a1608fb1fce525779f14501aa09a5c212e21e Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 11 Dec 2025 11:39:12 +0100 Subject: [PATCH 8/9] fix(libp2p): improve flow for addrFactory when we have ws and wss --- pkg/p2p/libp2p/libp2p.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 3166f629ed5..172a8f861b1 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -395,18 +395,23 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay transports = append(transports, libp2p.Transport(ws.New)) } - opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - // Always include NAT-resolved addresses (both cases use the same resolver logic - addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger) - - // Only apply cert manager address rewriting when WSS is enabled - if o.EnableWSS { - certManagerAddressFactory := certManager.AddressFactory() - addrs = certManagerAddressFactory(addrs) + compositeResolver := newCompositeAddressResolver(tcpResolver, wssResolver) + + var addrFactory config.AddrsFactory + if o.EnableWSS { + certManagerFactory := certManager.AddressFactory() + addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + addrs = includeNatResolvedAddresses(addrs, compositeResolver, logger) + addrs = certManagerFactory(addrs) + return addrs + } + } else { + addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + return includeNatResolvedAddresses(addrs, compositeResolver, logger) } + } - return addrs - })) + opts = append(opts, libp2p.AddrsFactory(addrFactory)) opts = append(opts, transports...) From 8598fd4347512be9ec3eb26226477133b55d5392 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 11 Dec 2025 11:51:46 +0100 Subject: [PATCH 9/9] fix(libp2p): remove TODO comment --- pkg/p2p/libp2p/libp2p.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 172a8f861b1..c1731d15833 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -473,7 +473,6 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay return nil, fmt.Errorf("handshake service: %w", err) } - // TODO: check if it needs to have own isolated peerstore with dedicated resource manager? // Create a new dialer for libp2p ping protocol. This ensures that the protocol // uses a different set of keys to do ping. It prevents inconsistencies in peerstore as // the addresses used are not dialable and hence should be cleaned up. We should create