Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/6270.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
go/p2p: Ensure only server providers advertise themselves

Previously, the host would advertise itself upon creation
of p2p protocol client, even if the server was not running.

Advertisement is now independent and is only triggered
when serving the P2P protocol.
10 changes: 10 additions & 0 deletions go/p2p/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type Service interface {
Publish(ctx context.Context, topic string, msg any)

// RegisterHandler registers a message handler for the specified runtime and topic kind.
//
// In addition, it triggers advertisement of the host's readiness to serve the specified topic,
// allowing remote peers without existing connection to discover it.
//
// Ensure your server is indeed ready, to avoid advertising prematurely.
RegisterHandler(topic string, handler Handler)

// BlockPeer blocks a specific peer from being used by the local node.
Expand All @@ -83,6 +88,11 @@ type Service interface {
RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int)

// RegisterProtocolServer registers a protocol server for the given protocol.
//
// In addition, it triggers advertisement of the host's readiness to serve the specified protocol,
// allowing remote peers without existing connection to discover it.
//
// Ensure your server is indeed ready, to avoid advertising prematurely.
RegisterProtocolServer(srv rpc.Server)

// GetMinRepublishInterval returns the minimum republish interval that needs to be respected by
Expand Down
7 changes: 5 additions & 2 deletions go/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ func (p *p2p) RegisterHandler(topic string, handler api.Handler) {
"topic", topic,
)

p.peerMgr.RegisterTopic(topic, minTopicPeers, totalTopicPeers)
p.peerMgr.TrackTopicPeers(topic, minTopicPeers, totalTopicPeers)
p.peerMgr.AdvertiseTopic(topic)
}

// Implements api.Service.
Expand All @@ -321,7 +322,7 @@ func (p *p2p) BlockPeer(peerID core.PeerID) {

// Implements api.Service.
func (p *p2p) RegisterProtocol(pid core.ProtocolID, minPeers int, totalPeers int) {
p.peerMgr.RegisterProtocol(pid, minPeers, totalPeers)
p.peerMgr.TrackProtocolPeers(pid, minPeers, totalPeers)
}

// Implements api.Service.
Expand All @@ -340,6 +341,8 @@ func (p *p2p) RegisterProtocolServer(srv rpc.Server) {

p.host.SetStreamHandler(srv.Protocol(), srv.HandleStream)

p.peerMgr.AdvertiseProtocol(srv.Protocol())

p.logger.Info("registered protocol server",
"protocol_id", srv.Protocol(),
)
Expand Down
4 changes: 4 additions & 0 deletions go/p2p/peermgmt/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (d *peerDiscovery) startAdvertising(ns string) {
case d.advCh <- struct{}{}:
default:
}

d.logger.Debug("triggered protocol advertisement",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be optional as I just saw that we log started advertising when the running thread picks up a new namespace.

"protocol", ns,
)
}

// stopAdvertising stops advertising the given namespace.
Expand Down
45 changes: 32 additions & 13 deletions go/p2p/peermgmt/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ func (m *PeerManager) NumTopicPeers(topic string) int {
return len(m.pubsub.ListPeers(topic))
}

// RegisterProtocol starts tracking and managing peers that support the given protocol.
// TrackProtocolPeers starts tracking and managing peers that support the given protocol.
// If the protocol is already registered, its values are updated.
func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int) {
func (m *PeerManager) TrackProtocolPeers(p core.ProtocolID, minPeers int, totalPeers int) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -188,18 +188,16 @@ func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPee
}

m.protocols[p] = &watermark{minPeers, totalPeers}
m.discovery.startAdvertising(string(p))

m.logger.Debug("protocol registered",
"protocol", p,
"min_peers", minPeers,
"total_peers", totalPeers,
)
}

// RegisterTopic starts tracking and managing peers that support the given topic.
// TrackTopicPeers starts tracking and managing peers that support the given topic.
// If the topic is already registered, its values are updated.
func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int) {
func (m *PeerManager) TrackTopicPeers(topic string, minPeers int, totalPeers int) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -217,7 +215,6 @@ func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int)
}

m.topics[topic] = &watermark{minPeers, totalPeers}
m.discovery.startAdvertising(topic)

m.logger.Debug("topic registered",
"topic", topic,
Expand All @@ -226,9 +223,23 @@ func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int)
)
}

