Skip to content

Commit a6c7baa

Browse files
committed
go/p2p: Ensure only server providers advertise themselves
1 parent 2bb1b3b commit a6c7baa

File tree

12 files changed

+60
-7
lines changed

12 files changed

+60
-7
lines changed

go/oasis-node/cmd/debug/byzantine/node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ func initializeAndRegisterByzantineNode(
155155
return nil, fmt.Errorf("initializing storage node failed: %w", err)
156156
}
157157
b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage))
158+
b.p2p.service.AdvertiseProtocol(storageP2P.RuntimeProtocolID(b.chainContext, b.runtimeID))
159+
158160
b.storage = storage
159161

160162
// Wait for activation epoch.

go/p2p/api/api.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,16 @@ type Service interface {
8383
RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int)
8484

8585
// RegisterProtocolServer registers a protocol server for the given protocol.
86+
//
87+
// Normally, registered protocol server should be also advertised (AdvertiseProtocol),
88+
// so that remote peers find the host node.
8689
RegisterProtocolServer(srv rpc.Server)
8790

91+
// AdvertiseProtocol starts advertising the host as available for serving the specified protocol.
92+
//
93+
// It should be called only after the protocol server is running and ready to serve.
94+
AdvertiseProtocol(p core.ProtocolID)
95+
8896
// GetMinRepublishInterval returns the minimum republish interval that needs to be respected by
8997
// the caller when publishing the same message. If Publish is called for the same message more
9098
// quickly, the message may be dropped and not published.

go/p2p/nop.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ func (p *nopP2P) RegisterProtocol(core.ProtocolID, int, int) {
8585
func (p *nopP2P) RegisterProtocolServer(rpc.Server) {
8686
}
8787

88+
// Implements api.Service.
89+
func (p *nopP2P) AdvertiseProtocol(core.ProtocolID) {
90+
}
91+
8892
// Implements api.Service.
8993
func (p *nopP2P) GetMinRepublishInterval() time.Duration {
9094
return time.Hour

go/p2p/p2p.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,11 @@ func (p *p2p) RegisterProtocolServer(srv rpc.Server) {
345345
)
346346
}
347347

348+
// Implements api.Service.
349+
func (p *p2p) AdvertiseProtocol(protocol core.ProtocolID) {
350+
p.peerMgr.AdvertiseProtocol(protocol)
351+
}
352+
348353
// Implements api.Service.
349354
func (p *p2p) GetMinRepublishInterval() time.Duration {
350355
return seenMessagesTTL + 5*time.Second

go/p2p/peermgmt/peermgr.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,21 @@ func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPee
188188
}
189189

190190
m.protocols[p] = &watermark{minPeers, totalPeers}
191-
m.discovery.startAdvertising(string(p))
192-
193191
m.logger.Debug("protocol registered",
194192
"protocol", p,
195193
"min_peers", minPeers,
196194
"total_peers", totalPeers,
197195
)
198196
}
199197

198+
// AdvertiseProtocol starts advertising readiness to serve the specified protocol.
199+
func (m *PeerManager) AdvertiseProtocol(p core.ProtocolID) {
200+
m.discovery.startAdvertising(string(p))
201+
m.logger.Debug("triggered protocol advertisement",
202+
"protocol", p,
203+
)
204+
}
205+
200206
// RegisterTopic starts tracking and managing peers that support the given topic.
201207
// If the topic is already registered, its values are updated.
202208
func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int) {

go/worker/common/committee/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,7 @@ func NewNode(
956956

957957
// Register transaction sync service.
958958
p2pHost.RegisterProtocolServer(txsync.NewServer(chainContext, runtime.ID(), n.TxPool))
959+
p2pHost.AdvertiseProtocol(txsync.RuntimeProtocolID(chainContext, runtime.ID()))
959960

960961
return n, nil
961962
}

go/worker/common/p2p/txsync/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package txsync
33
import (
44
"context"
55

6+
"github.com/libp2p/go-libp2p/core"
7+
68
"github.com/oasisprotocol/oasis-core/go/common"
79
"github.com/oasisprotocol/oasis-core/go/common/cbor"
810
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
@@ -50,7 +52,11 @@ func (s *service) handleGetTxs(request *GetTxsRequest) (*GetTxsResponse, error)
5052
return &rsp, nil
5153
}
5254

55+
func RuntimeProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
56+
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion)
57+
}
58+
5359
// NewServer creates a new transaction sync protocol server.
5460
func NewServer(chainContext string, runtimeID common.Namespace, txPool txpool.TransactionPool) rpc.Server {
55-
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion), &service{txPool})
61+
return rpc.NewServer(RuntimeProtocolID(chainContext, runtimeID), &service{txPool})
5662
}

go/worker/keymanager/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func New(
126126

127127
// Register keymanager service.
128128
commonWorker.P2P.RegisterProtocolServer(p2p.NewServer(commonWorker.ChainContext, w.runtimeID, w))
129+
commonWorker.P2P.AdvertiseProtocol(p2p.RuntimeProtocolID(commonWorker.ChainContext, w.runtimeID))
129130

130131
return w, nil
131132
}

go/worker/keymanager/p2p/server.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package p2p
33
import (
44
"context"
55

6+
"github.com/libp2p/go-libp2p/core"
67
"github.com/prometheus/client_golang/prometheus"
78

89
"github.com/oasisprotocol/oasis-core/go/common"
@@ -48,9 +49,13 @@ func (s *service) handleCallEnclave(ctx context.Context, request *CallEnclaveReq
4849
}, nil
4950
}
5051

52+
// RuntimeProtocolID returns the runtime protocol ID for the specified chain context and runtime ID.
53+
func RuntimeProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
54+
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion)
55+
}
56+
5157
// NewServer creates a new keymanager protocol server.
5258
func NewServer(chainContext string, runtimeID common.Namespace, km KeyManager) rpc.Server {
5359
initMetrics()
54-
55-
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion), &service{km})
60+
return rpc.NewServer(RuntimeProtocolID(chainContext, runtimeID), &service{km})
5661
}

go/worker/storage/committee/node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,13 @@ func NewNode(
274274

275275
// Register storage sync service.
276276
commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
277+
commonNode.P2P.AdvertiseProtocol(storageSync.RuntimeProtocolID(commonNode.ChainContext, commonNode.Runtime.ID()))
277278
n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
278279

279280
// Register storage pub service if configured.
280281
if rpcRoleProvider != nil {
281282
commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
283+
commonNode.P2P.AdvertiseProtocol(storagePub.RuntimeProtocolID(commonNode.ChainContext, commonNode.Runtime.ID()))
282284
}
283285

284286
return n, nil

0 commit comments

Comments
 (0)