Skip to content

Commit fe4ea5f

Browse files
Merge pull request #522 from shutter-network/feature/251-floodsub-peer-discovery
floodsub discovery mechanism
2 parents 4cf80eb + 4480c53 commit fe4ea5f

File tree

6 files changed

+307
-3
lines changed

6 files changed

+307
-3
lines changed

rolling-shutter/p2p/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ type Config struct {
4646
AdvertiseAddresses []*address.P2PAddress `comment:"Optional, addresses to be advertised to other peers instead of auto-detected ones."`
4747
CustomBootstrapAddresses []*address.P2PAddress `comment:"Overwrite p2p boostrap nodes"`
4848
Environment env.Environment
49-
DiscoveryNamespace string `shconfig:",required" comment:"Must be unique for each instance id."`
50-
IsAccessNode bool `comment:"Optional, to be set to true if running an access node"`
49+
DiscoveryNamespace string `shconfig:",required" comment:"Must be unique for each instance id."`
50+
IsAccessNode bool `comment:"Optional, to be set to true if running an access node"`
51+
FloodSubDiscovery FloodsubDiscoveryConfig `shconfig:"required"`
5152
}
5253

5354
func (c *Config) Name() string {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package floodsubpeerdiscovery
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
pubsub "github.com/libp2p/go-libp2p-pubsub"
9+
"github.com/libp2p/go-libp2p/core/peerstore"
10+
"github.com/rs/zerolog/log"
11+
"google.golang.org/protobuf/proto"
12+
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/address"
14+
)
15+
16+
const defaultTopic = "_peer-discovery._p2p._pubsub"
17+
18+
type FloodsubPeerDiscovery struct {
19+
PeerDiscoveryComponents
20+
Interval int
21+
Topics []*pubsub.Topic
22+
}
23+
24+
type PeerDiscoveryComponents struct {
25+
PeerID address.P2PIdentifier
26+
Pubsub *pubsub.PubSub
27+
PeerStore peerstore.Peerstore
28+
}
29+
30+
func (pd *FloodsubPeerDiscovery) Init(config PeerDiscoveryComponents, interval int, topics []string) error {
31+
pd.Interval = interval
32+
pd.PeerID = config.PeerID
33+
pd.Pubsub = config.Pubsub
34+
pd.PeerStore = config.PeerStore
35+
36+
if len(topics) > 0 {
37+
for _, topic := range topics {
38+
topic, err := pd.Pubsub.Join(topic)
39+
if err != nil {
40+
return fmt.Errorf("failed to join topic | err %w", err)
41+
}
42+
pd.Topics = append(pd.Topics, topic)
43+
}
44+
} else {
45+
topic, err := pd.Pubsub.Join(defaultTopic)
46+
if err != nil {
47+
return fmt.Errorf("failed to join topic | err %w", err)
48+
}
49+
pd.Topics = append(pd.Topics, topic)
50+
}
51+
return nil
52+
}
53+
54+
func (pd *FloodsubPeerDiscovery) Start(ctx context.Context) error {
55+
timer := time.NewTicker(time.Duration(pd.Interval) * time.Second)
56+
57+
for {
58+
select {
59+
case <-timer.C:
60+
err := pd.broadcast()
61+
if err != nil {
62+
log.Warn().Msgf("error in broadcasting floodsub msg | %v", err)
63+
return err
64+
}
65+
case <-ctx.Done():
66+
for _, topic := range pd.Topics {
67+
if err := topic.Close(); err != nil {
68+
return fmt.Errorf("error in closing topic | %w", err)
69+
}
70+
}
71+
return nil
72+
}
73+
}
74+
}
75+
76+
func (pd *FloodsubPeerDiscovery) broadcast() error {
77+
pubKey, err := pd.PeerID.ExtractPublicKey()
78+
if err != nil {
79+
return fmt.Errorf("peerId was missing public key | err %w", err)
80+
}
81+
82+
pubKeyBytes, err := pubKey.Raw()
83+
if err != nil || len(pubKeyBytes) == 0 {
84+
return fmt.Errorf("peerId was missing public key | err %w", err)
85+
}
86+
87+
if pd.Pubsub == nil {
88+
return fmt.Errorf("pubSub not configured | err %w", err)
89+
}
90+
91+
addresses := make([][]byte, 0)
92+
93+
for _, addr := range pd.PeerStore.Addrs(pd.PeerID.ID) {
94+
addresses = append(addresses, addr.Bytes())
95+
}
96+
97+
peer := Peer{
98+
PublicKey: pubKeyBytes,
99+
Addrs: addresses,
100+
}
101+
pbPeer, err := proto.Marshal(&peer)
102+
if err != nil {
103+
return fmt.Errorf("error marshaling message | err %w", err)
104+
}
105+
106+
for _, topic := range pd.Topics {
107+
if len(pd.Pubsub.ListPeers(topic.String())) == 0 {
108+
log.Info().Msgf("skipping broadcasting our peer data on topic %s because there are no peers present", topic)
109+
continue
110+
}
111+
log.Info().Msgf("broadcasting our peer data on topic %s", topic)
112+
113+
if err := topic.Publish(context.Background(), pbPeer); err != nil {
114+
return fmt.Errorf("failed to publish to topic | err %w", err)
115+
}
116+
}
117+
return nil
118+
}

