Skip to content

Commit ad65675

Browse files
feat: implemented and tested new floodsub discovery messaging
1 parent 12a6769 commit ad65675

File tree

6 files changed

+79
-29
lines changed

6 files changed

+79
-29
lines changed

rolling-shutter/p2p/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ type Config struct {
4646
AdvertiseAddresses []*address.P2PAddress `comment:"Optional, addresses to be advertised to other peers instead of auto-detected ones."`
4747
CustomBootstrapAddresses []*address.P2PAddress `comment:"Overwrite p2p boostrap nodes"`
4848
Environment env.Environment
49-
DiscoveryNamespace string `shconfig:",required" comment:"Must be unique for each instance id."`
50-
IsAccessNode bool `comment:"Optional, to be set to true if running an access node"`
49+
DiscoveryNamespace string `shconfig:",required" comment:"Must be unique for each instance id."`
50+
IsAccessNode bool `comment:"Optional, to be set to true if running an access node"`
51+
FloodSubDiscovery FloodsubDiscoveryConfig `shconfig:"required"`
5152
}
5253

5354
func (c *Config) Name() string {
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
package gossippeerdiscovery
1+
package floodsubpeerdiscovery
22

33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
pubsub "github.com/libp2p/go-libp2p-pubsub"
89
"github.com/libp2p/go-libp2p/core/peerstore"
@@ -13,34 +14,61 @@ import (
1314

1415
const defaultTopic = "_peer-discovery._p2p._pubsub"
1516

16-
type GossipPeerDiscovery struct {
17+
type FloodsubPeerDiscovery struct {
1718
PeerDiscoveryComponents
18-
Interval int
19-
Topics []string
20-
ListenOnly bool
19+
Interval int
20+
Topics []*pubsub.Topic
2121
}
2222

2323
type PeerDiscoveryComponents struct {
24-
peerId address.P2PIdentifier
25-
pubsub *pubsub.PubSub
26-
peerStore peerstore.Peerstore
24+
PeerId address.P2PIdentifier
25+
Pubsub *pubsub.PubSub
26+
PeerStore peerstore.Peerstore
2727
}
2828

29-
func (pd *GossipPeerDiscovery) init(config PeerDiscoveryComponents, interval int, topics []string, listenOnly bool) {
29+
func (pd *FloodsubPeerDiscovery) Init(config PeerDiscoveryComponents, interval int, topics []string) error {
3030
pd.Interval = interval
31+
pd.PeerId = config.PeerId
32+
pd.Pubsub = config.Pubsub
33+
pd.PeerStore = config.PeerStore
34+
3135
if len(topics) > 0 {
32-
pd.Topics = topics
36+
for _, topic := range topics {
37+
topic, err := pd.Pubsub.Join(topic)
38+
if err != nil {
39+
return fmt.Errorf("failed to join topic | err %v", err)
40+
}
41+
pd.Topics = append(pd.Topics, topic)
42+
}
3343
} else {
34-
pd.Topics = []string{defaultTopic}
44+
topic, err := pd.Pubsub.Join(defaultTopic)
45+
if err != nil {
46+
return fmt.Errorf("failed to join topic | err %v", err)
47+
}
48+
pd.Topics = append(pd.Topics, topic)
3549
}
36-
pd.ListenOnly = listenOnly
37-
pd.peerId = config.peerId
38-
pd.pubsub = config.pubsub
39-
pd.peerStore = config.peerStore
50+
return nil
4051
}
4152

42-
func (pd *GossipPeerDiscovery) broadcast() error {
43-
pubKey, err := pd.peerId.ExtractPublicKey()
53+
func (pd *FloodsubPeerDiscovery) Start(ctx context.Context) error {
54+
timer := time.NewTicker(time.Duration(pd.Interval) * time.Second)
55+
56+
for {
57+
select {
58+
case <-timer.C:
59+
err := pd.broadcast()
60+
if err != nil {
61+
log.Info().Msgf("error in broadcasting floodsub msg | %v", err)
62+
return err
63+
}
64+
case <-ctx.Done():
65+
return nil
66+
}
67+
}
68+
}
69+
70+
func (pd *FloodsubPeerDiscovery) broadcast() error {
71+
pubKey, err := pd.PeerId.ExtractPublicKey()
4472
if err != nil {
4573
return fmt.Errorf("peerId was missing public key | err %v", err)
4674
}
@@ -50,13 +78,13 @@ func (pd *GossipPeerDiscovery) broadcast() error {
5078
return fmt.Errorf("peerId was missing public key | err %v", err)
5179
}
5280

53-
if pd.pubsub == nil {
81+
if pd.Pubsub == nil {
5482
return fmt.Errorf("pubSub not configured | err %v", err)
5583
}
5684

5785
addresses := make([][]byte, 0)
5886

59-
for _, addr := range pd.peerStore.Addrs(pd.peerId.ID) {
87+
for _, addr := range pd.PeerStore.Addrs(pd.PeerId.ID) {
6088
addresses = append(addresses, addr.Bytes())
6189
}
6290

@@ -70,18 +98,16 @@ func (pd *GossipPeerDiscovery) broadcast() error {
7098
}
7199

72100
for _, topic := range pd.Topics {
73-
if len(pd.pubsub.ListPeers(topic)) == 0 {
101+
if len(pd.Pubsub.ListPeers(topic.String())) == 0 {
74102
log.Info().Msgf("skipping broadcasting our peer data on topic %s because there are no peers present", topic)
75103
continue
76104
}
77105
log.Info().Msgf("broadcasting our peer data on topic %s", topic)
78-
topic, err := pd.pubsub.Join(topic)
79-
if err != nil {
80-
return fmt.Errorf("failed to join topic | err %v", err)
81-
}
106+
82107
if err := topic.Publish(context.Background(), pbPeer); err != nil {
83108
return fmt.Errorf("failed to publish to topic | err %v", err)
84109
}
110+
defer topic.Close()
85111
}
86112
return nil
87113
}

rolling-shutter/p2p/gossippeerdiscovery/peer.pb.go renamed to rolling-shutter/p2p/floodsubpeerdiscovery/peer.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/p2p/gossippeerdiscovery/peer.proto renamed to rolling-shutter/p2p/floodsubpeerdiscovery/peer.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
syntax = "proto3";
22

3-
option go_package = "./;gossippeerdiscovery";
3+
option go_package = "./;floodsubpeerdiscovery";
44

55
message Peer {
66
bytes publicKey = 1;

rolling-shutter/p2p/messaging.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func New(config *Config) (*P2PMessaging, error) {
9999
Environment: config.Environment,
100100
DiscoveryNamespace: config.DiscoveryNamespace,
101101
IsAccessNode: config.IsAccessNode,
102+
FloodsubDiscovery: *&config.FloodSubDiscovery,
102103
}
103104

104105
bootstrapAddresses := config.CustomBootstrapAddresses

rolling-shutter/p2p/p2p.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/env"
2424
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/keys"
2525
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
26+
"github.com/shutter-network/rolling-shutter/rolling-shutter/p2p/floodsubpeerdiscovery"
2627
)
2728

2829
var DefaultBootstrapPeers []*address.P2PAddress
@@ -59,7 +60,8 @@ type P2PNode struct {
5960
pubSub *pubsub.PubSub
6061
gossipRooms map[string]*gossipRoom
6162

62-
GossipMessages chan *pubsub.Message
63+
GossipMessages chan *pubsub.Message
64+
FloodSubDiscovery *floodsubpeerdiscovery.FloodsubPeerDiscovery
6365
}
6466

6567
type p2pNodeConfig struct {
@@ -71,6 +73,13 @@ type p2pNodeConfig struct {
7173
IsBootstrapNode bool
7274
IsAccessNode bool
7375
DiscoveryNamespace string
76+
FloodsubDiscovery FloodsubDiscoveryConfig
77+
}
78+
79+
type FloodsubDiscoveryConfig struct {
80+
Enabled bool
81+
Interval int
82+
Topics []string
7483
}
7584

7685
func NewP2PNode(config p2pNodeConfig) *P2PNode {
@@ -123,6 +132,19 @@ func (p *P2PNode) Run(
123132
return room.readLoop(ctx, p.GossipMessages)
124133
})
125134
}
135+
if p.config.FloodsubDiscovery.Enabled {
136+
log.Info().Msg("floodsub peer discovery is enabled")
137+
p.FloodSubDiscovery = &floodsubpeerdiscovery.FloodsubPeerDiscovery{}
138+
peerDiscoveryComponents := floodsubpeerdiscovery.PeerDiscoveryComponents{
139+
PeerId: address.P2PIdentifier{p.host.ID()},
140+
PeerStore: p.host.Peerstore(),
141+
Pubsub: p.pubSub,
142+
}
143+
p.FloodSubDiscovery.Init(peerDiscoveryComponents, p.config.FloodsubDiscovery.Interval, p.config.FloodsubDiscovery.Topics)
144+
runner.Go(func() error {
145+
return p.FloodSubDiscovery.Start(ctx)
146+
})
147+
}
126148
runner.Go(func() error {
127149
log.Info().Str("namespace", p.config.DiscoveryNamespace).Msg("starting advertizing discovery node")
128150
util.Advertise(ctx, p.discovery, p.config.DiscoveryNamespace)

0 commit comments

Comments
 (0)