Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changelog/6262.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
go: Split storage sync p2p protocol

The storage sync protocol was split into two independent protocols
(checkpoint sync and diff sync).

This change was made because there may be fewer nodes that expose checkpoints
than nodes that expose storage diffs. Previously, this could lead to issues
with state sync when a node was connected to peers that supported the storage
sync protocol but had no checkpoints available.

This was done in a backwards-compatible manner, so that both protocols are
still advertised and used. Eventually, we plan to remove the legacy protocol.
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/byzantine/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/client"
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
)

type byzantine struct {
Expand Down
7 changes: 5 additions & 2 deletions go/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type p2p struct {
registerAddresses []multiaddr.Multiaddr
topics map[string]*topicHandler

protocolRegistry *protocol.Registry

logger *logging.Logger
}

Expand Down Expand Up @@ -281,7 +283,7 @@ func (p *p2p) Publish(_ context.Context, topic string, msg any) {

// Implements api.Service.
func (p *p2p) RegisterHandler(topic string, handler api.Handler) {
protocol.ValidateTopicID(topic)
p.protocolRegistry.ValidateTopicID(topic)

p.Lock()
defer p.Unlock()
Expand Down Expand Up @@ -337,7 +339,7 @@ func (p *p2p) PeerManager() api.PeerManager {

// Implements api.Service.
func (p *p2p) RegisterProtocolServer(srv rpc.Server) {
protocol.ValidateProtocolID(srv.Protocol())
p.protocolRegistry.ValidateProtocolID(srv.Protocol())

p.host.SetStreamHandler(srv.Protocol(), srv.HandleStream)

Expand Down Expand Up @@ -439,6 +441,7 @@ func New(identity *identity.Identity, chainContext string, store *persistent.Com
pubsub: pubsub,
registerAddresses: cfg.Addresses,
topics: make(map[string]*topicHandler),
protocolRegistry: protocol.NewRegistry(),
logger: logger,
}, nil
}
Expand Down
23 changes: 11 additions & 12 deletions go/p2p/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,32 @@ import (
"github.com/oasisprotocol/oasis-core/go/p2p/api"
)

type protocolRegistry struct {
// Registry is responsible for ensuring unique protocol ids.
type Registry struct {
mu sync.Mutex
protocols map[core.ProtocolID]struct{}
}

func newProtocolRegistry() *protocolRegistry {
return &protocolRegistry{
func NewRegistry() *Registry {
return &Registry{
protocols: make(map[core.ProtocolID]struct{}),
}
}

var registry = newProtocolRegistry()

// ValidateProtocolID panics if the protocol id is not unique.
func ValidateProtocolID(p core.ProtocolID) {
registry.mu.Lock()
defer registry.mu.Unlock()
func (r *Registry) ValidateProtocolID(p core.ProtocolID) {
r.mu.Lock()
defer r.mu.Unlock()

if _, ok := registry.protocols[p]; ok {
if _, ok := r.protocols[p]; ok {
panic(fmt.Sprintf("p2p/protocol: protocol or topic with name '%s' already exists", p))
}
registry.protocols[p] = struct{}{}
r.protocols[p] = struct{}{}
}

// ValidateTopicID panics if the topic id is not unique.
func ValidateTopicID(topic string) {
ValidateProtocolID(core.ProtocolID(topic))
func (r *Registry) ValidateTopicID(topic string) {
r.ValidateProtocolID(core.ProtocolID(topic))
}

// NewProtocolID generates a protocol identifier for a consensus P2P protocol.
Expand Down
14 changes: 6 additions & 8 deletions go/p2p/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ func TestProtocolID(t *testing.T) {
require.Equal(expected, NewTopicIDForRuntime(chainContext, runtimeID, kind, version))
})

registry = newProtocolRegistry()

t.Run("ValidateProtocolID", func(_ *testing.T) {
ValidateProtocolID("protocol-1")
ValidateProtocolID("protocol-2")
r := NewRegistry()
r.ValidateProtocolID("protocol-1")
r.ValidateProtocolID("protocol-2")
})

t.Run("ValidateProtocolID panics", func(t *testing.T) {
Expand All @@ -69,9 +68,8 @@ func TestProtocolID(t *testing.T) {
t.Errorf("validate protocol id should fail")
}
}()
ValidateProtocolID("protocol")
ValidateProtocolID("protocol")
r := NewRegistry()
r.ValidateProtocolID("protocol")
r.ValidateProtocolID("protocol")
})

registry = newProtocolRegistry()
}
12 changes: 6 additions & 6 deletions go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ type Client interface {
}

type client struct {
host core.Host
protocolID protocol.ID
host core.Host
protocolIDs []protocol.ID

listeners struct {
sync.RWMutex
Expand Down Expand Up @@ -486,7 +486,7 @@ func (c *client) call(
stream, err := c.host.NewStream(
ctx,
peerID,
c.protocolID,
c.protocolIDs...,
)
if err != nil {
return fmt.Errorf("failed to open stream: %w", err)
Expand Down Expand Up @@ -612,15 +612,15 @@ func retryFn(ctx context.Context, fn func() error, maxRetries uint64, retryInter
}

// NewClient creates a new RPC client for the given protocol.
func NewClient(h host.Host, p protocol.ID) Client {
func NewClient(h host.Host, p protocol.ID, fallback ...protocol.ID) Client {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to accept pids []protocol.ID because the go-libp2p implementation doesn't guarantee that the first ID will be used if possible.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just syntactic sugar for a slice? The main problem, as you point out, is that the interface does not promise order.

In practice, the implementation favors protocols from left to right.

Ideally, we should not rely on this. If we had many methods as part of our protocol, this would likely be the only sensible approach. But we don't, so maybe we can make it simpler.

if h == nil {
// No P2P service, use the no-op client.
return &nopClient{}
}

return &client{
host: h,
protocolID: p,
host: h,
protocolIDs: append([]protocol.ID{p}, fallback...),
listeners: struct {
sync.RWMutex
m map[ClientListener]struct{}
Expand Down
7 changes: 6 additions & 1 deletion go/storage/mkvs/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ type ChunkProvider interface {

// GetCheckpointsRequest is a GetCheckpoints request.
type GetCheckpointsRequest struct {
Version uint16 `json:"version"`
// Version is the version of the checkpoint request.
Version uint16 `json:"version"`
// Namespace is the namespace the checkpoints are for.
//
// Existing server implementation may ignore validation of this field.
// Best practice is to still pass it, but not rely on the actual validation.
Namespace common.Namespace `json:"namespace"`

// RootVersion specifies an optional root version to limit the request to. If specified, only
Expand Down
20 changes: 10 additions & 10 deletions go/worker/storage/committee/checkpoint_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
)

const (
Expand Down Expand Up @@ -55,7 +55,7 @@ type chunk struct {
*checkpoint.ChunkMetadata

// checkpoint points to the checkpoint this chunk originated from.
checkpoint *storageSync.Checkpoint
checkpoint *checkpointsync.Checkpoint
}

type chunkHeap struct {
Expand Down Expand Up @@ -101,7 +101,7 @@ func (n *Node) checkpointChunkFetcher(
defer cancel()

// Fetch chunk from peers.
rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{
rsp, pf, err := n.checkpointSync.GetCheckpointChunk(chunkCtx, &checkpointsync.GetCheckpointChunkRequest{
Version: chunk.Version,
Root: chunk.Root,
Index: chunk.Index,
Expand Down Expand Up @@ -157,7 +157,7 @@ func (n *Node) checkpointChunkFetcher(
}
}

func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
// Any previous restores were already aborted by the driver up the call stack, so
// things should have been going smoothly here; bail.
Expand Down Expand Up @@ -276,11 +276,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques
}
}

func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
defer cancel()

list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{
list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
Version: 1,
})
if err != nil {
Expand All @@ -297,8 +297,8 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
}

// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
func sortCheckpoints(s []*storageSync.Checkpoint) {
slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int {
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {
return cmp.Or(
cmp.Compare(b.Root.Version, a.Root.Version),
cmp.Compare(len(b.Peers), len(a.Peers)),
Expand All @@ -307,7 +307,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) {
})
}

func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
namespace := n.commonNode.Runtime.ID()
if !namespace.Equal(&cp.Root.Namespace) {
// Not for the right runtime.
Expand Down Expand Up @@ -357,7 +357,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc

// If we only want the genesis checkpoint, filter it out.
if wantOnlyGenesis && len(cps) > 0 {
var filteredCps []*storageSync.Checkpoint
var filteredCps []*checkpointsync.Checkpoint
for _, cp := range cps {
if cp.Root.Version == genesisRound {
filteredCps = append(filteredCps, cp)
Expand Down
14 changes: 7 additions & 7 deletions go/worker/storage/committee/checkpoint_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,35 @@ import (
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
)

func TestSortCheckpoints(t *testing.T) {
cp1 := &sync.Checkpoint{
cp1 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 2,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
}
cp2 := &sync.Checkpoint{
cp2 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 2,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
}
cp3 := &sync.Checkpoint{
cp3 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 1,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
}
cp4 := &sync.Checkpoint{
cp4 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 1,
Expand All @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) {
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
}

s := []*sync.Checkpoint{cp2, cp3, cp4, cp1}
s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1}

sortCheckpoints(s)

assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4})
assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4})
}
25 changes: 19 additions & 6 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
"github.com/oasisprotocol/oasis-core/go/worker/registration"
"github.com/oasisprotocol/oasis-core/go/worker/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync"
storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub"
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
)

var (
Expand Down Expand Up @@ -128,7 +130,8 @@ type Node struct { // nolint: maligned

localStorage storageApi.LocalBackend

storageSync storageSync.Client
diffSync diffsync.Client
checkpointSync checkpointsync.Client

undefinedRound uint64

Expand Down Expand Up @@ -272,9 +275,19 @@ func NewNode(
node: n,
})

// Register storage sync service.
commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
// Advertise and serve legacy storage sync protocol.
commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))

// Register diff sync protocol server.
commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
// Register checkpoint sync protocol server if checkpoints are enabled.
if checkInterval < checkpoint.CheckIntervalDisabled {
commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
}

// Create diff and checkpoint sync p2p protocol clients that have a fallback to the old legacy storage sync protocol.
n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())

// Register storage pub service if configured.
if rpcRoleProvider != nil {
Expand Down Expand Up @@ -430,7 +443,7 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) {
ctx, cancel := context.WithCancel(n.ctx)
defer cancel()

rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
rsp, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
if err != nil {
result.err = err
return
Expand Down
Loading
Loading