Skip to content

Commit 6fe595f

Browse files
Merge pull request #6270 from oasisprotocol/martin/feature/p2p-advertize-only-servers
go/p2p: Ensure only server providers advertise themselves
2 parents 2bb1b3b + 2068173 commit 6fe595f

File tree

14 files changed

+96
-33
lines changed

14 files changed

+96
-33
lines changed

.changelog/6270.internal.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
go/p2p: Ensure only server providers advertise themselves
2+
3+
Previously, the host would advertise itself upon creation
4+
of p2p protocol client, even if the server was not running.
5+
6+
Advertisement is now independent and is only triggered
7+
when serving the P2P protocol.

go/p2p/api/api.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ type Service interface {
6868
Publish(ctx context.Context, topic string, msg any)
6969

7070
// RegisterHandler registers a message handler for the specified runtime and topic kind.
71+
//
72+
// In addition, it triggers advertisement of the host's readiness to serve the specified topic,
73+
// allowing remote peers without existing connection to discover it.
74+
//
75+
// Ensure your server is indeed ready, to avoid advertising prematurely.
7176
RegisterHandler(topic string, handler Handler)
7277

7378
// BlockPeer blocks a specific peer from being used by the local node.
@@ -83,6 +88,11 @@ type Service interface {
8388
RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int)
8489

8590
// RegisterProtocolServer registers a protocol server for the given protocol.
91+
//
92+
// In addition, it triggers advertisement of the host's readiness to serve the specified protocol,
93+
// allowing remote peers without existing connection to discover it.
94+
//
95+
// Ensure your server is indeed ready, to avoid advertising prematurely.
8696
RegisterProtocolServer(srv rpc.Server)
8797

8898
// GetMinRepublishInterval returns the minimum republish interval that needs to be respected by

go/p2p/p2p.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ func (p *p2p) RegisterHandler(topic string, handler api.Handler) {
305305
"topic", topic,
306306
)
307307

308-
p.peerMgr.RegisterTopic(topic, minTopicPeers, totalTopicPeers)
308+
p.peerMgr.TrackTopicPeers(topic, minTopicPeers, totalTopicPeers)
309+
p.peerMgr.AdvertiseTopic(topic)
309310
}
310311

311312
// Implements api.Service.
@@ -321,7 +322,7 @@ func (p *p2p) BlockPeer(peerID core.PeerID) {
321322

322323
// Implements api.Service.
323324
func (p *p2p) RegisterProtocol(pid core.ProtocolID, minPeers int, totalPeers int) {
324-
p.peerMgr.RegisterProtocol(pid, minPeers, totalPeers)
325+
p.peerMgr.TrackProtocolPeers(pid, minPeers, totalPeers)
325326
}
326327

327328
// Implements api.Service.
@@ -340,6 +341,8 @@ func (p *p2p) RegisterProtocolServer(srv rpc.Server) {
340341

341342
p.host.SetStreamHandler(srv.Protocol(), srv.HandleStream)
342343

344+
p.peerMgr.AdvertiseProtocol(srv.Protocol())
345+
343346
p.logger.Info("registered protocol server",
344347
"protocol_id", srv.Protocol(),
345348
)

go/p2p/peermgmt/discovery.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ func (d *peerDiscovery) startAdvertising(ns string) {
100100
case d.advCh <- struct{}{}:
101101
default:
102102
}
103+
104+
d.logger.Debug("triggered protocol advertisement",
105+
"protocol", ns,
106+
)
103107
}
104108

105109
// stopAdvertising stops advertising the given namespace.

go/p2p/peermgmt/peermgr.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ func (m *PeerManager) NumTopicPeers(topic string) int {
168168
return len(m.pubsub.ListPeers(topic))
169169
}
170170

171-
// RegisterProtocol starts tracking and managing peers that support the given protocol.
171+
// TrackProtocolPeers starts tracking and managing peers that support the given protocol.
172172
// If the protocol is already registered, its values are updated.
173-
func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int) {
173+
func (m *PeerManager) TrackProtocolPeers(p core.ProtocolID, minPeers int, totalPeers int) {
174174
m.mu.Lock()
175175
defer m.mu.Unlock()
176176

@@ -188,18 +188,16 @@ func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPee
188188
}
189189

190190
m.protocols[p] = &watermark{minPeers, totalPeers}
191-
m.discovery.startAdvertising(string(p))
192-
193191
m.logger.Debug("protocol registered",
194192
"protocol", p,
195193
"min_peers", minPeers,
196194
"total_peers", totalPeers,
197195
)
198196
}
199197

