Skip to content

Commit 17916ec

Browse files
fix: resolve connection issues (#5302)
1 parent a8faeb3 commit 17916ec

File tree

3 files changed

+27
-52
lines changed

3 files changed

+27
-52
lines changed

pkg/hive/hive.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ import (
3535
const loggerName = "hive"
3636

3737
const (
38-
protocolName = "hive"
39-
protocolVersion = "1.1.0"
40-
peersStreamName = "peers"
41-
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
42-
maxBatchSize = 30
43-
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
38+
protocolName = "hive"
39+
protocolVersion = "1.1.0"
40+
peersStreamName = "peers"
41+
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
42+
maxBatchSize = 30
4443
)
4544

4645
var (
@@ -261,17 +260,15 @@ func (s *Service) startCheckPeersHandler() {
261260
return
262261
case newPeers := <-s.peersChan:
263262
s.wg.Go(func() {
264-
cctx, cancel := context.WithTimeout(ctx, batchValidationTimeout)
265-
defer cancel()
266-
s.checkAndAddPeers(cctx, newPeers)
263+
s.checkAndAddPeers(newPeers)
267264
})
268265
}
269266
}
270267
})
271268
}
272269

273-
func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
274-
var peersToAdd []swarm.Address
270+
func (s *Service) checkAndAddPeers(peers pb.Peers) {
271+
peersToAdd := make([]swarm.Address, 0, len(peers.Peers))
275272

276273
for _, p := range peers.Peers {
277274
multiUnderlays, err := bzz.DeserializeUnderlays(p.Underlay)

pkg/p2p/libp2p/libp2p.go

Lines changed: 18 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -391,34 +391,28 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
391391
if o.EnableWSS {
392392
wsOpt := ws.WithTLSConfig(certManager.TLSConfig())
393393
transports = append(transports, libp2p.Transport(ws.New, wsOpt))
394-
// AddrsFactory takes the multiaddrs we're listening on and sets the multiaddrs to advertise to the network.
395-
// We use the AutoTLS address factory so that the `*` in the AutoTLS address string is replaced with the
396-
// actual IP address of the host once detected
397-
certManagerAddressFactory := certManager.AddressFactory()
398-
opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
399-
addrs = includeNatResolvedAddresses(addrs, newCompositeAddressResolver(tcpResolver, wssResolver), logger)
400-
401-
addrs = certManagerAddressFactory(addrs)
402-
403-
slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int {
404-
aPub := manet.IsPublicAddr(a)
405-
bPub := manet.IsPublicAddr(b)
406-
switch {
407-
case aPub == bPub:
408-
return 0
409-
case aPub:
410-
return -1
411-
case bPub:
412-
return 1
413-
}
414-
return 0
415-
})
416-
return addrs
417-
}))
418394
} else if o.EnableWS {
419395
transports = append(transports, libp2p.Transport(ws.New))
420396
}
421397

398+
compositeResolver := newCompositeAddressResolver(tcpResolver, wssResolver)
399+
400+
var addrFactory config.AddrsFactory
401+
if o.EnableWSS {
402+
certManagerFactory := certManager.AddressFactory()
403+
addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
404+
addrs = includeNatResolvedAddresses(addrs, compositeResolver, logger)
405+
addrs = certManagerFactory(addrs)
406+
return addrs
407+
}
408+
} else {
409+
addrFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
410+
return includeNatResolvedAddresses(addrs, compositeResolver, logger)
411+
}
412+
}
413+
414+
opts = append(opts, libp2p.AddrsFactory(addrFactory))
415+
422416
opts = append(opts, transports...)
423417

424418
if o.hostFactory == nil {
@@ -1076,23 +1070,6 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
10761070
return nil, fmt.Errorf("connect full close %w", err)
10771071
}
10781072

1079-
var pingErr error
1080-
for _, addr := range addrs {
1081-
pingCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
1082-
_, err := s.Ping(pingCtx, addr)
1083-
cancel() // Cancel immediately after use
1084-
if err == nil {
1085-
pingErr = nil
1086-
break
1087-
}
1088-
pingErr = err
1089-
}
1090-
1091-
if pingErr != nil {
1092-
_ = s.Disconnect(overlay, "peer disconnected immediately after handshake")
1093-
return nil, p2p.ErrPeerNotFound
1094-
}
1095-
10961073
if !s.peers.Exists(overlay) {
10971074
return nil, p2p.ErrPeerNotFound
10981075
}

pkg/pushsync/pushsync.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
188188
if !attemptedWrite {
189189
if writeErr := w.WriteMsgWithContext(ctx, &pb.Receipt{Err: err.Error()}); writeErr == nil {
190190
_ = stream.FullClose()
191+
return
191192
}
192193
}
193194
_ = stream.Reset()

0 commit comments

Comments
 (0)