diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 4293aaf8d9d..8bc07f42791 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -35,12 +35,11 @@ import ( const loggerName = "hive" const ( - protocolName = "hive" - protocolVersion = "1.1.0" - peersStreamName = "peers" - messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written. - maxBatchSize = 30 - batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation + protocolName = "hive" + protocolVersion = "1.1.0" + peersStreamName = "peers" + messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written. + maxBatchSize = 30 ) var ( @@ -261,17 +260,15 @@ func (s *Service) startCheckPeersHandler() { return case newPeers := <-s.peersChan: s.wg.Go(func() { - cctx, cancel := context.WithTimeout(ctx, batchValidationTimeout) - defer cancel() - s.checkAndAddPeers(cctx, newPeers) + s.checkAndAddPeers(newPeers) }) } } }) } -func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) { - var peersToAdd []swarm.Address +func (s *Service) checkAndAddPeers(peers pb.Peers) { + peersToAdd := make([]swarm.Address, 0, len(peers.Peers)) for _, p := range peers.Peers { multiUnderlays, err := bzz.DeserializeUnderlays(p.Underlay) diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 0a26f7b287d..c1731d15833 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -391,34 +391,28 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay if o.EnableWSS { wsOpt := ws.WithTLSConfig(certManager.TLSConfig()) transports = append(transports, libp2p.Transport(ws.New, wsOpt)) - // AddrsFactory takes the multiaddrs we're listening on and sets the multiaddrs to advertise to the network. - // We use the AutoTLS address factory so that the `*` in the AutoTLS address string is replaced with the - // actual IP address of the host once detected - certManagerAddressFactory := certManager.AddressFactory() - opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger) - - addrs = certManagerAddressFactory(addrs) - - slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { - aPub := manet.IsPublicAddr(a) - bPub := manet.IsPublicAddr(b) - switch { - case aPub == bPub: - return 0 - case aPub: - return -1 - case bPub: - return 1 - } - return 0 - }) - return addrs - })) } else if o.EnableWS { transports = append(transports, libp2p.Transport(ws.New)) } + compositeResolver := newCompositeAddressResolver(tcpResolver, wssResolver) + + var addrFactory config.AddrsFactory + if o.EnableWSS { + certManagerFactory := certManager.AddressFactory() + addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + addrs = includeNatResolvedAddresses(addrs, compositeResolver, logger) + addrs = certManagerFactory(addrs) + return addrs + } + } else { + addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { + return includeNatResolvedAddresses(addrs, compositeResolver, logger) + } + } + + opts = append(opts, libp2p.AddrsFactory(addrFactory)) + opts = append(opts, transports...) if o.hostFactory == nil { @@ -1076,23 +1070,6 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b return nil, fmt.Errorf("connect full close %w", err) } - var pingErr error - for _, addr := range addrs { - pingCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) - _, err := s.Ping(pingCtx, addr) - cancel() // Cancel immediately after use - if err == nil { - pingErr = nil - break - } - pingErr = err - } - - if pingErr != nil { - _ = s.Disconnect(overlay, "peer disconnected immediately after handshake") - return nil, p2p.ErrPeerNotFound - } - if !s.peers.Exists(overlay) { return nil, p2p.ErrPeerNotFound } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 78720473e6f..05c4aef3d41 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -188,6 +188,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) if !attemptedWrite { if writeErr := w.WriteMsgWithContext(ctx, &pb.Receipt{Err: err.Error()}); writeErr == nil { _ = stream.FullClose() + return } } _ = stream.Reset()