Skip to content
19 changes: 8 additions & 11 deletions pkg/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ import (
const loggerName = "hive"

const (
protocolName = "hive"
protocolVersion = "1.1.0"
peersStreamName = "peers"
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
maxBatchSize = 30
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
protocolName = "hive"
protocolVersion = "1.1.0"
peersStreamName = "peers"
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
maxBatchSize = 30
)

var (
Expand Down Expand Up @@ -261,17 +260,15 @@ func (s *Service) startCheckPeersHandler() {
return
case newPeers := <-s.peersChan:
s.wg.Go(func() {
cctx, cancel := context.WithTimeout(ctx, batchValidationTimeout)
defer cancel()
s.checkAndAddPeers(cctx, newPeers)
s.checkAndAddPeers(newPeers)
})
}
}
})
}

func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
var peersToAdd []swarm.Address
func (s *Service) checkAndAddPeers(peers pb.Peers) {
peersToAdd := make([]swarm.Address, 0, len(peers.Peers))

for _, p := range peers.Peers {
multiUnderlays, err := bzz.DeserializeUnderlays(p.Underlay)
Expand Down
59 changes: 18 additions & 41 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,34 +391,28 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
if o.EnableWSS {
wsOpt := ws.WithTLSConfig(certManager.TLSConfig())
transports = append(transports, libp2p.Transport(ws.New, wsOpt))
// AddrsFactory takes the multiaddrs we're listening on and sets the multiaddrs to advertise to the network.
// We use the AutoTLS address factory so that the `*` in the AutoTLS address string is replaced with the
// actual IP address of the host once detected
certManagerAddressFactory := certManager.AddressFactory()
opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger)

addrs = certManagerAddressFactory(addrs)

slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
aPub := manet.IsPublicAddr(a)
bPub := manet.IsPublicAddr(b)
switch {
case aPub == bPub:
return 0
case aPub:
return -1
case bPub:
return 1
}
return 0
})
return addrs
}))
} else if o.EnableWS {
transports = append(transports, libp2p.Transport(ws.New))
}

compositeResolver := newCompositeAddressResolver(tcpResolver, wssResolver)

var addrFactory config.AddrsFactory
if o.EnableWSS {
certManagerFactory := certManager.AddressFactory()
addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
addrs = includeNatResolvedAddresses(addrs, compositeResolver, logger)
addrs = certManagerFactory(addrs)
return addrs
}
} else {
addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
return includeNatResolvedAddresses(addrs, compositeResolver, logger)
}
}

opts = append(opts, libp2p.AddrsFactory(addrFactory))

opts = append(opts, transports...)

if o.hostFactory == nil {
Expand Down Expand Up @@ -1076,23 +1070,6 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
return nil, fmt.Errorf("connect full close %w", err)
}

var pingErr error
for _, addr := range addrs {
pingCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
_, err := s.Ping(pingCtx, addr)
cancel() // Cancel immediately after use
if err == nil {
pingErr = nil
break
}
pingErr = err
}

if pingErr != nil {
_ = s.Disconnect(overlay, "peer disconnected immediately after handshake")
return nil, p2p.ErrPeerNotFound
}

if !s.peers.Exists(overlay) {
return nil, p2p.ErrPeerNotFound
}
Expand Down
1 change: 1 addition & 0 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if !attemptedWrite {
if writeErr := w.WriteMsgWithContext(ctx, &pb.Receipt{Err: err.Error()}); writeErr == nil {
_ = stream.FullClose()
return
}
}
_ = stream.Reset()
Expand Down
Loading