Skip to content

Commit 11c4ae6

Browse files
committed
Use DHT to find peers
1 parent 4b5f240 commit 11c4ae6

File tree

5 files changed

+122
-39
lines changed

5 files changed

+122
-39
lines changed

rolling-shutter/p2p/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Config struct {
4545
ListenAddresses []*address.P2PAddress
4646
CustomBootstrapAddresses []*address.P2PAddress `comment:"Overwrite p2p boostrap nodes"`
4747
Environment env.Environment
48+
DiscoveryNamespace string `shconfig:",required" comment:"Must be unique for each instance id."`
4849
}
4950

5051
func (c *Config) Name() string {
@@ -76,6 +77,7 @@ func (c *Config) SetExampleValues() error {
7677
),
7778
}
7879
c.Environment = env.EnvironmentProduction
80+
c.DiscoveryNamespace = "shutter-42"
7981

8082
p2pkey, err := keys.GenerateLibp2pPrivate(rand.Reader)
8183
if err != nil {

rolling-shutter/p2p/dht.go

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package p2p
22

33
import (
4+
"context"
5+
"time"
6+
47
dht "github.com/libp2p/go-libp2p-kad-dht"
5-
"github.com/libp2p/go-libp2p/core/peer"
8+
pubsub "github.com/libp2p/go-libp2p-pubsub"
9+
"github.com/libp2p/go-libp2p/core/discovery"
10+
"github.com/libp2p/go-libp2p/core/host"
11+
"github.com/libp2p/go-libp2p/core/network"
612
"github.com/libp2p/go-libp2p/core/protocol"
13+
"github.com/libp2p/go-libp2p/p2p/discovery/util"
14+
"github.com/rs/zerolog/log"
715

816
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/env"
917
)
@@ -15,18 +23,24 @@ const (
1523
dhtProtocolPrefix protocol.ID = "/shutter"
1624
dhtProtocolExtensionStaging protocol.ID = "/staging"
1725
dhtProtocolExtensionLocal protocol.ID = "/local"
26+
27+
findPeerInterval = 10 * time.Second
28+
)
29+
30+
var (
31+
peerLow = pubsub.GossipSubDlo * 2
32+
peerTarget = pubsub.GossipSubDhi * 3
33+
peerHigh = pubsub.GossipSubDhi * 6
1834
)
1935

20-
func dhtRoutingOptions(
21-
environment env.Environment,
22-
bootstrapPeers ...peer.AddrInfo,
23-
) []dht.Option {
36+
func dhtRoutingOptions(config *p2pNodeConfig) []dht.Option {
2437
// options with higher index in the array will overwrite existing ones
2538
opts := []dht.Option{
2639
dht.ProtocolPrefix(dhtProtocolPrefix),
40+
dht.BootstrapPeers(config.BootstrapPeers...),
2741
}
2842

29-
switch environment { //nolint: exhaustive
43+
switch config.Environment { //nolint: exhaustive
3044
case env.EnvironmentStaging:
3145
opts = append(opts,
3246
dht.ProtocolExtension(dhtProtocolExtensionStaging),
@@ -42,10 +56,56 @@ func dhtRoutingOptions(
4256
default:
4357
}
4458

45-
if len(bootstrapPeers) > 0 {
46-
// this overwrites the option set before
47-
opts = append(opts, dht.BootstrapPeers(bootstrapPeers...))
59+
if config.IsBootstrapNode {
60+
opts = append(opts, dht.Mode(dht.ModeServer))
4861
}
4962

5063
return opts
5164
}
65+
66+
func findPeers(ctx context.Context, h host.Host, d discovery.Discoverer, ns string) error {
67+
log.Info().Str("namespace", ns).Msg("starting peer discovery")
68+
69+
ticker := time.NewTicker(findPeerInterval)
70+
defer ticker.Stop()
71+
72+
for {
73+
select {
74+
case <-ctx.Done():
75+
return ctx.Err()
76+
case <-ticker.C:
77+
peersBefore := len(h.Network().Peers())
78+
if peersBefore >= peerTarget {
79+
continue
80+
}
81+
82+
peers, err := util.FindPeers(ctx, d, ns)
83+
if err != nil {
84+
log.Error().Err(err).Msg("error finding peers")
85+
}
86+
87+
newConnections := 0
88+
failedDials := 0
89+
for _, p := range peers {
90+
if p.ID == h.ID() {
91+
continue
92+
}
93+
if h.Network().Connectedness(p.ID) != network.Connected {
94+
_, err = h.Network().DialPeer(ctx, p.ID)
95+
if err != nil {
96+
log.Error().Err(err).Str("peer", p.ID.String()).Msg("error dialing peer")
97+
failedDials++
98+
}
99+
newConnections++
100+
}
101+
}
102+
103+
log.Debug().
104+
Int("peers-before", peersBefore).
105+
Int("peer-target", peerTarget).
106+
Int("new-connections", newConnections).
107+
Int("failed-dials", failedDials).
108+
Msg("looking for peers")
109+
}
110+
}
111+
}

rolling-shutter/p2p/messaging.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,10 @@ func New(config *Config) (*P2PMessaging, error) {
8282
listenAddresses = append(listenAddresses, addr.Multiaddr)
8383
}
8484
cfg := &p2pNodeConfig{
85-
ListenAddrs: listenAddresses,
86-
PrivKey: *config.P2PKey,
87-
Environment: config.Environment,
88-
// for now, disable those features, since
89-
// they are not stable from our side
90-
DisableTopicDHT: true,
91-
DisableRoutingDHT: true,
85+
ListenAddrs: listenAddresses,
86+
PrivKey: *config.P2PKey,
87+
Environment: config.Environment,
88+
DiscoveryNamespace: config.DiscoveryNamespace,
9289
}
9390

9491
bootstrapAddresses := config.CustomBootstrapAddresses

rolling-shutter/p2p/p2p.go

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import (
1111
"github.com/libp2p/go-libp2p/core/host"
1212
"github.com/libp2p/go-libp2p/core/peer"
1313
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
14+
"github.com/libp2p/go-libp2p/p2p/discovery/util"
1415
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
16+
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
1517
"github.com/multiformats/go-multiaddr"
1618
"github.com/pkg/errors"
1719
"github.com/rs/zerolog/log"
@@ -52,20 +54,20 @@ type P2PNode struct {
5254
mux sync.Mutex
5355
host host.Host
5456
dht *dht.IpfsDHT
57+
discovery *routing.RoutingDiscovery
5558
pubSub *pubsub.PubSub
5659
gossipRooms map[string]*gossipRoom
5760

5861
GossipMessages chan *pubsub.Message
5962
}
6063

6164
type p2pNodeConfig struct {
62-
ListenAddrs []multiaddr.Multiaddr
63-
BootstrapPeers []peer.AddrInfo
64-
PrivKey keys.Libp2pPrivate
65-
Environment env.Environment
66-
IsBootstrapNode bool
67-
DisableTopicDHT bool
68-
DisableRoutingDHT bool
65+
ListenAddrs []multiaddr.Multiaddr
66+
BootstrapPeers []peer.AddrInfo
67+
PrivKey keys.Libp2pPrivate
68+
Environment env.Environment
69+
IsBootstrapNode bool
70+
DiscoveryNamespace string
6971
}
7072

7173
func NewP2PNode(config p2pNodeConfig) *P2PNode {
@@ -118,6 +120,15 @@ func (p *P2PNode) Run(
118120
return room.readLoop(ctx, p.GossipMessages)
119121
})
120122
}
123+
runner.Go(func() error {
124+
log.Info().Str("namespace", p.config.DiscoveryNamespace).Msg("starting advertizing discovery node")
125+
util.Advertise(ctx, p.discovery, p.config.DiscoveryNamespace)
126+
<-ctx.Done()
127+
return ctx.Err()
128+
})
129+
runner.Go(func() error {
130+
return findPeers(ctx, p.host, p.discovery, p.config.DiscoveryNamespace)
131+
})
121132
return nil
122133
}
123134

@@ -141,18 +152,31 @@ func (p *P2PNode) init(ctx context.Context) error {
141152
if err != nil {
142153
return err
143154
}
144-
p2pPubSub, err := createPubSub(ctx, p2pHost, p.config, hashTable)
155+
discovery := routing.NewRoutingDiscovery(hashTable)
156+
p2pPubSub, err := createPubSub(ctx, p2pHost, p.config, discovery)
145157
if err != nil {
146158
return err
147159
}
148160

149161
p.host = p2pHost
150162
p.dht = hashTable
163+
p.discovery = discovery
151164
p.pubSub = p2pPubSub
152165
log.Info().Str("address", p.p2pAddress()).Msg("created libp2p host")
153166
return nil
154167
}
155168

169+
func createConnectionManager() (*connmgr.BasicConnMgr, error) {
170+
// TODO: This starts a background goroutine. It works, but it's better to do that later in
171+
// P2PNode.Run() when we have a proper context.
172+
m, err := connmgr.NewConnManager(peerLow, peerHigh)
173+
if err != nil {
174+
return nil, errors.Wrap(err, "failed to create connection manager")
175+
}
176+
177+
return m, nil
178+
}
179+
156180
func createHost(
157181
ctx context.Context,
158182
config p2pNodeConfig,
@@ -166,12 +190,15 @@ func createHost(
166190
// This was a bug in the check function, reading the wrong config value to check against:
167191
// https://github.com/libp2p/go-libp2p/issues/2628
168192

193+
connectionManager, err := createConnectionManager()
194+
if err != nil {
195+
return nil, nil, err
196+
}
197+
169198
options := []libp2p.Option{
170199
libp2p.Identity(&config.PrivKey.Key),
171200
libp2p.ListenAddrs(config.ListenAddrs...),
172-
// libp2p.DefaultTransports,
173-
// libp2p.DefaultSecurity,
174-
// libp2p.ConnectionManager(connectionManager),
201+
libp2p.ConnectionManager(connectionManager),
175202
libp2p.ProtocolVersion(protocolVersion),
176203
}
177204

@@ -192,11 +219,7 @@ func createHost(
192219
return nil, nil, err
193220
}
194221

195-
if config.DisableRoutingDHT {
196-
return p2pHost, nil, err
197-
}
198-
199-
opts := dhtRoutingOptions(config.Environment, config.BootstrapPeers...)
222+
opts := dhtRoutingOptions(&config)
200223
idht, err := dht.New(ctx, p2pHost, opts...)
201224
if err != nil {
202225
return nil, nil, err
@@ -212,7 +235,7 @@ func createPubSub(
212235
ctx context.Context,
213236
p2pHost host.Host,
214237
config p2pNodeConfig,
215-
hashTable *dht.IpfsDHT,
238+
discovery *routing.RoutingDiscovery,
216239
) (*pubsub.PubSub, error) {
217240
gossipSubParams, peerScoreParams, peerScoreThresholds := makePubSubParams(pubSubParamsOptions{
218241
isBootstrapNode: config.IsBootstrapNode,
@@ -222,14 +245,9 @@ func createPubSub(
222245
pubsubOptions := []pubsub.Option{
223246
pubsub.WithGossipSubParams(*gossipSubParams),
224247
pubsub.WithPeerScore(peerScoreParams, peerScoreThresholds),
248+
pubsub.WithDiscovery(discovery),
225249
}
226250

227-
if !config.DisableTopicDHT {
228-
pubsubOptions = append(
229-
pubsubOptions,
230-
pubsub.WithDiscovery(routing.NewRoutingDiscovery(hashTable)),
231-
)
232-
}
233251
if config.IsBootstrapNode {
234252
// enables the pubsub v1.1 feature to handle discovery and
235253
// connection management over the PubSub protocol

rolling-shutter/p2p/params.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ func makePubSubParams(
1717
) (*pubsub.GossipSubParams, *pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) {
1818
gsDefault := pubsub.DefaultGossipSubParams()
1919
gossipSubParams := &gsDefault
20+
21+
// modified defaults from ethereum consensus spec
22+
// https://github.com/ethereum/consensus-specs/blob/5d80b1954a4b7a121aa36143d50b366727b66cbc/specs/phase0/p2p-interface.md#why-are-these-specific-gossip-parameters-chosen
23+
gossipSubParams.HeartbeatInterval = 700 * time.Millisecond
24+
gossipSubParams.HistoryLength = 6
25+
2026
// From the spec:
2127
// to allow bootstrapping via PeerExchange (PX),
2228
// the bootstrappers should not form a mesh, thus D=D_lo=D_hi=D_out=0

0 commit comments

Comments
 (0)