Skip to content

Commit c0a8a78

Browse files
committed
basichost: extract out observed addrs manager
1 parent b798eec commit c0a8a78

File tree

9 files changed

+360
-419
lines changed

9 files changed

+360
-419
lines changed

config/config.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
2929
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
3030
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
31+
"github.com/libp2p/go-libp2p/p2p/host/observedaddrs"
3132
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
3233
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
3334
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
@@ -434,23 +435,23 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
434435
return fxopts, nil
435436
}
436437

437-
func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus, an *autonatv2.AutoNAT) (*bhost.BasicHost, error) {
438+
func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus, an *autonatv2.AutoNAT, o bhost.ObservedAddrsManager) (*bhost.BasicHost, error) {
438439
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
439-
EventBus: eventBus,
440-
ConnManager: cfg.ConnManager,
441-
AddrsFactory: cfg.AddrsFactory,
442-
NATManager: cfg.NATManager,
443-
EnablePing: !cfg.DisablePing,
444-
UserAgent: cfg.UserAgent,
445-
ProtocolVersion: cfg.ProtocolVersion,
446-
EnableHolePunching: cfg.EnableHolePunching,
447-
HolePunchingOptions: cfg.HolePunchingOptions,
448-
EnableRelayService: cfg.EnableRelayService,
449-
RelayServiceOpts: cfg.RelayServiceOpts,
450-
EnableMetrics: !cfg.DisableMetrics,
451-
PrometheusRegisterer: cfg.PrometheusRegisterer,
452-
DisableIdentifyAddressDiscovery: cfg.DisableIdentifyAddressDiscovery,
453-
AutoNATv2: an,
440+
EventBus: eventBus,
441+
ConnManager: cfg.ConnManager,
442+
AddrsFactory: cfg.AddrsFactory,
443+
NATManager: cfg.NATManager,
444+
EnablePing: !cfg.DisablePing,
445+
UserAgent: cfg.UserAgent,
446+
ProtocolVersion: cfg.ProtocolVersion,
447+
EnableHolePunching: cfg.EnableHolePunching,
448+
HolePunchingOptions: cfg.HolePunchingOptions,
449+
EnableRelayService: cfg.EnableRelayService,
450+
RelayServiceOpts: cfg.RelayServiceOpts,
451+
EnableMetrics: !cfg.DisableMetrics,
452+
PrometheusRegisterer: cfg.PrometheusRegisterer,
453+
AutoNATv2: an,
454+
ObservedAddrsManager: o,
454455
})
455456
if err != nil {
456457
return nil, err
@@ -529,6 +530,25 @@ func (cfg *Config) NewNode() (host.Host, error) {
529530
})
530531
return sw, nil
531532
}),
533+
fx.Provide(func(eventBus event.Bus, s *swarm.Swarm, lifecycle fx.Lifecycle) (bhost.ObservedAddrsManager, error) {
534+
if cfg.DisableIdentifyAddressDiscovery {
535+
return nil, nil
536+
}
537+
o, err := observedaddrs.NewManager(eventBus, s)
538+
if err != nil {
539+
return nil, err
540+
}
541+
lifecycle.Append(fx.Hook{
542+
OnStart: func(context.Context) error {
543+
o.Start()
544+
return nil
545+
},
546+
OnStop: func(context.Context) error {
547+
return o.Close()
548+
},
549+
})
550+
return o, nil
551+
}),
532552
fx.Provide(func() (*autonatv2.AutoNAT, error) {
533553
if !cfg.EnableAutoNATv2 {
534554
return nil, nil

p2p/host/basic/addrs_manager.go

Lines changed: 21 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,15 @@ import (
2121
"github.com/prometheus/client_golang/prometheus"
2222
)
2323

24-
var (
25-
// addrChangeTickrInterval is the interval to recompute host addrs.
26-
addrChangeTickrInterval = 5 * time.Second
27-
// natTypeChageTickrInterval is the interval to recompute host nat type.
28-
natTypeChangeTickrInterval = time.Minute
29-
)
24+
const maxObservedAddrsPerListenAddr = 3
25+
26+
// addrChangeTickrInterval is the interval to recompute host addrs.
27+
var addrChangeTickrInterval = 5 * time.Second
3028

31-
type observedAddrsManager interface {
29+
// ObservedAddrsManager maps our local listen addrs to externally observed addrs.
30+
type ObservedAddrsManager interface {
3231
Addrs(minObservers int) []ma.Multiaddr
3332
AddrsFor(local ma.Multiaddr) []ma.Multiaddr
34-
35-
Record(conn connMultiaddrs, observed ma.Multiaddr)
36-
RemoveConn(conn connMultiaddrs)
37-
Start()
38-
getNATType() (network.NATDeviceType, network.NATDeviceType)
39-
io.Closer
4033
}
4134

4235
type hostAddrs struct {
@@ -54,7 +47,7 @@ type addrsManager struct {
5447
addrsFactory AddrsFactory
5548
listenAddrs func() []ma.Multiaddr
5649
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr
57-
observedAddrsManager observedAddrsManager
50+
observedAddrsManager ObservedAddrsManager
5851
interfaceAddrs *interfaceAddrsCache
5952
addrsReachabilityTracker *addrsReachabilityTracker
6053

@@ -84,8 +77,7 @@ func newAddrsManager(
8477
addrsFactory AddrsFactory,
8578
listenAddrs func() []ma.Multiaddr,
8679
addCertHashes func([]ma.Multiaddr) []ma.Multiaddr,
87-
disableObservedAddrs bool,
88-
observedAddrsManager observedAddrsManager,
80+
observedAddrsManager ObservedAddrsManager,
8981
addrsUpdatedChan chan struct{},
9082
client autonatv2Client,
9183
enableMetrics bool,
@@ -96,6 +88,7 @@ func newAddrsManager(
9688
bus: bus,
9789
listenAddrs: listenAddrs,
9890
addCertHashes: addCertHashes,
91+
observedAddrsManager: observedAddrsManager,
9992
natManager: natmgr,
10093
addrsFactory: addrsFactory,
10194
triggerAddrsUpdateChan: make(chan chan struct{}, 1),
@@ -108,25 +101,6 @@ func newAddrsManager(
108101
unknownReachability := network.ReachabilityUnknown
109102
as.hostReachability.Store(&unknownReachability)
110103

111-
if !disableObservedAddrs {
112-
if observedAddrsManager != nil {
113-
as.observedAddrsManager = observedAddrsManager
114-
} else {
115-
om, err := NewObservedAddrManager(func() []ma.Multiaddr {
116-
l := as.listenAddrs()
117-
r, err := manet.ResolveUnspecifiedAddresses(l, as.interfaceAddrs.All())
118-
if err != nil {
119-
return l
120-
}
121-
return append(l, r...)
122-
})
123-
if err != nil {
124-
return nil, fmt.Errorf("failed to create observed addrs manager: %w", err)
125-
}
126-
as.observedAddrsManager = om
127-
}
128-
}
129-
130104
if client != nil {
131105
var metricsTracker MetricsTracker
132106
if enableMetrics {
@@ -138,22 +112,12 @@ func newAddrsManager(
138112
}
139113

140114
func (a *addrsManager) Start() error {
141-
// TODO: add Start method to NATMgr
142115
if a.addrsReachabilityTracker != nil {
143116
err := a.addrsReachabilityTracker.Start()
144117
if err != nil {
145118
return fmt.Errorf("error starting addrs reachability tracker: %s", err)
146119
}
147120
}
148-
if a.observedAddrsManager != nil {
149-
a.observedAddrsManager.Start()
150-
err := a.startObservedAddrsWorker()
151-
if err != nil {
152-
a.observedAddrsManager.Close()
153-
return fmt.Errorf("error starting observed addrs worker: %s", err)
154-
}
155-
}
156-
157121
return a.startBackgroundWorker()
158122
}
159123

@@ -171,24 +135,13 @@ func (a *addrsManager) Close() {
171135
log.Warnf("error closing addrs reachability tracker: %s", err)
172136
}
173137
}
174-
if a.observedAddrsManager != nil {
175-
err := a.observedAddrsManager.Close()
176-
if err != nil {
177-
log.Warnf("error closing observed addrs manager: %s", err)
178-
}
179-
}
180138
a.wg.Wait()
181139
}
182140

183141
func (a *addrsManager) NetNotifee() network.Notifiee {
184142
return &network.NotifyBundle{
185143
ListenF: func(network.Network, ma.Multiaddr) { a.updateAddrsSync() },
186144
ListenCloseF: func(network.Network, ma.Multiaddr) { a.updateAddrsSync() },
187-
DisconnectedF: func(_ network.Network, conn network.Conn) {
188-
if a.observedAddrsManager != nil {
189-
a.observedAddrsManager.RemoveConn(conn)
190-
}
191-
},
192145
}
193146
}
194147

@@ -219,23 +172,6 @@ func closeIfError(err error, closer io.Closer, name string) error {
219172
return nil
220173
}
221174

222-
func (a *addrsManager) startObservedAddrsWorker() (retErr error) {
223-
identifySub, err := a.bus.Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.Name("addrs-manager"))
224-
if err != nil {
225-
return fmt.Errorf("error subscribing to autonat reachability: %s", err)
226-
}
227-
defer func() { retErr = closeIfError(retErr, identifySub, "identify subscription") }()
228-
229-
natTypeEmitter, err := a.bus.Emitter(new(event.EvtNATDeviceTypeChanged), eventbus.Stateful)
230-
if err != nil {
231-
return fmt.Errorf("error creating nat type emitter: %s", err)
232-
}
233-
234-
a.wg.Add(1)
235-
go a.observedAddrsWorker(identifySub, natTypeEmitter)
236-
return nil
237-
}
238-
239175
func (a *addrsManager) startBackgroundWorker() (retErr error) {
240176
autoRelayAddrsSub, err := a.bus.Subscribe(new(event.EvtAutoRelayAddrsUpdated), eventbus.Name("addrs-manager"))
241177
if err != nil {
@@ -282,11 +218,8 @@ func (a *addrsManager) startBackgroundWorker() (retErr error) {
282218
return nil
283219
}
284220

285-
func (a *addrsManager) background(
286-
autoRelayAddrsSub,
287-
autonatReachabilitySub event.Subscription,
288-
emitter event.Emitter,
289-
relayAddrs []ma.Multiaddr,
221+
func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub event.Subscription,
222+
emitter event.Emitter, relayAddrs []ma.Multiaddr,
290223
) {
291224
defer a.wg.Done()
292225
defer func() {
@@ -334,38 +267,6 @@ func (a *addrsManager) background(
334267
}
335268
}
336269

337-
func (a *addrsManager) observedAddrsWorker(identifySub event.Subscription, natTypeEmitter event.Emitter) {
338-
defer a.wg.Done()
339-
defer func() {
340-
err := identifySub.Close()
341-
if err != nil {
342-
log.Warnf("error closing identify sub: %s", err)
343-
}
344-
err = natTypeEmitter.Close()
345-
if err != nil {
346-
log.Warnf("error closing nat emitter: %s", err)
347-
}
348-
}()
349-
natTypeTicker := time.NewTicker(natTypeChangeTickrInterval)
350-
defer natTypeTicker.Stop()
351-
var udpNATType, tcpNATType network.NATDeviceType
352-
for {
353-
select {
354-
case e := <-identifySub.Out():
355-
evt := e.(event.EvtPeerIdentificationCompleted)
356-
a.observedAddrsManager.Record(evt.Conn, evt.ObservedAddr)
357-
case <-natTypeTicker.C:
358-
if *a.hostReachability.Load() == network.ReachabilityPrivate {
359-
newUDPNAT, newTCPNAT := a.observedAddrsManager.getNATType()
360-
a.notifyNATTypeChanged(natTypeEmitter, newUDPNAT, newTCPNAT, udpNATType, tcpNATType)
361-
udpNATType, tcpNATType = newUDPNAT, newTCPNAT
362-
}
363-
case <-a.ctx.Done():
364-
return
365-
}
366-
}
367-
}
368-
369270
// updateAddrs updates the addresses of the host and returns the new updated
370271
// addrs. This must only be called from the background goroutine or from the Start method otherwise
371272
// we may end up with stale addrs.
@@ -472,6 +373,8 @@ func (a *addrsManager) getAddrs(localAddrs []ma.Multiaddr, relayAddrs []ma.Multi
472373
func (a *addrsManager) HolePunchAddrs() []ma.Multiaddr {
473374
addrs := a.DirectAddrs()
474375
addrs = slices.Clone(a.addrsFactory(addrs))
376+
// AllAddrs may ignore observed addresses in favour of NAT mappings.
377+
// Use both for hole punching.
475378
if a.observedAddrsManager != nil {
476379
// For holepunching, include all the best addresses we know even ones with only 1 observer.
477380
addrs = append(addrs, a.observedAddrsManager.Addrs(1)...)
@@ -552,8 +455,6 @@ func (a *addrsManager) appendPrimaryInterfaceAddrs(dst []ma.Multiaddr, listenAdd
552455
// appendNATAddrs appends the NAT-ed addrs for the listenAddrs. For unspecified listen addrs it appends the
553456
// public address for all the interfaces.
554457
// Inferring WebTransport from QUIC depends on the observed address manager.
555-
//
556-
// TODO: Merge the natmgr and identify.ObservedAddrManager in to one NatMapper module.
557458
func (a *addrsManager) appendNATAddrs(dst []ma.Multiaddr, listenAddrs []ma.Multiaddr) []ma.Multiaddr {
558459
for _, listenAddr := range listenAddrs {
559460
natAddr := a.natManager.GetMapping(listenAddr)
@@ -570,6 +471,9 @@ func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddrs, ifac
570471
// will have the unspecified address as the local address.
571472
for _, la := range listenAddrs {
572473
obsAddrs := a.observedAddrsManager.AddrsFor(la)
474+
if len(obsAddrs) > maxObservedAddrsPerListenAddr {
475+
obsAddrs = obsAddrs[:maxObservedAddrsPerListenAddr]
476+
}
573477
dst = append(dst, obsAddrs...)
574478
}
575479

@@ -580,26 +484,15 @@ func (a *addrsManager) appendObservedAddrs(dst []ma.Multiaddr, listenAddrs, ifac
580484
return dst
581485
}
582486
for _, addr := range resolved {
583-
dst = append(dst, a.observedAddrsManager.AddrsFor(addr)...)
487+
obsAddrs := a.observedAddrsManager.AddrsFor(addr)
488+
if len(obsAddrs) > maxObservedAddrsPerListenAddr {
489+
obsAddrs = obsAddrs[:maxObservedAddrsPerListenAddr]
490+
}
491+
dst = append(dst, obsAddrs...)
584492
}
585493
return dst
586494
}
587495

588-
func (a *addrsManager) notifyNATTypeChanged(emitter event.Emitter, newUDPNAT, newTCPNAT, oldUDPNAT, oldTCPNAT network.NATDeviceType) {
589-
if newUDPNAT != oldUDPNAT {
590-
emitter.Emit(event.EvtNATDeviceTypeChanged{
591-
TransportProtocol: network.NATTransportUDP,
592-
NatDeviceType: newUDPNAT,
593-
})
594-
}
595-
if newTCPNAT != oldTCPNAT {
596-
emitter.Emit(event.EvtNATDeviceTypeChanged{
597-
TransportProtocol: network.NATTransportTCP,
598-
NatDeviceType: newTCPNAT,
599-
})
600-
}
601-
}
602-
603496
func areAddrsDifferent(prev, current []ma.Multiaddr) bool {
604497
// TODO: make the sorted nature of ma.Unique a guarantee in multiaddrs
605498
prev = ma.Unique(prev)

0 commit comments

Comments
 (0)