@@ -11,16 +11,15 @@ import (
11
11
"github.com/libp2p/go-libp2p/core/host"
12
12
"github.com/libp2p/go-libp2p/core/peer"
13
13
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
14
- rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
15
14
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
16
15
"github.com/multiformats/go-multiaddr"
17
16
"github.com/pkg/errors"
18
17
"github.com/rs/zerolog/log"
19
- "golang.org/x/sync/errgroup"
20
18
21
19
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/address"
22
20
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/env"
23
21
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/encodeable/keys"
22
+ "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
24
23
)
25
24
26
25
var DefaultBootstrapPeers []* address.P2PAddress
@@ -82,52 +81,43 @@ func NewP2PNode(config p2pNodeConfig) *P2PNode {
82
81
83
82
func (p * P2PNode ) Run (
84
83
ctx context.Context ,
84
+ runner service.Runner ,
85
85
topicNames []string ,
86
86
topicValidators ValidatorRegistry ,
87
87
) error {
88
- defer func () {
89
- close (p .GossipMessages )
90
- }()
88
+ p .mux .Lock ()
89
+ defer p .mux .Unlock ()
91
90
92
- errorgroup , errorgroupctx := errgroup .WithContext (ctx )
93
- errorgroup .Go (func () error {
94
- p .mux .Lock ()
95
- defer p .mux .Unlock ()
96
- if err := p .init (ctx ); err != nil {
97
- return err
98
- }
91
+ runner .Defer (func () {
92
+ close (p .GossipMessages )
93
+ })
99
94
100
- for topicName , validator := range topicValidators {
101
- if err := p .pubSub .RegisterTopicValidator (topicName , validator ); err != nil {
102
- return err
103
- }
104
- }
95
+ if err := p .init (ctx ); err != nil {
96
+ return err
97
+ }
105
98
106
- if err := p .joinTopics (topicNames ); err != nil {
99
+ for topicName , validator := range topicValidators {
100
+ if err := p .pubSub .RegisterTopicValidator (topicName , validator ); err != nil {
107
101
return err
108
102
}
103
+ }
109
104
110
- // listen to gossip on all topics
111
- for _ , room := range p .gossipRooms {
112
- room := room
113
- errorgroup .Go (func () error {
114
- return room .readLoop (errorgroupctx , p .GossipMessages )
115
- })
116
- }
117
-
118
- err := bootstrap (ctx , p .host , p .config , p .dht )
119
- if err != nil {
120
- return err
121
- }
105
+ if err := p .joinTopics (topicNames ); err != nil {
106
+ return err
107
+ }
122
108
123
- // block the function until the context is canceled
124
- errorgroup .Go (func () error {
125
- <- errorgroupctx .Done ()
126
- return ctx .Err ()
109
+ err := bootstrap (ctx , p .host , p .config , p .dht )
110
+ if err != nil {
111
+ return err
112
+ }
113
+ // listen to gossip on all topics
114
+ for _ , room := range p .gossipRooms {
115
+ room := room
116
+ runner .Go (func () error {
117
+ return room .readLoop (ctx , p .GossipMessages )
127
118
})
128
- return nil
129
- })
130
- return errorgroup .Wait ()
119
+ }
120
+ return nil
131
121
}
132
122
133
123
func (p * P2PNode ) Publish (ctx context.Context , topic string , message []byte ) error {
0 commit comments