200-
// RegisterTopic starts tracking and managing peers that support the given topic.
198+
// TrackTopicPeers starts tracking and managing peers that support the given topic.
201199
// If the topic is already registered, its values are updated.
202-
func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int) {
200+
func (m *PeerManager) TrackTopicPeers(topic string, minPeers int, totalPeers int) {
203201
m.mu.Lock()
204202
defer m.mu.Unlock()
205203

@@ -217,7 +215,6 @@ func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int)
217215
}
218216

219217
m.topics[topic] = &watermark{minPeers, totalPeers}
220-
m.discovery.startAdvertising(topic)
221218

222219
m.logger.Debug("topic registered",
223220
"topic", topic,
@@ -226,9 +223,23 @@ func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int)
226223
)
227224
}
228225

229-
// UnregisterProtocol stops managing peers that support the given protocol.
226+
// AdvertiseProtocol starts advertising readiness to serve the specified protocol.
227+
//
228+
// This enables remote peers without existing connection to find the host node.
229+
func (m *PeerManager) AdvertiseProtocol(p core.ProtocolID) {
230+
m.discovery.startAdvertising(string(p))
231+
}
232+
233+
// AdvertiseTopic starts advertising readiness to serve the specified topic.
234+
//
235+
// This enables remote peers without existing connection to find the host node.
236+
func (m *PeerManager) AdvertiseTopic(topic string) {
237+
m.discovery.startAdvertising(topic)
238+
}
239+
240+
// StopTrackingProtocolPeers stops managing peers that support the given protocol.
230241
// If the protocol is not registered, this is a noop operation.
231-
func (m *PeerManager) UnregisterProtocol(p core.ProtocolID) {
242+
func (m *PeerManager) StopTrackingProtocolPeers(p core.ProtocolID) {
232243
m.mu.Lock()
233244
defer m.mu.Unlock()
234245

@@ -237,16 +248,15 @@ func (m *PeerManager) UnregisterProtocol(p core.ProtocolID) {
237248
}
238249

239250
delete(m.protocols, p)
240-
m.discovery.stopAdvertising(string(p))
241251

242252
m.logger.Debug("protocol unregistered",
243253
"protocol", p,
244254
)
245255
}
246256

247-
// UnregisterTopic stops managing peers that support the given topic.
257+
// StopTrackingTopicPeers stops managing peers that support the given topic.
248258
// If the topic is not registered, this is a noop operation.
249-
func (m *PeerManager) UnregisterTopic(topic string) {
259+
func (m *PeerManager) StopTrackingTopicPeers(topic string) {
250260
m.mu.Lock()
251261
defer m.mu.Unlock()
252262

@@ -255,13 +265,22 @@ func (m *PeerManager) UnregisterTopic(topic string) {
255265
}
256266

257267
delete(m.topics, topic)
258-
m.discovery.stopAdvertising(topic)
259268

260269
m.logger.Debug("topic unregistered",
261270
"topic", topic,
262271
)
263272
}
264273

274+
// StopAdvertisingProtocol stops advertising readiness to serve the specified protocol.
275+
func (m *PeerManager) StopAdvertisingProtocol(p core.ProtocolID) {
276+
m.discovery.stopAdvertising(string(p))
277+
}
278+
279+
// StopAdvertisingTopic stops advertising readiness to serve the specified protocol.
280+
func (m *PeerManager) StopAdvertisingTopic(topic string) {
281+
m.discovery.stopAdvertising(topic)
282+
}
283+
265284
func (m *PeerManager) run(ctx context.Context) {
266285
// Start background services.
267286
m.backup.start()

go/p2p/peermgmt/peermgr_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *PeerManagerTestSuite) TestRegisterProtocol() {
126126

127127
for i := 0; i < 3; i++ {
128128
p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i))
129-
s.manager.RegisterProtocol(p, 1, 10)
129+
s.manager.TrackProtocolPeers(p, 1, 10)
130130
require.Equal(i+1, len(s.manager.Protocols()))
131131
}
132132
}
@@ -136,7 +136,7 @@ func (s *PeerManagerTestSuite) TestRegisterTopic() {
136136

137137
for i := 0; i < 3; i++ {
138138
t := fmt.Sprintf("topic %d", i)
139-
s.manager.RegisterTopic(t, 1, 10)
139+
s.manager.TrackTopicPeers(t, 1, 10)
140140
require.Equal(i+1, len(s.manager.Topics()))
141141
}
142142
}
@@ -146,20 +146,20 @@ func (s *PeerManagerTestSuite) TestUnregisterProtocol() {
146146

147147
for i := 0; i < 3; i++ {
148148
p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i))
149-
s.manager.RegisterProtocol(p, 1, 10)
149+
s.manager.TrackProtocolPeers(p, 1, 10)
150150
require.Equal(i+1, len(s.manager.Protocols()))
151151
}
152152

