From 24fd955696f815446db57e76c719aefac4879586 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 23 Jul 2025 22:11:07 +0200 Subject: [PATCH 1/7] go/p2p/rpc: Make CallMulti fail if 0 peers --- go/p2p/rpc/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/p2p/rpc/client.go b/go/p2p/rpc/client.go index 888045ad471..e471db89092 100644 --- a/go/p2p/rpc/client.go +++ b/go/p2p/rpc/client.go @@ -364,6 +364,10 @@ func (c *client) CallMulti( ) ([]any, []PeerFeedback, error) { c.logger.Debug("call multiple", "method", method) + if len(peers) == 0 { + return nil, nil, fmt.Errorf("no peers given to service the request") + } + co := NewCallMultiOptions(opts...) // Prepare the request. From 9782a1f75e27f6ab63af0fb7432ebb90aebbdd2c Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 23 Jul 2025 23:05:42 +0200 Subject: [PATCH 2/7] go/worker/storage/p2p: Split storage sync protocol --- .../storage/p2p/checkpointsync/client.go | 125 ++++++++++++++++++ .../storage/p2p/checkpointsync/protocol.go | 80 +++++++++++ .../storage/p2p/checkpointsync/server.go | 73 ++++++++++ go/worker/storage/p2p/diffsync/client.go | 58 ++++++++ go/worker/storage/p2p/diffsync/protocol.go | 61 +++++++++ go/worker/storage/p2p/diffsync/server.go | 61 +++++++++ 6 files changed, 458 insertions(+) create mode 100644 go/worker/storage/p2p/checkpointsync/client.go create mode 100644 go/worker/storage/p2p/checkpointsync/protocol.go create mode 100644 go/worker/storage/p2p/checkpointsync/server.go create mode 100644 go/worker/storage/p2p/diffsync/client.go create mode 100644 go/worker/storage/p2p/diffsync/protocol.go create mode 100644 go/worker/storage/p2p/diffsync/server.go diff --git a/go/worker/storage/p2p/checkpointsync/client.go b/go/worker/storage/p2p/checkpointsync/client.go new file mode 100644 index 00000000000..64edb9ecc0a --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/client.go @@ -0,0 +1,125 @@ +package checkpointsync + +import ( + "context" + + 
"github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for checkpoint sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for checkpoint sync protocol. + totalProtocolPeers = 10 +) + +// Client is a checkpoint sync protocol client. +type Client interface { + // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. + GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) + + // GetCheckpointChunk requests a specific checkpoint chunk. + GetCheckpointChunk( + ctx context.Context, + request *GetCheckpointChunkRequest, + cp *Checkpoint, + ) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) +} + +// Checkpoint contains checkpoint metadata together with peer information. +type Checkpoint struct { + *checkpoint.Metadata + + // Peers are the feedback structures of all the peers that have advertised this checkpoint. + Peers []rpc.PeerFeedback +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager +} + +func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) { + var rsp GetCheckpointsResponse + rsps, pfs, err := c.rc.CallMulti(ctx, c.mgr.GetBestPeers(), MethodGetCheckpoints, request, rsp) + if err != nil { + return nil, err + } + + // Combine deduplicated results into a single result. 
+ var checkpoints []*Checkpoint + cps := make(map[hash.Hash]*Checkpoint) + for i, peerRsp := range rsps { + peerCps := peerRsp.(*GetCheckpointsResponse).Checkpoints + + for _, cpMeta := range peerCps { + h := cpMeta.EncodedHash() + cp := cps[h] + if cp == nil { + cp = &Checkpoint{ + Metadata: cpMeta, + } + cps[h] = cp + checkpoints = append(checkpoints, cp) + } + cp.Peers = append(cp.Peers, pfs[i]) + } + + // Record success for a peer if it returned at least one checkpoint. + if len(peerCps) > 0 { + pfs[i].RecordSuccess() + } + } + return checkpoints, nil +} + +func (c *client) GetCheckpointChunk( + ctx context.Context, + request *GetCheckpointChunkRequest, + cp *Checkpoint, +) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) { + var opts []rpc.BestPeersOption + // When a checkpoint is passed, we limit requests to only those peers that actually advertised + // having the checkpoint in question to avoid needless requests. + if cp != nil { + peers := make([]core.PeerID, 0, len(cp.Peers)) + for _, pf := range cp.Peers { + peers = append(peers, pf.PeerID()) + } + opts = append(opts, rpc.WithLimitPeers(peers)) + } + + var rsp GetCheckpointChunkResponse + pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(opts...), MethodGetCheckpointChunk, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +// NewClient creates a new checkpoint sync protocol client. +// +// Moreover, it ensures underlying p2p service starts tracking protocol peers. 
+func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + rc := rpc.NewClient(p2p.Host(), pid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + } +} diff --git a/go/worker/storage/p2p/checkpointsync/protocol.go b/go/worker/storage/p2p/checkpointsync/protocol.go new file mode 100644 index 00000000000..140f5e3d618 --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/protocol.go @@ -0,0 +1,80 @@ +// Package checkpointsync defines wire protocol together with client/server +// implementations for the checkpoint sync protocol, used for runtime state sync. +package checkpointsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +// CheckpointSyncProtocolID is a unique protocol identifier for the checkpoint sync protocol. +const CheckpointSyncProtocolID = "checkpointsync" + +// CheckpointSyncProtocolVersion is the supported version of the checkpoint sync protocol. +var CheckpointSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime checkpoint sync protocol ID. 
+func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) +} + +// Constants related to the GetCheckpoints method. +const ( + MethodGetCheckpoints = "GetCheckpoints" +) + +// GetCheckpointsRequest is a GetCheckpoints request. +type GetCheckpointsRequest struct { + Version uint16 `json:"version"` +} + +// GetCheckpointsResponse is a response to a GetCheckpoints request. +type GetCheckpointsResponse struct { + Checkpoints []*checkpoint.Metadata `json:"checkpoints,omitempty"` +} + +// Constants related to the GetCheckpointChunk method. +const ( + MethodGetCheckpointChunk = "GetCheckpointChunk" + MaxGetCheckpointChunkResponseTime = time.Minute +) + +// GetCheckpointChunkRequest is a GetCheckpointChunk request. +type GetCheckpointChunkRequest struct { + Version uint16 `json:"version"` + Root api.Root `json:"root"` + Index uint64 `json:"index"` + Digest hash.Hash `json:"digest"` +} + +// GetCheckpointChunkResponse is a response to a GetCheckpointChunk request. 
+type GetCheckpointChunkResponse struct { + Chunk []byte `json:"chunk,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/checkpointsync/server.go b/go/worker/storage/p2p/checkpointsync/server.go new file mode 100644 index 00000000000..2df4d0f68db --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/server.go @@ -0,0 +1,73 @@ +package checkpointsync + +import ( + "bytes" + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { + switch method { + case MethodGetCheckpoints: + var rq GetCheckpointsRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetCheckpoints(ctx, &rq) + case MethodGetCheckpointChunk: + var rq GetCheckpointChunkRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetCheckpointChunk(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) { + cps, err := s.backend.GetCheckpoints(ctx, 
&checkpoint.GetCheckpointsRequest{ + Version: request.Version, + }) + if err != nil { + return nil, err + } + + return &GetCheckpointsResponse{ + Checkpoints: cps, + }, nil +} + +func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) { + // Consider using stream resource manager to track buffer use. + var buf bytes.Buffer + err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{ + Version: request.Version, + Root: request.Root, + Index: request.Index, + Digest: request.Digest, + }, &buf) + if err != nil { + return nil, err + } + + return &GetCheckpointChunkResponse{ + Chunk: buf.Bytes(), + }, nil +} + +// NewServer creates a new checkpoint sync protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go new file mode 100644 index 00000000000..6fcfc6d111b --- /dev/null +++ b/go/worker/storage/p2p/diffsync/client.go @@ -0,0 +1,58 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for diff sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for diff sync protocol. + totalProtocolPeers = 10 +) + +// Client is a diff sync protocol client. +type Client interface { + // GetDiff requests a write log of entries that must be applied to get from the first given root + // to the second one. 
+ GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager +} + +func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) { + var rsp GetDiffResponse + pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(), MethodGetDiff, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +// NewClient creates a new diff sync protocol client. +// +// Moreover, it ensures underlying p2p service starts tracking protocol peers. +func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) + rc := rpc.NewClient(p2p.Host(), pid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + } +} diff --git a/go/worker/storage/p2p/diffsync/protocol.go b/go/worker/storage/p2p/diffsync/protocol.go new file mode 100644 index 00000000000..c53304d5666 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/protocol.go @@ -0,0 +1,61 @@ +// Package diffsync defines wire protocol together with client/server +// implementations for the diff sync protocol, used for runtime block sync. +package diffsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +// DiffSyncProtocolID is a unique protocol identifier for the diff sync protocol. 
+const DiffSyncProtocolID = "diffsync" + +// DiffSyncProtocolVersion is the supported version of the diff sync protocol. +var DiffSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime diff sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) +} + +// Constants related to the GetDiff method. +const ( + MethodGetDiff = "GetDiff" + MaxGetDiffResponseTime = 15 * time.Second +) + +// GetDiffRequest is a GetDiff request. +type GetDiffRequest struct { + StartRoot api.Root `json:"start_root"` + EndRoot api.Root `json:"end_root"` +} + +// GetDiffResponse is a response to a GetDiff request. +type GetDiffResponse struct { + WriteLog api.WriteLog `json:"write_log,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, DiffSyncProtocolID, DiffSyncProtocolVersion) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/diffsync/server.go b/go/worker/storage/p2p/diffsync/server.go new file mode 100644 index 00000000000..4267f8dc6a8 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/server.go @@ -0,0 +1,61 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body 
cbor.RawMessage) (any, error) { + switch method { + case MethodGetDiff: + var rq GetDiffRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetDiff(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, error) { + it, err := s.backend.GetDiff(ctx, &api.GetDiffRequest{ + StartRoot: request.StartRoot, + EndRoot: request.EndRoot, + }) + if err != nil { + return nil, err + } + + var rsp GetDiffResponse + for { + more, err := it.Next() + if err != nil { + return nil, err + } + if !more { + break + } + + chunk, err := it.Value() + if err != nil { + return nil, err + } + rsp.WriteLog = append(rsp.WriteLog, chunk) + } + return &rsp, nil +} + +// NewServer creates a new diff sync protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} From 492ad44d0fd9ae2bb591b1e84dda2950685f3087 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 24 Jul 2025 00:18:29 +0200 Subject: [PATCH 3/7] go/worker/storage: Integrate new p2p protocols Legacy storage sync protocol is still advertised and served to enable seamless rolling upgrades of the network. 
--- .changelog/5751.feature.md | 12 +++ go/oasis-node/cmd/debug/byzantine/node.go | 4 +- .../storage/committee/checkpoint_sync.go | 98 +++++++++++++++---- .../storage/committee/checkpoint_sync_test.go | 14 +-- go/worker/storage/committee/node.go | 44 +++++++-- .../p2p/{sync => synclegacy}/client.go | 2 +- .../p2p/{sync => synclegacy}/protocol.go | 8 +- .../p2p/{sync => synclegacy}/server.go | 2 +- 8 files changed, 145 insertions(+), 39 deletions(-) create mode 100644 .changelog/5751.feature.md rename go/worker/storage/p2p/{sync => synclegacy}/client.go (99%) rename go/worker/storage/p2p/{sync => synclegacy}/protocol.go (90%) rename go/worker/storage/p2p/{sync => synclegacy}/server.go (99%) diff --git a/.changelog/5751.feature.md b/.changelog/5751.feature.md new file mode 100644 index 00000000000..766413d7ce9 --- /dev/null +++ b/.changelog/5751.feature.md @@ -0,0 +1,12 @@ +go: Split storage sync p2p protocol + +Storage sync protocol was split into two independent protocols (checkpoint +and diff sync). + +This change was made since there may be fewer nodes that expose checkpoints +than storage diffs. Previously, this could lead to issues with state sync +when a node was connected with peers that supported the storage sync protocol +but had no checkpoints available. + +This was done in a backwards-compatible manner, so that both protocols are still +advertised and used. Eventually, we plan to remove the legacy protocol. 
diff --git a/go/oasis-node/cmd/debug/byzantine/node.go b/go/oasis-node/cmd/debug/byzantine/node.go index 447def2d766..3920f96f11e 100644 --- a/go/oasis-node/cmd/debug/byzantine/node.go +++ b/go/oasis-node/cmd/debug/byzantine/node.go @@ -22,7 +22,7 @@ import ( scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/client" - storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) type byzantine struct { @@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode( if err != nil { return nil, fmt.Errorf("initializing storage node failed: %w", err) } - b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage)) + b.p2p.service.RegisterProtocolServer(synclegacy.NewServer(b.chainContext, b.runtimeID, storage)) b.storage = storage // Wait for activation epoch. diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index c9236f4cada..ad553272a90 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -11,9 +11,11 @@ import ( "sync" "time" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) const ( @@ -55,7 +57,7 @@ type chunk struct { *checkpoint.ChunkMetadata // checkpoint points to the checkpoint this chunk originated from. 
- checkpoint *storageSync.Checkpoint + checkpoint *checkpointsync.Checkpoint } type chunkHeap struct { @@ -101,12 +103,7 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{ - Version: chunk.Version, - Root: chunk.Root, - Index: chunk.Index, - Digest: chunk.Digest, - }, chunk.checkpoint) + rsp, pf, err := n.fetchChunk(chunkCtx, chunk) if err != nil { n.logger.Error("failed to fetch chunk from peers", "err", err, @@ -117,7 +114,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. - done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk)) + done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) cancel() switch { @@ -157,7 +154,47 @@ func (n *Node) checkpointChunkFetcher( } } -func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { +// fetchChunk fetches chunk using checkpoint sync p2p protocol client. +// +// In case of no peers or error, it fallbacks to the legacy storage sync protocol. 
+func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { + rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( + ctx, + &checkpointsync.GetCheckpointChunkRequest{ + Version: chunk.Version, + Root: chunk.Root, + Index: chunk.Index, + Digest: chunk.Digest, + }, + &checkpointsync.Checkpoint{ + Metadata: chunk.checkpoint.Metadata, + Peers: chunk.checkpoint.Peers, + }, + ) + if err == nil { // if NO error + return rsp1.Chunk, pf, nil + } + + rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( + ctx, + &synclegacy.GetCheckpointChunkRequest{ + Version: chunk.Version, + Root: chunk.Root, + Index: chunk.Index, + Digest: chunk.Digest, + }, + &synclegacy.Checkpoint{ + Metadata: chunk.checkpoint.Metadata, + Peers: chunk.checkpoint.Peers, + }, + ) + if err != nil { + return nil, nil, err + } + return rsp2.Chunk, pf, nil +} + +func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. @@ -276,13 +313,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques } } -func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { +func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) defer cancel() - list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{ - Version: 1, - }) + list, err := n.fetchCheckpoints(ctx) if err != nil { n.logger.Error("failed to retrieve any checkpoints", "err", err, @@ -296,9 +331,36 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { return list, nil } +// fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. 
+// +// In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. +func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ + Version: 1, + }) + if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint + return list1, nil + } + + list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + Version: 1, + }) + if err != nil { + return nil, err + } + var cps []*checkpointsync.Checkpoint + for _, cp := range list2 { + cps = append(cps, &checkpointsync.Checkpoint{ + Metadata: cp.Metadata, + Peers: cp.Peers, + }) + } + return cps, nil +} + // sortCheckpoints sorts the slice in-place (descending by version, peers, hash). -func sortCheckpoints(s []*storageSync.Checkpoint) { - slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int { +func sortCheckpoints(s []*checkpointsync.Checkpoint) { + slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int { return cmp.Or( cmp.Compare(b.Root.Version, a.Root.Version), cmp.Compare(len(b.Peers), len(a.Peers)), @@ -307,7 +369,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { +func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { namespace := n.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. @@ -357,7 +419,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // If we only want the genesis checkpoint, filter it out. 
if wantOnlyGenesis && len(cps) > 0 { - var filteredCps []*storageSync.Checkpoint + var filteredCps []*checkpointsync.Checkpoint for _, cp := range cps { if cp.Root.Version == genesisRound { filteredCps = append(filteredCps, cp) diff --git a/go/worker/storage/committee/checkpoint_sync_test.go b/go/worker/storage/committee/checkpoint_sync_test.go index 2e0ff2c206d..d39e50f3239 100644 --- a/go/worker/storage/committee/checkpoint_sync_test.go +++ b/go/worker/storage/committee/checkpoint_sync_test.go @@ -8,11 +8,11 @@ import ( "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) func TestSortCheckpoints(t *testing.T) { - cp1 := &sync.Checkpoint{ + cp1 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -20,7 +20,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp2 := &sync.Checkpoint{ + cp2 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -28,7 +28,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - cp3 := &sync.Checkpoint{ + cp3 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -36,7 +36,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp4 := &sync.Checkpoint{ + cp4 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) { Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - s := []*sync.Checkpoint{cp2, cp3, cp4, cp1} + s := []*checkpointsync.Checkpoint{cp2, cp3, 
cp4, cp1} sortCheckpoints(s) - assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4}) + assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4}) } diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 9832cb98648..dfa8dbff18f 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -32,8 +32,10 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) var ( @@ -128,7 +130,9 @@ type Node struct { // nolint: maligned localStorage storageApi.LocalBackend - storageSync storageSync.Client + diffSync diffsync.Client + checkpointSync checkpointsync.Client + legacyStorageSync synclegacy.Client undefinedRound uint64 @@ -272,15 +276,21 @@ func NewNode( node: n, }) - // Register storage sync service. - commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - - // Register storage pub service if configured. + // Advertise and serve p2p protocols. 
+ commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + if checkInterval != checkpoint.CheckIntervalDisabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } if rpcRoleProvider != nil { commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } + // Create p2p protocol clients. + n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + return n, nil } @@ -430,13 +440,29 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { ctx, cancel := context.WithCancel(n.ctx) defer cancel() - rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) if err != nil { result.err = err return } result.pf = pf - result.writeLog = rsp.WriteLog + result.writeLog = wl +} + +// getDiff fetches writelog using diff sync p2p protocol client. +// +// In case of no peers or error, it fallbacks to the legacy storage sync protocol. 
+func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err == nil { // if NO error + return rsp1.WriteLog, pf, nil + } + + rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err != nil { + return nil, nil, err + } + return rsp2.WriteLog, pf, nil } func (n *Node) finalize(summary *blockSummary) { diff --git a/go/worker/storage/p2p/sync/client.go b/go/worker/storage/p2p/synclegacy/client.go similarity index 99% rename from go/worker/storage/p2p/sync/client.go rename to go/worker/storage/p2p/synclegacy/client.go index fdb8ed7379d..2058b2dae53 100644 --- a/go/worker/storage/p2p/sync/client.go +++ b/go/worker/storage/p2p/synclegacy/client.go @@ -1,4 +1,4 @@ -package sync +package synclegacy import ( "context" diff --git a/go/worker/storage/p2p/sync/protocol.go b/go/worker/storage/p2p/synclegacy/protocol.go similarity index 90% rename from go/worker/storage/p2p/sync/protocol.go rename to go/worker/storage/p2p/synclegacy/protocol.go index 7dca895e014..e32d764bca6 100644 --- a/go/worker/storage/p2p/sync/protocol.go +++ b/go/worker/storage/p2p/synclegacy/protocol.go @@ -1,4 +1,10 @@ -package sync +// Package synclegacy defines wire protocol together with client/server +// implementations for the legacy storage sync protocol, used for runtime block sync. +// +// The protocol was split into storage diff and checkpoints protocol. 
+// +// TODO: Remove it: https://github.com/oasisprotocol/oasis-core/issues/6261 +package synclegacy import ( "time" diff --git a/go/worker/storage/p2p/sync/server.go b/go/worker/storage/p2p/synclegacy/server.go similarity index 99% rename from go/worker/storage/p2p/sync/server.go rename to go/worker/storage/p2p/synclegacy/server.go index 22b6731465a..a09c342bcaf 100644 --- a/go/worker/storage/p2p/sync/server.go +++ b/go/worker/storage/p2p/synclegacy/server.go @@ -1,4 +1,4 @@ -package sync +package synclegacy import ( "bytes" From 6917257ee0bc5a4bc51b295b39647086a96672d8 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 24 Jul 2025 00:22:40 +0200 Subject: [PATCH 4/7] go/worker: Use ProtocolID helper to avoid duplication --- go/worker/common/p2p/txsync/client.go | 3 +-- go/worker/common/p2p/txsync/protocol.go | 2 +- go/worker/keymanager/p2p/client.go | 3 +-- go/worker/keymanager/p2p/protocol.go | 2 +- go/worker/storage/p2p/checkpointsync/client.go | 3 +-- go/worker/storage/p2p/checkpointsync/protocol.go | 2 +- go/worker/storage/p2p/diffsync/client.go | 3 +-- go/worker/storage/p2p/diffsync/protocol.go | 2 +- go/worker/storage/p2p/pub/client.go | 3 +-- go/worker/storage/p2p/pub/protocol.go | 2 +- go/worker/storage/p2p/synclegacy/client.go | 3 +-- go/worker/storage/p2p/synclegacy/protocol.go | 2 +- 12 files changed, 12 insertions(+), 18 deletions(-) diff --git a/go/worker/common/p2p/txsync/client.go b/go/worker/common/p2p/txsync/client.go index 54d69f6242c..f293986b123 100644 --- a/go/worker/common/p2p/txsync/client.go +++ b/go/worker/common/p2p/txsync/client.go @@ -5,7 +5,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -82,7 +81,7 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes // NewClient creates a new transaction sync protocol client. 
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/common/p2p/txsync/protocol.go b/go/worker/common/p2p/txsync/protocol.go index e5aef20bdd7..5d1e05f9676 100644 --- a/go/worker/common/p2p/txsync/protocol.go +++ b/go/worker/common/p2p/txsync/protocol.go @@ -47,7 +47,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, TxSyncProtocolID, TxSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/keymanager/p2p/client.go b/go/worker/keymanager/p2p/client.go index b7972d1168e..0eb0a3194ca 100644 --- a/go/worker/keymanager/p2p/client.go +++ b/go/worker/keymanager/p2p/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" p2p "github.com/oasisprotocol/oasis-core/go/p2p/api" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -46,7 +45,7 @@ func (c *client) CallEnclave(ctx context.Context, request *CallEnclaveRequest, p // NewClient creates a new keymanager protocol client. 
func NewClient(p2p p2p.Service, chainContext string, keymanagerID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, keymanagerID, KeyManagerProtocolID, KeyManagerProtocolVersion) + pid := ProtocolID(chainContext, keymanagerID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/keymanager/p2p/protocol.go b/go/worker/keymanager/p2p/protocol.go index f7dfda7f407..21d7dd383f0 100644 --- a/go/worker/keymanager/p2p/protocol.go +++ b/go/worker/keymanager/p2p/protocol.go @@ -50,7 +50,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, KeyManagerProtocolID, KeyManagerProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/checkpointsync/client.go b/go/worker/storage/p2p/checkpointsync/client.go index 64edb9ecc0a..e364a8a54d7 100644 --- a/go/worker/storage/p2p/checkpointsync/client.go +++ b/go/worker/storage/p2p/checkpointsync/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) @@ -111,7 +110,7 @@ func (c *client) GetCheckpointChunk( // // Moreover, it ensures underlying p2p service starts tracking protocol peers. 
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) rc := rpc.NewClient(p2p.Host(), pid) mgr := rpc.NewPeerManager(p2p, pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/checkpointsync/protocol.go b/go/worker/storage/p2p/checkpointsync/protocol.go index 140f5e3d618..2b07d38e917 100644 --- a/go/worker/storage/p2p/checkpointsync/protocol.go +++ b/go/worker/storage/p2p/checkpointsync/protocol.go @@ -71,7 +71,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go index 6fcfc6d111b..4c0363fe21d 100644 --- a/go/worker/storage/p2p/diffsync/client.go +++ b/go/worker/storage/p2p/diffsync/client.go @@ -4,7 +4,6 @@ import ( "context" "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -44,7 +43,7 @@ func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiff // // Moreover, it ensures underlying p2p service starts tracking protocol peers. 
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) rc := rpc.NewClient(p2p.Host(), pid) mgr := rpc.NewPeerManager(p2p, pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/diffsync/protocol.go b/go/worker/storage/p2p/diffsync/protocol.go index c53304d5666..e2949504039 100644 --- a/go/worker/storage/p2p/diffsync/protocol.go +++ b/go/worker/storage/p2p/diffsync/protocol.go @@ -52,7 +52,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, DiffSyncProtocolID, DiffSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/pub/client.go b/go/worker/storage/p2p/pub/client.go index 4b75b8f5d32..1590df41cb2 100644 --- a/go/worker/storage/p2p/pub/client.go +++ b/go/worker/storage/p2p/pub/client.go @@ -4,7 +4,6 @@ import ( "context" "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -64,7 +63,7 @@ func (c *client) Iterate(ctx context.Context, request *IterateRequest) (*ProofRe // NewClient creates a new storage pub protocol client. 
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/pub/protocol.go b/go/worker/storage/p2p/pub/protocol.go index e3641b77b78..48ac31eff0e 100644 --- a/go/worker/storage/p2p/pub/protocol.go +++ b/go/worker/storage/p2p/pub/protocol.go @@ -58,7 +58,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, StoragePubProtocolID, StoragePubProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/synclegacy/client.go b/go/worker/storage/p2p/synclegacy/client.go index 2058b2dae53..702fea0fdad 100644 --- a/go/worker/storage/p2p/synclegacy/client.go +++ b/go/worker/storage/p2p/synclegacy/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) @@ -129,7 +128,7 @@ func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Cli // Use two separate clients and managers for the same protocol. This is to make sure that peers // are scored differently between the two use cases (syncing diffs vs. syncing checkpoints). We // could consider separating this into two protocols in the future. 
- pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) rcC := rpc.NewClient(p2p.Host(), pid) mgrC := rpc.NewPeerManager(p2p, pid) diff --git a/go/worker/storage/p2p/synclegacy/protocol.go b/go/worker/storage/p2p/synclegacy/protocol.go index e32d764bca6..e97a05efa09 100644 --- a/go/worker/storage/p2p/synclegacy/protocol.go +++ b/go/worker/storage/p2p/synclegacy/protocol.go @@ -92,7 +92,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, StorageSyncProtocolID, StorageSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols From 6b30fe1a33f4511ee6420db9c0f37ae3db37f56a Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 30 Jul 2025 21:56:54 +0200 Subject: [PATCH 5/7] go/worker/storage/committee: Factor out checkpointer creation --- go/worker/storage/committee/node.go | 64 +++++++++++++++-------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index dfa8dbff18f..e141dd77483 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -212,8 +212,38 @@ func NewNode( n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - // Create a new checkpointer. Always create a checkpointer, even if checkpointing is disabled - // in configuration so we can ensure that the genesis checkpoint is available. + // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. + checkpointer, err := n.createCheckpointer(commonNode, localStorage) + if err != nil { + return nil, fmt.Errorf("failed to create checkpointer: %w", err) + } + n.checkpointer = checkpointer + + // Register prune handler. 
+ commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ + logger: n.logger, + node: n, + }) + + // Advertise and serve p2p protocols. + commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + if config.GlobalConfig.Storage.Checkpointer.Enabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + if rpcRoleProvider != nil { + commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + + // Create p2p protocol clients. + n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + + return n, nil +} + +func (n *Node) createCheckpointer(commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { checkInterval := checkpoint.CheckIntervalDisabled if config.GlobalConfig.Storage.Checkpointer.Enabled { checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval @@ -259,39 +289,13 @@ func NewNode( return blk.Header.StorageRoots(), nil }, } - var err error - n.checkpointer, err = checkpoint.NewCheckpointer( + + return checkpoint.NewCheckpointer( n.ctx, localStorage.NodeDB(), localStorage.Checkpointer(), checkpointerCfg, ) - if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) - } - - // Register prune handler. - commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, - }) - - // Advertise and serve p2p protocols. 
- commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - if checkInterval != checkpoint.CheckIntervalDisabled { - commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - if rpcRoleProvider != nil { - commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - - // Create p2p protocol clients. - n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - - return n, nil } // Service interface. 
From e7cd2ac4b14f18fc6a32519d3b5cf42763a62072 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 30 Jul 2025 21:43:04 +0200 Subject: [PATCH 6/7] go/e2e: Add checkpoints sync interoperability test --- go/oasis-test-runner/oasis/compute.go | 4 + go/oasis-test-runner/oasis/fixture.go | 14 ++- .../scenario/e2e/runtime/checkpoint_sync.go | 118 ++++++++++++++++++ .../scenario/e2e/runtime/scenario.go | 1 + go/worker/storage/config/config.go | 13 +- 5 files changed, 139 insertions(+), 11 deletions(-) create mode 100644 go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go diff --git a/go/oasis-test-runner/oasis/compute.go b/go/oasis-test-runner/oasis/compute.go index 2bebd20a12a..b6d62043a35 100644 --- a/go/oasis-test-runner/oasis/compute.go +++ b/go/oasis-test-runner/oasis/compute.go @@ -46,6 +46,7 @@ type Compute struct { // nolint: maligned storageBackend string disablePublicRPC bool checkpointSyncDisabled bool + legacySyncServerDisabled bool checkpointCheckInterval time.Duration checkpointParallelChunker bool } @@ -64,6 +65,7 @@ type ComputeCfg struct { StorageBackend string DisablePublicRPC bool CheckpointSyncDisabled bool + LegacySyncServerDisabled bool CheckpointCheckInterval time.Duration CheckpointParallelChunker bool } @@ -170,6 +172,7 @@ func (worker *Compute) ModifyConfig() error { worker.Config.Storage.Backend = worker.storageBackend worker.Config.Storage.PublicRPCEnabled = !worker.disablePublicRPC worker.Config.Storage.CheckpointSyncDisabled = worker.checkpointSyncDisabled + worker.Config.Storage.LegacySyncServerDisabled = worker.legacySyncServerDisabled worker.Config.Storage.Checkpointer.Enabled = true worker.Config.Storage.Checkpointer.CheckInterval = worker.checkpointCheckInterval worker.Config.Storage.Checkpointer.ParallelChunker = worker.checkpointParallelChunker @@ -236,6 +239,7 @@ func (net *Network) NewCompute(cfg *ComputeCfg) (*Compute, error) { sentryIndices: cfg.SentryIndices, disablePublicRPC: cfg.DisablePublicRPC, 
checkpointSyncDisabled: cfg.CheckpointSyncDisabled, + legacySyncServerDisabled: cfg.LegacySyncServerDisabled, checkpointCheckInterval: cfg.CheckpointCheckInterval, checkpointParallelChunker: cfg.CheckpointParallelChunker, sentryPubKey: sentryPubKey, diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index fd358f787cb..597f2a948dd 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -411,6 +411,7 @@ type ComputeWorkerFixture struct { CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"` CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"` + LegacySyncServerDisabled bool `json:"legacy_sync_server_disabled,omitempty"` CheckpointParallelChunker bool `json:"checkpoint_parallel_chunker,omitempty"` // Runtimes contains the indexes of the runtimes to enable. @@ -449,12 +450,13 @@ func (f *ComputeWorkerFixture) Create(net *Network) (*Compute, error) { CheckpointParallelChunker: f.CheckpointParallelChunker, // The checkpoint syncing flag is intentionally flipped here. // Syncing should normally be enabled, but normally disabled in tests. 
- CheckpointSyncDisabled: !f.CheckpointSyncEnabled, - DisablePublicRPC: f.DisablePublicRPC, - Runtimes: f.Runtimes, - RuntimeConfig: f.RuntimeConfig, - RuntimeProvisioner: f.RuntimeProvisioner, - RuntimeStatePaths: f.RuntimeStatePaths, + CheckpointSyncDisabled: !f.CheckpointSyncEnabled, + LegacySyncServerDisabled: f.LegacySyncServerDisabled, + DisablePublicRPC: f.DisablePublicRPC, + Runtimes: f.Runtimes, + RuntimeConfig: f.RuntimeConfig, + RuntimeProvisioner: f.RuntimeProvisioner, + RuntimeStatePaths: f.RuntimeStatePaths, }) } diff --git a/go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go b/go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go new file mode 100644 index 00000000000..b33ad257b14 --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go @@ -0,0 +1,118 @@ +package runtime + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +// CheckpointSync tests interoperability of the new checkpoint and diff sync +// p2p protocols with the legacy storage sync p2p protocol. +// +// The test checks that hosts that serve both protocols are compatible +// with clients that fallback to both. +// +// To simulate legacy host comment out fallback to the new protocols +// inside storage committee worker and disable registration of new checkpoint +// and diff sync protocols. This is not tested automatically as it would +// further pollute existing code and require additional config flags. 
+var CheckpointSync scenario.Scenario = newCheckpointSyncImpl() + +type checkpointSync struct { + Scenario +} + +func newCheckpointSyncImpl() scenario.Scenario { + return &checkpointSync{ + Scenario: *NewScenario( + "checkpoint-sync", + NewTestClient().WithScenario(SimpleScenario), + ), + } +} + +func (sc *checkpointSync) Clone() scenario.Scenario { + return &checkpointSync{ + Scenario: *sc.Scenario.Clone().(*Scenario), + } +} + +func (sc *checkpointSync) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.Scenario.Fixture() + if err != nil { + return nil, err + } + + // Make the first compute worker check for checkpoints more often. + f.ComputeWorkers[0].CheckpointCheckInterval = time.Second + // Configure runtime for storage checkpointing. + f.Runtimes[1].Storage.CheckpointInterval = 10 + f.Runtimes[1].Storage.CheckpointNumKept = 10 + f.Runtimes[1].Storage.CheckpointChunkSize = 1024 + // Serve both legacy and new protocols. + for i := range f.ComputeWorkers { + f.ComputeWorkers[i].LegacySyncServerDisabled = false + } + f.ComputeWorkers = append(f.ComputeWorkers, oasis.ComputeWorkerFixture{ + NodeFixture: oasis.NodeFixture{ + NoAutoStart: true, + }, + Entity: 1, + Runtimes: []int{1}, + CheckpointSyncEnabled: true, + LogWatcherHandlerFactories: []log.WatcherHandlerFactory{ + oasis.LogAssertCheckpointSync(), + }, + }) + + return f, nil +} + +func (sc *checkpointSync) Run(ctx context.Context, _ *env.Env) error { + if err := sc.Net.Start(); err != nil { + return err + } + + if err := sc.WaitForClientSync(ctx); err != nil { + return fmt.Errorf("failed to wait for client sync: %w", err) + } + + // Generate some more rounds to trigger checkpointing. 
+ for i := 0; i < 15; i++ { + sc.Logger.Info("submitting transaction to runtime", "seq", i) + if _, err := sc.submitKeyValueRuntimeInsertTx(ctx, KeyValueRuntimeID, uint64(i), "checkpoint", strconv.Itoa(i), 0, 0, plaintextTxKind); err != nil { + return err + } + } + + // Make sure that the first compute node created checkpoints. + ctrl, err := oasis.NewController(sc.Net.ComputeWorkers()[0].SocketPath()) + if err != nil { + return fmt.Errorf("failed to connect with the first compute node: %w", err) + } + if _, err = ctrl.Storage.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1, Namespace: KeyValueRuntimeID}); err != nil { + return fmt.Errorf("failed to get checkpoints: %w", err) + } + + // Start late compute worker and check if it syncs with a checkpoint. + sc.Logger.Info("running late compute worker") + lateWorker := sc.Net.ComputeWorkers()[len(sc.Net.ComputeWorkers())-1] + if err = lateWorker.Start(); err != nil { + return fmt.Errorf("failed to start late compute worker: %w", err) + } + if err = lateWorker.WaitReady(ctx); err != nil { + return fmt.Errorf("failed to wait for late compute worker to become ready: %w", err) + } + + // Wait a bit to give the logger in the node time to sync to disk. + <-time.After(1 * time.Second) + + return sc.Net.CheckLogWatchers() +} diff --git a/go/oasis-test-runner/scenario/e2e/runtime/scenario.go b/go/oasis-test-runner/scenario/e2e/runtime/scenario.go index 121bee89eb7..c788fcc1650 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/scenario.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/scenario.go @@ -342,6 +342,7 @@ func RegisterScenarios() error { StorageSyncFromRegistered, StorageSyncInconsistent, StorageEarlyStateSync, + CheckpointSync, // Sentry test. Sentry, // Keymanager tests. 
diff --git a/go/worker/storage/config/config.go b/go/worker/storage/config/config.go index d4f2c54aed9..77846089d04 100644 --- a/go/worker/storage/config/config.go +++ b/go/worker/storage/config/config.go @@ -20,6 +20,8 @@ type Config struct { PublicRPCEnabled bool `yaml:"public_rpc_enabled,omitempty"` // Disable initial storage sync from checkpoints. CheckpointSyncDisabled bool `yaml:"checkpoint_sync_disabled,omitempty"` + // Disable serving legacy storage sync p2p protocol. + LegacySyncServerDisabled bool `yaml:"legacy_sync_server_disabled,omitempty"` // Storage checkpointer configuration. Checkpointer CheckpointerConfig `yaml:"checkpointer,omitempty"` @@ -47,11 +49,12 @@ func (c *Config) Validate() error { // DefaultConfig returns the default configuration settings. func DefaultConfig() Config { return Config{ - Backend: "auto", - MaxCacheSize: "64mb", - FetcherCount: 4, - PublicRPCEnabled: false, - CheckpointSyncDisabled: false, + Backend: "auto", + MaxCacheSize: "64mb", + FetcherCount: 4, + PublicRPCEnabled: false, + CheckpointSyncDisabled: false, + LegacySyncServerDisabled: false, Checkpointer: CheckpointerConfig{ Enabled: false, CheckInterval: 1 * time.Minute, From bbce19cec177943079de0e98ca203cd66c6f0cb2 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 30 Jul 2025 22:14:18 +0200 Subject: [PATCH 7/7] go/worker/storage: Remove legacy storage sync p2p protocol --- .changelog/6261.internal.md | 5 + go/oasis-node/cmd/debug/byzantine/node.go | 4 +- .../scenario/e2e/runtime/checkpoint_sync.go | 118 -------------- .../scenario/e2e/runtime/scenario.go | 1 - .../storage/committee/checkpoint_sync.go | 89 ++--------- go/worker/storage/committee/node.go | 28 +--- go/worker/storage/p2p/synclegacy/client.go | 149 ------------------ go/worker/storage/p2p/synclegacy/protocol.go | 101 ------------ go/worker/storage/p2p/synclegacy/server.go | 108 ------------- 9 files changed, 28 insertions(+), 575 deletions(-) create mode 100644 .changelog/6261.internal.md delete
mode 100644 go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go delete mode 100644 go/worker/storage/p2p/synclegacy/client.go delete mode 100644 go/worker/storage/p2p/synclegacy/protocol.go delete mode 100644 go/worker/storage/p2p/synclegacy/server.go diff --git a/.changelog/6261.internal.md b/.changelog/6261.internal.md new file mode 100644 index 00000000000..3991e71fe89 --- /dev/null +++ b/.changelog/6261.internal.md @@ -0,0 +1,5 @@ +go/worker/storage: Remove legacy storage sync p2p protocol + +The node will now only serve the new checkpoint and diff sync +p2p protocols. This is backward compatible since legacy +clients still serve both legacy and new protocols. diff --git a/go/oasis-node/cmd/debug/byzantine/node.go b/go/oasis-node/cmd/debug/byzantine/node.go index 3920f96f11e..416f6d4acb8 100644 --- a/go/oasis-node/cmd/debug/byzantine/node.go +++ b/go/oasis-node/cmd/debug/byzantine/node.go @@ -22,7 +22,7 @@ import ( scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/client" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) type byzantine struct { @@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode( if err != nil { return nil, fmt.Errorf("initializing storage node failed: %w", err) } - b.p2p.service.RegisterProtocolServer(synclegacy.NewServer(b.chainContext, b.runtimeID, storage)) + b.p2p.service.RegisterProtocolServer(checkpointsync.NewServer(b.chainContext, b.runtimeID, storage)) b.storage = storage // Wait for activation epoch.
diff --git a/go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go b/go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go deleted file mode 100644 index b33ad257b14..00000000000 --- a/go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go +++ /dev/null @@ -1,118 +0,0 @@ -package runtime - -import ( - "context" - "fmt" - "strconv" - "time" - - "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" - "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log" - "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" - "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" - "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" -) - -// CheckpointSync tests interoperability of the new checkpoint and diff sync -// p2p protocols with the legacy storage sync p2p protocol. -// -// The test checks that hosts that serve both protocols are compatible -// with clients that fallback to both. -// -// To simulate legacy host comment out fallback to the new protocols -// inside storage committee worker and disable registration of new checkpoint -// and diff sync protocols. This is not tested automatically as it would -// further pollute existing code and require additional config flags. -var CheckpointSync scenario.Scenario = newCheckpointSyncImpl() - -type checkpointSync struct { - Scenario -} - -func newCheckpointSyncImpl() scenario.Scenario { - return &checkpointSync{ - Scenario: *NewScenario( - "checkpoint-sync", - NewTestClient().WithScenario(SimpleScenario), - ), - } -} - -func (sc *checkpointSync) Clone() scenario.Scenario { - return &checkpointSync{ - Scenario: *sc.Scenario.Clone().(*Scenario), - } -} - -func (sc *checkpointSync) Fixture() (*oasis.NetworkFixture, error) { - f, err := sc.Scenario.Fixture() - if err != nil { - return nil, err - } - - // Make the first compute worker check for checkpoints more often. 
- f.ComputeWorkers[0].CheckpointCheckInterval = time.Second - // Configure runtime for storage checkpointing. - f.Runtimes[1].Storage.CheckpointInterval = 10 - f.Runtimes[1].Storage.CheckpointNumKept = 10 - f.Runtimes[1].Storage.CheckpointChunkSize = 1024 - // Serve both legacy and new protocols. - for i := range f.ComputeWorkers { - f.ComputeWorkers[i].LegacySyncServerDisabled = false - } - f.ComputeWorkers = append(f.ComputeWorkers, oasis.ComputeWorkerFixture{ - NodeFixture: oasis.NodeFixture{ - NoAutoStart: true, - }, - Entity: 1, - Runtimes: []int{1}, - CheckpointSyncEnabled: true, - LogWatcherHandlerFactories: []log.WatcherHandlerFactory{ - oasis.LogAssertCheckpointSync(), - }, - }) - - return f, nil -} - -func (sc *checkpointSync) Run(ctx context.Context, _ *env.Env) error { - if err := sc.Net.Start(); err != nil { - return err - } - - if err := sc.WaitForClientSync(ctx); err != nil { - return fmt.Errorf("failed to wait for client sync: %w", err) - } - - // Generate some more rounds to trigger checkpointing. - for i := 0; i < 15; i++ { - sc.Logger.Info("submitting transaction to runtime", "seq", i) - if _, err := sc.submitKeyValueRuntimeInsertTx(ctx, KeyValueRuntimeID, uint64(i), "checkpoint", strconv.Itoa(i), 0, 0, plaintextTxKind); err != nil { - return err - } - } - - // Make sure that the first compute node created checkpoints. - ctrl, err := oasis.NewController(sc.Net.ComputeWorkers()[0].SocketPath()) - if err != nil { - return fmt.Errorf("failed to connect with the first compute node: %w", err) - } - if _, err = ctrl.Storage.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1, Namespace: KeyValueRuntimeID}); err != nil { - return fmt.Errorf("failed to get checkpoints: %w", err) - } - - // Start late compute worker and check if it syncs with a checkpoint. 
- sc.Logger.Info("running late compute worker") - lateWorker := sc.Net.ComputeWorkers()[len(sc.Net.ComputeWorkers())-1] - if err = lateWorker.Start(); err != nil { - return fmt.Errorf("failed to start late compute worker: %w", err) - } - if err = lateWorker.WaitReady(ctx); err != nil { - return fmt.Errorf("failed to wait for late compute worker to become ready: %w", err) - } - - // Wait a bit to give the logger in the node time to sync to disk. - <-time.After(1 * time.Second) - - return sc.Net.CheckLogWatchers() -} diff --git a/go/oasis-test-runner/scenario/e2e/runtime/scenario.go b/go/oasis-test-runner/scenario/e2e/runtime/scenario.go index c788fcc1650..121bee89eb7 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/scenario.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/scenario.go @@ -342,7 +342,6 @@ func RegisterScenarios() error { StorageSyncFromRegistered, StorageSyncInconsistent, StorageEarlyStateSync, - CheckpointSync, // Sentry test. Sentry, // Keymanager tests. diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index ad553272a90..ac2ccaa306b 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -11,11 +11,9 @@ import ( "sync" "time" - "github.com/oasisprotocol/oasis-core/go/p2p/rpc" storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) const ( @@ -103,7 +101,19 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. 
- rsp, pf, err := n.fetchChunk(chunkCtx, chunk) + rsp, pf, err := n.checkpointSync.GetCheckpointChunk( + chunkCtx, + &checkpointsync.GetCheckpointChunkRequest{ + Version: chunk.Version, + Root: chunk.Root, + Index: chunk.Index, + Digest: chunk.Digest, + }, + &checkpointsync.Checkpoint{ + Metadata: chunk.checkpoint.Metadata, + Peers: chunk.checkpoint.Peers, + }, + ) if err != nil { n.logger.Error("failed to fetch chunk from peers", "err", err, @@ -114,7 +124,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. - done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) + done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk)) cancel() switch { @@ -154,46 +164,6 @@ func (n *Node) checkpointChunkFetcher( } } -// fetchChunk fetches chunk using checkpoint sync p2p protocol client. -// -// In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { - rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( - ctx, - &checkpointsync.GetCheckpointChunkRequest{ - Version: chunk.Version, - Root: chunk.Root, - Index: chunk.Index, - Digest: chunk.Digest, - }, - &checkpointsync.Checkpoint{ - Metadata: chunk.checkpoint.Metadata, - Peers: chunk.checkpoint.Peers, - }, - ) - if err == nil { // if NO error - return rsp1.Chunk, pf, nil - } - - rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( - ctx, - &synclegacy.GetCheckpointChunkRequest{ - Version: chunk.Version, - Root: chunk.Root, - Index: chunk.Index, - Digest: chunk.Digest, - }, - &synclegacy.Checkpoint{ - Metadata: chunk.checkpoint.Metadata, - Peers: chunk.checkpoint.Peers, - }, - ) - if err != nil { - return nil, nil, err - } - return rsp2.Chunk, pf, nil -} - func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { if err := 
n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so @@ -317,7 +287,9 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) defer cancel() - list, err := n.fetchCheckpoints(ctx) + list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ + Version: 1, + }) if err != nil { n.logger.Error("failed to retrieve any checkpoints", "err", err, @@ -331,33 +303,6 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { return list, nil } -// fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. -// -// In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. -func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { - list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ - Version: 1, - }) - if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint - return list1, nil - } - - list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ - Version: 1, - }) - if err != nil { - return nil, err - } - var cps []*checkpointsync.Checkpoint - for _, cp := range list2 { - cps = append(cps, &checkpointsync.Checkpoint{ - Metadata: cp.Metadata, - Peers: cp.Peers, - }) - } - return cps, nil -} - // sortCheckpoints sorts the slice in-place (descending by version, peers, hash). 
func sortCheckpoints(s []*checkpointsync.Checkpoint) { slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int { diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index e141dd77483..fae4adc8701 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -35,7 +35,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) var ( @@ -130,9 +129,8 @@ type Node struct { // nolint: maligned localStorage storageApi.LocalBackend - diffSync diffsync.Client - checkpointSync checkpointsync.Client - legacyStorageSync synclegacy.Client + diffSync diffsync.Client + checkpointSync checkpointsync.Client undefinedRound uint64 @@ -226,7 +224,6 @@ func NewNode( }) // Advertise and serve p2p protocols. - commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) if config.GlobalConfig.Storage.Checkpointer.Enabled { commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) @@ -236,7 +233,6 @@ func NewNode( } // Create p2p protocol clients. 
- n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) @@ -444,29 +440,13 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { ctx, cancel := context.WithCancel(n.ctx) defer cancel() - wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) + rsp, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { result.err = err return } result.pf = pf - result.writeLog = wl -} - -// getDiff fetches writelog using diff sync p2p protocol client. -// -// In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { - rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) - if err == nil { // if NO error - return rsp1.WriteLog, pf, nil - } - - rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) - if err != nil { - return nil, nil, err - } - return rsp2.WriteLog, pf, nil + result.writeLog = rsp.WriteLog } func (n *Node) finalize(summary *blockSummary) { diff --git a/go/worker/storage/p2p/synclegacy/client.go b/go/worker/storage/p2p/synclegacy/client.go deleted file mode 100644 index 702fea0fdad..00000000000 --- a/go/worker/storage/p2p/synclegacy/client.go +++ /dev/null @@ -1,149 +0,0 @@ -package synclegacy - -import ( - "context" - - "github.com/libp2p/go-libp2p/core" - - "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/rpc" - 
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" -) - -const ( - // minProtocolPeers is the minimum number of peers from the registry we want to have connected - // for StorageSync protocol. - minProtocolPeers = 5 - - // totalProtocolPeers is the number of peers we want to have connected for StorageSync protocol. - totalProtocolPeers = 10 -) - -// Client is a storage sync protocol client. -type Client interface { - // GetDiff requests a write log of entries that must be applied to get from the first given root - // to the second one. - GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) - - // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. - GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) - - // GetCheckpointChunk requests a specific checkpoint chunk. - GetCheckpointChunk( - ctx context.Context, - request *GetCheckpointChunkRequest, - cp *Checkpoint, - ) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) -} - -// Checkpoint contains checkpoint metadata together with peer information. -type Checkpoint struct { - *checkpoint.Metadata - - // Peers are the feedback structures of all the peers that have advertised this checkpoint. 
- Peers []rpc.PeerFeedback -} - -type client struct { - rcC rpc.Client - rcD rpc.Client - mgrC rpc.PeerManager - mgrD rpc.PeerManager -} - -func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) { - var rsp GetDiffResponse - pf, err := c.rcD.CallOne(ctx, c.mgrD.GetBestPeers(), MethodGetDiff, request, &rsp, - rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime), - ) - if err != nil { - return nil, nil, err - } - return &rsp, pf, nil -} - -func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) { - var rsp GetCheckpointsResponse - rsps, pfs, err := c.rcC.CallMulti(ctx, c.mgrC.GetBestPeers(), MethodGetCheckpoints, request, rsp) - if err != nil { - return nil, err - } - - // Combine deduplicated results into a single result. - var checkpoints []*Checkpoint - cps := make(map[hash.Hash]*Checkpoint) - for i, peerRsp := range rsps { - peerCps := peerRsp.(*GetCheckpointsResponse).Checkpoints - - for _, cpMeta := range peerCps { - h := cpMeta.EncodedHash() - cp := cps[h] - if cp == nil { - cp = &Checkpoint{ - Metadata: cpMeta, - } - cps[h] = cp - checkpoints = append(checkpoints, cp) - } - cp.Peers = append(cp.Peers, pfs[i]) - } - - // Record success for a peer if it returned at least one checkpoint. - if len(peerCps) > 0 { - pfs[i].RecordSuccess() - } - } - return checkpoints, nil -} - -func (c *client) GetCheckpointChunk( - ctx context.Context, - request *GetCheckpointChunkRequest, - cp *Checkpoint, -) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) { - var opts []rpc.BestPeersOption - // When a checkpoint is passed, we limit requests to only those peers that actually advertised - // having the checkpoint in question to avoid needless requests. 
- if cp != nil { - peers := make([]core.PeerID, 0, len(cp.Peers)) - for _, pf := range cp.Peers { - peers = append(peers, pf.PeerID()) - } - opts = append(opts, rpc.WithLimitPeers(peers)) - } - - var rsp GetCheckpointChunkResponse - pf, err := c.rcC.CallOne(ctx, c.mgrC.GetBestPeers(opts...), MethodGetCheckpointChunk, request, &rsp, - rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime), - ) - if err != nil { - return nil, nil, err - } - return &rsp, pf, nil -} - -// NewClient creates a new storage sync protocol client. -func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - // Use two separate clients and managers for the same protocol. This is to make sure that peers - // are scored differently between the two use cases (syncing diffs vs. syncing checkpoints). We - // could consider separating this into two protocols in the future. - pid := ProtocolID(chainContext, runtimeID) - - rcC := rpc.NewClient(p2p.Host(), pid) - mgrC := rpc.NewPeerManager(p2p, pid) - rcC.RegisterListener(mgrC) - - rcD := rpc.NewClient(p2p.Host(), pid) - mgrD := rpc.NewPeerManager(p2p, pid) - rcD.RegisterListener(mgrD) - - p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) - - return &client{ - rcC: rcC, - rcD: rcD, - mgrC: mgrC, - mgrD: mgrD, - } -} diff --git a/go/worker/storage/p2p/synclegacy/protocol.go b/go/worker/storage/p2p/synclegacy/protocol.go deleted file mode 100644 index e97a05efa09..00000000000 --- a/go/worker/storage/p2p/synclegacy/protocol.go +++ /dev/null @@ -1,101 +0,0 @@ -// Package synclegacy defines wire protocol together with client/server -// implementations for the legacy storage sync protocol, used for runtime block sync. -// -// The protocol was split into storage diff and checkpoints protocol. 
-// -// TODO: Remove it: https://github.com/oasisprotocol/oasis-core/issues/6261 -package synclegacy - -import ( - "time" - - "github.com/libp2p/go-libp2p/core" - - "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/common/node" - "github.com/oasisprotocol/oasis-core/go/common/version" - "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" - storage "github.com/oasisprotocol/oasis-core/go/storage/api" - "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" -) - -// StorageSyncProtocolID is a unique protocol identifier for the storage sync protocol. -const StorageSyncProtocolID = "storagesync" - -// StorageSyncProtocolVersion is the supported version of the storage sync protocol. -var StorageSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0} - -// ProtocolID returns the runtime storage sync protocol ID. -func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { - return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) -} - -// Constants related to the GetDiff method. -const ( - MethodGetDiff = "GetDiff" - MaxGetDiffResponseTime = 15 * time.Second -) - -// GetDiffRequest is a GetDiff request. -type GetDiffRequest struct { - StartRoot storage.Root `json:"start_root"` - EndRoot storage.Root `json:"end_root"` -} - -// GetDiffResponse is a response to a GetDiff request. -type GetDiffResponse struct { - WriteLog storage.WriteLog `json:"write_log,omitempty"` -} - -// Constants related to the GetCheckpoints method. -const ( - MethodGetCheckpoints = "GetCheckpoints" -) - -// GetCheckpointsRequest is a GetCheckpoints request. -type GetCheckpointsRequest struct { - Version uint16 `json:"version"` -} - -// GetCheckpointsResponse is a response to a GetCheckpoints request. 
-type GetCheckpointsResponse struct { - Checkpoints []*checkpoint.Metadata `json:"checkpoints,omitempty"` -} - -// Constants related to the GetCheckpointChunk method. -const ( - MethodGetCheckpointChunk = "GetCheckpointChunk" - MaxGetCheckpointChunkResponseTime = 60 * time.Second -) - -// GetCheckpointChunkRequest is a GetCheckpointChunk request. -type GetCheckpointChunkRequest struct { - Version uint16 `json:"version"` - Root storage.Root `json:"root"` - Index uint64 `json:"index"` - Digest hash.Hash `json:"digest"` -} - -// GetCheckpointChunkResponse is a response to a GetCheckpointChunk request. -type GetCheckpointChunkResponse struct { - Chunk []byte `json:"chunk,omitempty"` -} - -func init() { - peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ - ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { - if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { - return []core.ProtocolID{} - } - - protocols := make([]core.ProtocolID, len(n.Runtimes)) - for i, rt := range n.Runtimes { - protocols[i] = ProtocolID(chainContext, rt.ID) - } - - return protocols - }, - }) -} diff --git a/go/worker/storage/p2p/synclegacy/server.go b/go/worker/storage/p2p/synclegacy/server.go deleted file mode 100644 index a09c342bcaf..00000000000 --- a/go/worker/storage/p2p/synclegacy/server.go +++ /dev/null @@ -1,108 +0,0 @@ -package synclegacy - -import ( - "bytes" - "context" - - "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/common/cbor" - "github.com/oasisprotocol/oasis-core/go/p2p/rpc" - storage "github.com/oasisprotocol/oasis-core/go/storage/api" - "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" -) - -type service struct { - backend storage.Backend -} - -func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { - switch method { - case MethodGetDiff: - var rq GetDiffRequest - if err := cbor.Unmarshal(body, &rq); err != nil { - return nil, 
rpc.ErrBadRequest - } - - return s.handleGetDiff(ctx, &rq) - case MethodGetCheckpoints: - var rq GetCheckpointsRequest - if err := cbor.Unmarshal(body, &rq); err != nil { - return nil, rpc.ErrBadRequest - } - - return s.handleGetCheckpoints(ctx, &rq) - case MethodGetCheckpointChunk: - var rq GetCheckpointChunkRequest - if err := cbor.Unmarshal(body, &rq); err != nil { - return nil, rpc.ErrBadRequest - } - - return s.handleGetCheckpointChunk(ctx, &rq) - default: - return nil, rpc.ErrMethodNotSupported - } -} - -func (s *service) handleGetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, error) { - it, err := s.backend.GetDiff(ctx, &storage.GetDiffRequest{ - StartRoot: request.StartRoot, - EndRoot: request.EndRoot, - }) - if err != nil { - return nil, err - } - - var rsp GetDiffResponse - for { - more, err := it.Next() - if err != nil { - return nil, err - } - if !more { - break - } - - chunk, err := it.Value() - if err != nil { - return nil, err - } - rsp.WriteLog = append(rsp.WriteLog, chunk) - } - return &rsp, nil -} - -func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) { - cps, err := s.backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ - Version: request.Version, - }) - if err != nil { - return nil, err - } - - return &GetCheckpointsResponse{ - Checkpoints: cps, - }, nil -} - -func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) { - // TODO: Use stream resource manager to track buffer use. - var buf bytes.Buffer - err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{ - Version: request.Version, - Root: request.Root, - Index: request.Index, - Digest: request.Digest, - }, &buf) - if err != nil { - return nil, err - } - - return &GetCheckpointChunkResponse{ - Chunk: buf.Bytes(), - }, nil -} - -// NewServer creates a new storage sync protocol server. 
-func NewServer(chainContext string, runtimeID common.Namespace, backend storage.Backend) rpc.Server { - return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) -}