rolling-shutter/p2p/floodsubpeerdiscovery/peer.pb.go

Lines changed: 151 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
syntax = "proto3";
2+
3+
option go_package = "./;floodsubpeerdiscovery";
4+
5+
message Peer {
6+
bytes publicKey = 1;
7+
repeated bytes addrs = 2;
8+
}

rolling-shutter/p2p/messaging.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func New(config *Config) (*P2PMessaging, error) {
9999
Environment: config.Environment,
100100
DiscoveryNamespace: config.DiscoveryNamespace,
101101
IsAccessNode: config.IsAccessNode,
102+
FloodsubDiscovery: config.FloodSubDiscovery,
102103
}
103104

104105
bootstrapAddresses := config.CustomBootstrapAddresses

rolling-shutter/p2p/p2p.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/env"
2424
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/keys"
2525
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
26+
"github.com/shutter-network/rolling-shutter/rolling-shutter/p2p/floodsubpeerdiscovery"
2627
)
2728

2829
var DefaultBootstrapPeers []*address.P2PAddress
@@ -59,7 +60,8 @@ type P2PNode struct {
5960
pubSub *pubsub.PubSub
6061
gossipRooms map[string]*gossipRoom
6162

62-
GossipMessages chan *pubsub.Message
63+
GossipMessages chan *pubsub.Message
64+
FloodSubDiscovery *floodsubpeerdiscovery.FloodsubPeerDiscovery
6365
}
6466

6567
type p2pNodeConfig struct {
@@ -71,6 +73,13 @@ type p2pNodeConfig struct {
7173
IsBootstrapNode bool
7274
IsAccessNode bool
7375
DiscoveryNamespace string
76+
FloodsubDiscovery FloodsubDiscoveryConfig
77+
}
78+
79+
type FloodsubDiscoveryConfig struct {
80+
Enabled bool
81+
Interval int
82+
Topics []string
7483
}
7584

7685
func NewP2PNode(config p2pNodeConfig) *P2PNode {
@@ -123,6 +132,22 @@ func (p *P2PNode) Run(
123132
return room.readLoop(ctx, p.GossipMessages)
124133
})
125134
}
135+
if p.config.FloodsubDiscovery.Enabled {
136+
log.Info().Msg("floodsub peer discovery is enabled")
137+
p.FloodSubDiscovery = &floodsubpeerdiscovery.FloodsubPeerDiscovery{}
138+
peerDiscoveryComponents := floodsubpeerdiscovery.PeerDiscoveryComponents{
139+
PeerID: address.P2PIdentifier{ID: p.host.ID()},
140+
PeerStore: p.host.Peerstore(),
141+
Pubsub: p.pubSub,
142+
}
143+
err := p.FloodSubDiscovery.Init(peerDiscoveryComponents, p.config.FloodsubDiscovery.Interval, p.config.FloodsubDiscovery.Topics)
144+
if err != nil {
145+
return err
146+
}
147+
runner.Go(func() error {
148+
return p.FloodSubDiscovery.Start(ctx)
149+
})
150+
}
126151
runner.Go(func() error {
127152
log.Info().Str("namespace", p.config.DiscoveryNamespace).Msg("starting advertizing discovery node")
128153
util.Advertise(ctx, p.discovery, p.config.DiscoveryNamespace)

0 commit comments

Comments
 (0)