153-
s.manager.UnregisterProtocol("404")
153+
s.manager.StopTrackingProtocolPeers("404")
154154
require.Equal(3, len(s.manager.Protocols()))
155155

156156
for i := 0; i < 3; i++ {
157157
p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i))
158-
s.manager.UnregisterProtocol(p)
158+
s.manager.StopTrackingProtocolPeers(p)
159159
require.Equal(2-i, len(s.manager.Protocols()))
160160
}
161161

162-
s.manager.UnregisterProtocol("404")
162+
s.manager.StopTrackingProtocolPeers("404")
163163
require.Equal(0, len(s.manager.Protocols()))
164164
}
165165

@@ -168,19 +168,19 @@ func (s *PeerManagerTestSuite) TestUnregisterTopic() {
168168

169169
for i := 0; i < 3; i++ {
170170
t := fmt.Sprintf("topic %d", i)
171-
s.manager.RegisterTopic(t, 1, 10)
171+
s.manager.TrackTopicPeers(t, 1, 10)
172172
require.Equal(i+1, len(s.manager.Topics()))
173173
}
174174

175-
s.manager.UnregisterTopic("404")
175+
s.manager.StopTrackingTopicPeers("404")
176176
require.Equal(3, len(s.manager.Topics()))
177177

178178
for i := 0; i < 3; i++ {
179-
s.manager.UnregisterTopic(fmt.Sprintf("topic %d", i))
179+
s.manager.StopTrackingTopicPeers(fmt.Sprintf("topic %d", i))
180180
require.Equal(2-i, len(s.manager.Topics()))
181181
}
182182

183-
s.manager.UnregisterTopic("404")
183+
s.manager.StopTrackingTopicPeers("404")
184184
require.Equal(0, len(s.manager.Topics()))
185185
}
186186

go/worker/common/p2p/txsync/protocol.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package txsync
33
import (
44
"github.com/libp2p/go-libp2p/core"
55

6+
"github.com/oasisprotocol/oasis-core/go/common"
67
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
78
"github.com/oasisprotocol/oasis-core/go/common/node"
89
"github.com/oasisprotocol/oasis-core/go/common/version"
@@ -16,6 +17,11 @@ const TxSyncProtocolID = "txsync"
1617
// TxSyncProtocolVersion is the supported version of the transaction sync protocol.
1718
var TxSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0}
1819

20+
// ProtocolID returns the runtime transaction sync protocol ID.
21+
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
22+
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion)
23+
}
24+
1925
// Constants related to the GetTxs method.
2026
const (
2127
MethodGetTxs = "GetTxs"

go/worker/common/p2p/txsync/server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/oasisprotocol/oasis-core/go/common"
77
"github.com/oasisprotocol/oasis-core/go/common/cbor"
8-
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
98
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
109
"github.com/oasisprotocol/oasis-core/go/runtime/txpool"
1110
)
@@ -52,5 +51,5 @@ func (s *service) handleGetTxs(request *GetTxsRequest) (*GetTxsResponse, error)
5251

5352
// NewServer creates a new transaction sync protocol server.
5453
func NewServer(chainContext string, runtimeID common.Namespace, txPool txpool.TransactionPool) rpc.Server {
55-
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion), &service{txPool})
54+
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{txPool})
5655
}

go/worker/keymanager/p2p/protocol.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/libp2p/go-libp2p/core"
77

8+
"github.com/oasisprotocol/oasis-core/go/common"
89
"github.com/oasisprotocol/oasis-core/go/common/node"
910
"github.com/oasisprotocol/oasis-core/go/common/version"
1011
"github.com/oasisprotocol/oasis-core/go/p2p/peermgmt"
@@ -18,6 +19,11 @@ const KeyManagerProtocolID = "keymanager"
1819
// KeyManagerProtocolVersion is the supported version of the keymanager protocol.
1920
var KeyManagerProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0}
2021

22+
// ProtocolID returns the runtime keymanager protocol ID.
23+
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
24+
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion)
25+
}
26+
2127
// Constants related to the CallEnclave method.
2228
const (
2329
MethodCallEnclave = "CallEnclave"

go/worker/keymanager/p2p/server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/oasisprotocol/oasis-core/go/common"
99
"github.com/oasisprotocol/oasis-core/go/common/cbor"
10-
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
1110
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1211
enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api"
1312
)
@@ -52,5 +51,5 @@ func (s *service) handleCallEnclave(ctx context.Context, request *CallEnclaveReq
5251
func NewServer(chainContext string, runtimeID common.Namespace, km KeyManager) rpc.Server {
5352
initMetrics()
5453

55-
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion), &service{km})
54+
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{km})
5655
}

0 commit comments

Comments
 (0)