// UnregisterProtocol stops managing peers that support the given protocol.
// AdvertiseProtocol starts advertising readiness to serve the specified protocol.
//
// This enables remote peers without existing connection to find the host node.
func (m *PeerManager) AdvertiseProtocol(p core.ProtocolID) {
m.discovery.startAdvertising(string(p))
}

// AdvertiseTopic starts advertising readiness to serve the specified topic.
//
// This enables remote peers without existing connection to find the host node.
func (m *PeerManager) AdvertiseTopic(topic string) {
m.discovery.startAdvertising(topic)
}

// StopTrackingProtocolPeers stops managing peers that support the given protocol.
// If the protocol is not registered, this is a noop operation.
func (m *PeerManager) UnregisterProtocol(p core.ProtocolID) {
func (m *PeerManager) StopTrackingProtocolPeers(p core.ProtocolID) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -237,16 +248,15 @@ func (m *PeerManager) UnregisterProtocol(p core.ProtocolID) {
}

delete(m.protocols, p)
m.discovery.stopAdvertising(string(p))

m.logger.Debug("protocol unregistered",
"protocol", p,
)
}

// UnregisterTopic stops managing peers that support the given topic.
// StopTrackingTopicPeers stops managing peers that support the given topic.
// If the topic is not registered, this is a noop operation.
func (m *PeerManager) UnregisterTopic(topic string) {
func (m *PeerManager) StopTrackingTopicPeers(topic string) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -255,13 +265,22 @@ func (m *PeerManager) UnregisterTopic(topic string) {
}

delete(m.topics, topic)
m.discovery.stopAdvertising(topic)

m.logger.Debug("topic unregistered",
"topic", topic,
)
}

// StopAdvertisingProtocol stops advertising readiness to serve the specified protocol.
func (m *PeerManager) StopAdvertisingProtocol(p core.ProtocolID) {
m.discovery.stopAdvertising(string(p))
}

// StopAdvertisingTopic stops advertising readiness to serve the specified protocol.
func (m *PeerManager) StopAdvertisingTopic(topic string) {
m.discovery.stopAdvertising(topic)
}

func (m *PeerManager) run(ctx context.Context) {
// Start background services.
m.backup.start()
Expand Down
20 changes: 10 additions & 10 deletions go/p2p/peermgmt/peermgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *PeerManagerTestSuite) TestRegisterProtocol() {

for i := 0; i < 3; i++ {
p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i))
s.manager.RegisterProtocol(p, 1, 10)
s.manager.TrackProtocolPeers(p, 1, 10)
require.Equal(i+1, len(s.manager.Protocols()))
}
}
Expand All @@ -136,7 +136,7 @@ func (s *PeerManagerTestSuite) TestRegisterTopic() {

for i := 0; i < 3; i++ {
t := fmt.Sprintf("topic %d", i)
s.manager.RegisterTopic(t, 1, 10)
s.manager.TrackTopicPeers(t, 1, 10)
require.Equal(i+1, len(s.manager.Topics()))
}
}
Expand All @@ -146,20 +146,20 @@ func (s *PeerManagerTestSuite) TestUnregisterProtocol() {

for i := 0; i < 3; i++ {
p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i))
s.manager.RegisterProtocol(p, 1, 10)
s.manager.TrackProtocolPeers(p, 1, 10)
require.Equal(i+1, len(s.manager.Protocols()))
}

s.manager.UnregisterProtocol("404")
s.manager.StopTrackingProtocolPeers("404")
require.Equal(3, len(s.manager.Protocols()))

for i := 0; i < 3; i++ {
p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i))
s.manager.UnregisterProtocol(p)
s.manager.StopTrackingProtocolPeers(p)
require.Equal(2-i, len(s.manager.Protocols()))
}

s.manager.UnregisterProtocol("404")
s.manager.StopTrackingProtocolPeers("404")
require.Equal(0, len(s.manager.Protocols()))
}

