@@ -17,8 +17,9 @@ const defaultTopic = "_peer-discovery._p2p._pubsub"
17
17
18
18
type FloodsubPeerDiscovery struct {
19
19
PeerDiscoveryComponents
20
- Interval int
21
- Topics []* pubsub.Topic
20
+ Interval int
21
+ Topics []* pubsub.Topic
22
+ Subscription []* pubsub.Subscription
22
23
}
23
24
24
25
type PeerDiscoveryComponents struct {
@@ -40,13 +41,25 @@ func (pd *FloodsubPeerDiscovery) Init(config PeerDiscoveryComponents, interval i
40
41
return fmt .Errorf ("failed to join topic | err %w" , err )
41
42
}
42
43
pd .Topics = append (pd .Topics , topic )
44
+
45
+ subs , err := topic .Subscribe ()
46
+ if err != nil {
47
+ return fmt .Errorf ("failed to subscribe topic | err %w" , err )
48
+ }
49
+ pd .Subscription = append (pd .Subscription , subs )
43
50
}
44
51
} else {
45
52
topic , err := pd .Pubsub .Join (defaultTopic )
46
53
if err != nil {
47
54
return fmt .Errorf ("failed to join topic | err %w" , err )
48
55
}
49
56
pd .Topics = append (pd .Topics , topic )
57
+
58
+ subs , err := topic .Subscribe ()
59
+ if err != nil {
60
+ return fmt .Errorf ("failed to subscribe topic | err %w" , err )
61
+ }
62
+ pd .Subscription = append (pd .Subscription , subs )
50
63
}
51
64
return nil
52
65
}
@@ -63,6 +76,10 @@ func (pd *FloodsubPeerDiscovery) Start(ctx context.Context) error {
63
76
return err
64
77
}
65
78
case <- ctx .Done ():
79
+ for _ , subs := range pd .Subscription {
80
+ subs .Cancel ()
81
+ }
82
+
66
83
for _ , topic := range pd .Topics {
67
84
if err := topic .Close (); err != nil {
68
85
return fmt .Errorf ("error in closing topic | %w" , err )
0 commit comments