Skip to content

Commit 8a11b7c

Browse files
authored
basic_host: close swarm on Close (#2916)
Using the `BasicHost` constructor transfers the ownership of the swarm. This is similar to how using `libp2p.New` transfers the ownership of user provided config options like `ResourceManager`, all of which are closed on `host.Close`
1 parent e276a6e commit 8a11b7c

File tree

4 files changed

+32
-30
lines changed

4 files changed

+32
-30
lines changed

config/config.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
368368
fxopts = append(fxopts, cfg.QUICReuse...)
369369
} else {
370370
fxopts = append(fxopts,
371-
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
371+
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
372372
var opts []quicreuse.Option
373373
if !cfg.DisableMetrics {
374374
opts = append(opts, quicreuse.EnableMetrics(cfg.PrometheusRegisterer))
@@ -469,18 +469,17 @@ func (cfg *Config) NewNode() (host.Host, error) {
469469
fx.Provide(func() event.Bus {
470470
return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
471471
}),
472-
fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
473-
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
474-
if err != nil {
475-
return nil, err
476-
}
477-
lifecycle.Append(fx.StopHook(sw.Close))
478-
return sw, nil
472+
fx.Provide(func() crypto.PrivKey {
473+
return cfg.PeerKey
479474
}),
480475
// Make sure the swarm constructor depends on the quicreuse.ConnManager.
481476
// That way, the ConnManager will be started before the swarm, and more importantly,
482477
// the swarm will be stopped before the ConnManager.
483-
fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm {
478+
fx.Provide(func(eventBus event.Bus, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
479+
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
480+
if err != nil {
481+
return nil, err
482+
}
484483
lifecycle.Append(fx.Hook{
485484
OnStart: func(context.Context) error {
486485
// TODO: This method succeeds if listening on one address succeeds. We
@@ -491,14 +490,13 @@ func (cfg *Config) NewNode() (host.Host, error) {
491490
return sw.Close()
492491
},
493492
})
494-
return sw
493+
return sw, nil
495494
}),
496495
fx.Provide(cfg.newBasicHost),
497496
fx.Provide(func(bh *bhost.BasicHost) host.Host {
498497
return bh
499498
}),
500499
fx.Provide(func(h *swarm.Swarm) peer.ID { return h.LocalPeer() }),
501-
fx.Provide(func(h *swarm.Swarm) crypto.PrivKey { return h.Peerstore().PrivKey(h.LocalPeer()) }),
502500
}
503501
transportOpts, err := cfg.addTransports()
504502
if err != nil {

libp2p_test.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
"github.com/libp2p/go-libp2p/core/connmgr"
1515
"github.com/libp2p/go-libp2p/core/crypto"
16-
"github.com/libp2p/go-libp2p/core/event"
1716
"github.com/libp2p/go-libp2p/core/host"
1817
"github.com/libp2p/go-libp2p/core/network"
1918
"github.com/libp2p/go-libp2p/core/peer"
@@ -429,24 +428,15 @@ func TestMain(m *testing.M) {
429428
}
430429

431430
func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
432-
relay, err := New(EnableRelayService())
431+
relay, err := New(EnableRelayService(), ForceReachabilityPublic())
433432
require.NoError(t, err)
434433
defer relay.Close()
435434

436-
// Fake that the relay is publicly reachable
437-
emitterForRelay, err := relay.EventBus().Emitter(&event.EvtLocalReachabilityChanged{})
438-
require.NoError(t, err)
439-
defer emitterForRelay.Close()
440-
emitterForRelay.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})
441-
442-
peerBehindRelay, err := New(EnableAutoRelayWithStaticRelays([]peer.AddrInfo{{ID: relay.ID(), Addrs: relay.Addrs()}}))
435+
peerBehindRelay, err := New(
436+
EnableAutoRelayWithStaticRelays([]peer.AddrInfo{{ID: relay.ID(), Addrs: relay.Addrs()}}),
437+
ForceReachabilityPrivate())
443438
require.NoError(t, err)
444439
defer peerBehindRelay.Close()
445-
// Emit an event to tell this peer it is private
446-
emitterForPeerBehindRelay, err := peerBehindRelay.EventBus().Emitter(&event.EvtLocalReachabilityChanged{})
447-
require.NoError(t, err)
448-
defer emitterForPeerBehindRelay.Close()
449-
emitterForPeerBehindRelay.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
450440

451441
// Use a wrapped resource manager to test that the circuit dialing works
452442
// with it. Look at the PR introducing this test for context
@@ -467,10 +457,12 @@ func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
467457
)
468458
require.NoError(t, err)
469459

470-
ctx, cancel := context.WithCancel(context.Background())
471-
res := <-ping.Ping(ctx, h, peerBehindRelay.ID())
472-
require.NoError(t, res.Error)
473-
defer cancel()
460+
require.Eventually(t, func() bool {
461+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
462+
defer cancel()
463+
res := <-ping.Ping(ctx, h, peerBehindRelay.ID())
464+
return res.Error == nil
465+
}, 5*time.Second, 50*time.Millisecond)
474466
}
475467

476468
func TestHostAddrsFactoryAddsCerthashes(t *testing.T) {

p2p/host/basic/basic_host.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,10 @@ func (h *BasicHost) Close() error {
10801080
_ = h.emitters.evtLocalProtocolsUpdated.Close()
10811081
_ = h.emitters.evtLocalAddrsUpdated.Close()
10821082

1083+
if err := h.network.Close(); err != nil {
1084+
log.Errorf("swarm close failed: %v", err)
1085+
}
1086+
10831087
h.psManager.Close()
10841088
if h.Peerstore() != nil {
10851089
h.Peerstore().Close()

p2p/host/basic/basic_host_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,15 @@ func TestMultipleClose(t *testing.T) {
8383

8484
require.NoError(t, h.Close())
8585
require.NoError(t, h.Close())
86-
require.NoError(t, h.Close())
86+
h2, err := NewHost(swarmt.GenSwarm(t), nil)
87+
require.NoError(t, err)
88+
defer h2.Close()
89+
require.Error(t, h.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
90+
h.Network().Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
91+
_, err = h.NewStream(context.Background(), h2.ID())
92+
require.Error(t, err)
93+
require.Empty(t, h.Addrs())
94+
require.Empty(t, h.AllAddrs())
8795
}
8896

8997
func TestSignedPeerRecordWithNoListenAddrs(t *testing.T) {

0 commit comments

Comments
 (0)