Expand All @@ -168,19 +168,19 @@ func (s *PeerManagerTestSuite) TestUnregisterTopic() {

for i := 0; i < 3; i++ {
t := fmt.Sprintf("topic %d", i)
s.manager.RegisterTopic(t, 1, 10)
s.manager.TrackTopicPeers(t, 1, 10)
require.Equal(i+1, len(s.manager.Topics()))
}

s.manager.UnregisterTopic("404")
s.manager.StopTrackingTopicPeers("404")
require.Equal(3, len(s.manager.Topics()))

for i := 0; i < 3; i++ {
s.manager.UnregisterTopic(fmt.Sprintf("topic %d", i))
s.manager.StopTrackingTopicPeers(fmt.Sprintf("topic %d", i))
require.Equal(2-i, len(s.manager.Topics()))
}

s.manager.UnregisterTopic("404")
s.manager.StopTrackingTopicPeers("404")
require.Equal(0, len(s.manager.Topics()))
}

Expand Down
6 changes: 6 additions & 0 deletions go/worker/common/p2p/txsync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txsync
import (
"github.com/libp2p/go-libp2p/core"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/version"
Expand All @@ -16,6 +17,11 @@ const TxSyncProtocolID = "txsync"
// TxSyncProtocolVersion is the supported version of the transaction sync protocol.
var TxSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0}

// ProtocolID returns the runtime transaction sync protocol ID.
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion)
}

// Constants related to the GetTxs method.
const (
MethodGetTxs = "GetTxs"
Expand Down
3 changes: 1 addition & 2 deletions go/worker/common/p2p/txsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
"github.com/oasisprotocol/oasis-core/go/runtime/txpool"
)
Expand Down Expand Up @@ -52,5 +51,5 @@ func (s *service) handleGetTxs(request *GetTxsRequest) (*GetTxsResponse, error)

// NewServer creates a new transaction sync protocol server.
func NewServer(chainContext string, runtimeID common.Namespace, txPool txpool.TransactionPool) rpc.Server {
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion), &service{txPool})
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{txPool})
}
6 changes: 6 additions & 0 deletions go/worker/keymanager/p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/p2p/peermgmt"
Expand All @@ -18,6 +19,11 @@ const KeyManagerProtocolID = "keymanager"
// KeyManagerProtocolVersion is the supported version of the keymanager protocol.
var KeyManagerProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0}

// ProtocolID returns the runtime keymanager protocol ID.
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion)
}

// Constants related to the CallEnclave method.
const (
MethodCallEnclave = "CallEnclave"
Expand Down
3 changes: 1 addition & 2 deletions go/worker/keymanager/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api"
)
Expand Down Expand Up @@ -52,5 +51,5 @@ func (s *service) handleCallEnclave(ctx context.Context, request *CallEnclaveReq
func NewServer(chainContext string, runtimeID common.Namespace, km KeyManager) rpc.Server {
initMetrics()

return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion), &service{km})
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{km})
}
6 changes: 6 additions & 0 deletions go/worker/storage/p2p/pub/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pub
import (
"github.com/libp2p/go-libp2p/core"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/p2p/peermgmt"
Expand All @@ -16,6 +17,11 @@ const StoragePubProtocolID = "storagepub"
// StoragePubProtocolVersion is the supported version of the storage pub protocol.
var StoragePubProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0}

// ProtocolID returns the runtime storage pub protocol ID.
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion)
}

// Constants related to the Get method.
const (
MethodGet = "Get"
Expand Down
3 changes: 1 addition & 2 deletions go/worker/storage/p2p/pub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
)
Expand Down Expand Up @@ -44,5 +43,5 @@ func (s *service) HandleRequest(ctx context.Context, method string, body cbor.Ra

// NewServer creates a new storage pub protocol server.
func NewServer(chainContext string, runtimeID common.Namespace, backend storage.Backend) rpc.Server {
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion), &service{backend})
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend})
}
6 changes: 6 additions & 0 deletions go/worker/storage/p2p/sync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/version"
Expand All @@ -20,6 +21,11 @@ const StorageSyncProtocolID = "storagesync"
// StorageSyncProtocolVersion is the supported version of the storage sync protocol.
var StorageSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0}

// ProtocolID returns the runtime storage sync protocol ID.
func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID {
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion)
}

// Constants related to the GetDiff method.
const (
MethodGetDiff = "GetDiff"
Expand Down
3 changes: 1 addition & 2 deletions go/worker/storage/p2p/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
Expand Down Expand Up @@ -105,5 +104,5 @@ func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetChec

// NewServer creates a new storage sync protocol server.
func NewServer(chainContext string, runtimeID common.Namespace, backend storage.Backend) rpc.Server {
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion), &service{backend})
return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend})
}
Loading