From 4213d9a5c032c968a09dda9c3cd9e039142514a5 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 23 Jul 2025 22:11:07 +0200 Subject: [PATCH 1/5] go/p2p/rpc: Make CallMulti fail if 0 peers --- go/p2p/rpc/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/p2p/rpc/client.go b/go/p2p/rpc/client.go index 888045ad471..e471db89092 100644 --- a/go/p2p/rpc/client.go +++ b/go/p2p/rpc/client.go @@ -364,6 +364,10 @@ func (c *client) CallMulti( ) ([]any, []PeerFeedback, error) { c.logger.Debug("call multiple", "method", method) + if len(peers) == 0 { + return nil, nil, fmt.Errorf("no peers given to service the request") + } + co := NewCallMultiOptions(opts...) // Prepare the request. From fc405de209bb3c815530db947e2fba0cf8b7a8da Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 23 Jul 2025 23:05:42 +0200 Subject: [PATCH 2/5] go/worker/storage/p2p: Split storage sync protocol --- .../storage/p2p/checkpointsync/client.go | 125 ++++++++++++++++++ .../storage/p2p/checkpointsync/protocol.go | 80 +++++++++++ .../storage/p2p/checkpointsync/server.go | 73 ++++++++++ go/worker/storage/p2p/diffsync/client.go | 58 ++++++++ go/worker/storage/p2p/diffsync/protocol.go | 61 +++++++++ go/worker/storage/p2p/diffsync/server.go | 61 +++++++++ 6 files changed, 458 insertions(+) create mode 100644 go/worker/storage/p2p/checkpointsync/client.go create mode 100644 go/worker/storage/p2p/checkpointsync/protocol.go create mode 100644 go/worker/storage/p2p/checkpointsync/server.go create mode 100644 go/worker/storage/p2p/diffsync/client.go create mode 100644 go/worker/storage/p2p/diffsync/protocol.go create mode 100644 go/worker/storage/p2p/diffsync/server.go diff --git a/go/worker/storage/p2p/checkpointsync/client.go b/go/worker/storage/p2p/checkpointsync/client.go new file mode 100644 index 00000000000..64edb9ecc0a --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/client.go @@ -0,0 +1,125 @@ +package checkpointsync + +import ( + "context" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for checkpoint sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for checkpoint sync protocol. + totalProtocolPeers = 10 +) + +// Client is a checkpoint sync protocol client. +type Client interface { + // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. + GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) + + // GetCheckpointChunk requests a specific checkpoint chunk. + GetCheckpointChunk( + ctx context.Context, + request *GetCheckpointChunkRequest, + cp *Checkpoint, + ) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) +} + +// Checkpoint contains checkpoint metadata together with peer information. +type Checkpoint struct { + *checkpoint.Metadata + + // Peers are the feedback structures of all the peers that have advertised this checkpoint. + Peers []rpc.PeerFeedback +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager +} + +func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) { + var rsp GetCheckpointsResponse + rsps, pfs, err := c.rc.CallMulti(ctx, c.mgr.GetBestPeers(), MethodGetCheckpoints, request, rsp) + if err != nil { + return nil, err + } + + // Combine deduplicated results into a single result. + var checkpoints []*Checkpoint + cps := make(map[hash.Hash]*Checkpoint) + for i, peerRsp := range rsps { + peerCps := peerRsp.(*GetCheckpointsResponse).Checkpoints + + for _, cpMeta := range peerCps { + h := cpMeta.EncodedHash() + cp := cps[h] + if cp == nil { + cp = &Checkpoint{ + Metadata: cpMeta, + } + cps[h] = cp + checkpoints = append(checkpoints, cp) + } + cp.Peers = append(cp.Peers, pfs[i]) + } + + // Record success for a peer if it returned at least one checkpoint. + if len(peerCps) > 0 { + pfs[i].RecordSuccess() + } + } + return checkpoints, nil +} + +func (c *client) GetCheckpointChunk( + ctx context.Context, + request *GetCheckpointChunkRequest, + cp *Checkpoint, +) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) { + var opts []rpc.BestPeersOption + // When a checkpoint is passed, we limit requests to only those peers that actually advertised + // having the checkpoint in question to avoid needless requests. + if cp != nil { + peers := make([]core.PeerID, 0, len(cp.Peers)) + for _, pf := range cp.Peers { + peers = append(peers, pf.PeerID()) + } + opts = append(opts, rpc.WithLimitPeers(peers)) + } + + var rsp GetCheckpointChunkResponse + pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(opts...), MethodGetCheckpointChunk, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +// NewClient creates a new checkpoint sync protocol client. +// +// Moreover, it ensures underlying p2p service starts tracking protocol peers. +func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + rc := rpc.NewClient(p2p.Host(), pid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + } +} diff --git a/go/worker/storage/p2p/checkpointsync/protocol.go b/go/worker/storage/p2p/checkpointsync/protocol.go new file mode 100644 index 00000000000..140f5e3d618 --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/protocol.go @@ -0,0 +1,80 @@ +// Package checkpointsync defines wire protocol together with client/server +// implementations for the checkpoint sync protocol, used for runtime state sync. +package checkpointsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +// CheckpointSyncProtocolID is a unique protocol identifier for the checkpoint sync protocol. +const CheckpointSyncProtocolID = "checkpointsync" + +// CheckpointSyncProtocolVersion is the supported version of the checkpoint sync protocol. +var CheckpointSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime checkpoint sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) +} + +// Constants related to the GetCheckpoints method. +const ( + MethodGetCheckpoints = "GetCheckpoints" +) + +// GetCheckpointsRequest is a GetCheckpoints request. +type GetCheckpointsRequest struct { + Version uint16 `json:"version"` +} + +// GetCheckpointsResponse is a response to a GetCheckpoints request. +type GetCheckpointsResponse struct { + Checkpoints []*checkpoint.Metadata `json:"checkpoints,omitempty"` +} + +// Constants related to the GetCheckpointChunk method. +const ( + MethodGetCheckpointChunk = "GetCheckpointChunk" + MaxGetCheckpointChunkResponseTime = time.Minute +) + +// GetCheckpointChunkRequest is a GetCheckpointChunk request. +type GetCheckpointChunkRequest struct { + Version uint16 `json:"version"` + Root api.Root `json:"root"` + Index uint64 `json:"index"` + Digest hash.Hash `json:"digest"` +} + +// GetCheckpointChunkResponse is a response to a GetCheckpointChunk request. +type GetCheckpointChunkResponse struct { + Chunk []byte `json:"chunk,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/checkpointsync/server.go b/go/worker/storage/p2p/checkpointsync/server.go new file mode 100644 index 00000000000..2df4d0f68db --- /dev/null +++ b/go/worker/storage/p2p/checkpointsync/server.go @@ -0,0 +1,73 @@ +package checkpointsync + +import ( + "bytes" + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { + switch method { + case MethodGetCheckpoints: + var rq GetCheckpointsRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetCheckpoints(ctx, &rq) + case MethodGetCheckpointChunk: + var rq GetCheckpointChunkRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetCheckpointChunk(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) { + cps, err := s.backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ + Version: request.Version, + }) + if err != nil { + return nil, err + } + + return &GetCheckpointsResponse{ + Checkpoints: cps, + }, nil +} + +func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) { + // Consider using stream resource manager to track buffer use. + var buf bytes.Buffer + err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{ + Version: request.Version, + Root: request.Root, + Index: request.Index, + Digest: request.Digest, + }, &buf) + if err != nil { + return nil, err + } + + return &GetCheckpointChunkResponse{ + Chunk: buf.Bytes(), + }, nil +} + +// NewServer creates a new checkpoint sync protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go new file mode 100644 index 00000000000..6fcfc6d111b --- /dev/null +++ b/go/worker/storage/p2p/diffsync/client.go @@ -0,0 +1,58 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for diff sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for diff sync protocol. + totalProtocolPeers = 10 +) + +// Client is a diff sync protocol client. +type Client interface { + // GetDiff requests a write log of entries that must be applied to get from the first given root + // to the second one. + GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager +} + +func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) { + var rsp GetDiffResponse + pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(), MethodGetDiff, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +// NewClient creates a new diff sync protocol client. +// +// Moreover, it ensures underlying p2p service starts tracking protocol peers. +func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) + rc := rpc.NewClient(p2p.Host(), pid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + } +} diff --git a/go/worker/storage/p2p/diffsync/protocol.go b/go/worker/storage/p2p/diffsync/protocol.go new file mode 100644 index 00000000000..c53304d5666 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/protocol.go @@ -0,0 +1,61 @@ +// Package diffsync defines wire protocol together with client/server +// implementations for the diff sync protocol, used for runtime block sync. +package diffsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +// DiffSyncProtocolID is a unique protocol identifier for the diff sync protocol. +const DiffSyncProtocolID = "diffsync" + +// DiffSyncProtocolVersion is the supported version of the diff sync protocol. +var DiffSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime diff sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) +} + +// Constants related to the GetDiff method. +const ( + MethodGetDiff = "GetDiff" + MaxGetDiffResponseTime = 15 * time.Second +) + +// GetDiffRequest is a GetDiff request. +type GetDiffRequest struct { + StartRoot api.Root `json:"start_root"` + EndRoot api.Root `json:"end_root"` +} + +// GetDiffResponse is a response to a GetDiff request. +type GetDiffResponse struct { + WriteLog api.WriteLog `json:"write_log,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, DiffSyncProtocolID, DiffSyncProtocolVersion) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/diffsync/server.go b/go/worker/storage/p2p/diffsync/server.go new file mode 100644 index 00000000000..4267f8dc6a8 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/server.go @@ -0,0 +1,61 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { + switch method { + case MethodGetDiff: + var rq GetDiffRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetDiff(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, error) { + it, err := s.backend.GetDiff(ctx, &api.GetDiffRequest{ + StartRoot: request.StartRoot, + EndRoot: request.EndRoot, + }) + if err != nil { + return nil, err + } + + var rsp GetDiffResponse + for { + more, err := it.Next() + if err != nil { + return nil, err + } + if !more { + break + } + + chunk, err := it.Value() + if err != nil { + return nil, err + } + rsp.WriteLog = append(rsp.WriteLog, chunk) + } + return &rsp, nil +} + +// NewServer creates a new diff sync protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} From 7e40e635aaf25d9328809eab6caee5cb127be498 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 24 Jul 2025 00:18:29 +0200 Subject: [PATCH 3/5] go/worker/storage: Integrate new p2p protocols Legacy storage sync protocol is still advertised and served to enable seamless rolling upgrades of the network. --- .changelog/5751.feature.md | 12 +++ go/oasis-node/cmd/debug/byzantine/node.go | 4 +- .../storage/committee/checkpoint_sync.go | 98 +++++++++++++++---- .../storage/committee/checkpoint_sync_test.go | 14 +-- go/worker/storage/committee/node.go | 44 +++++++-- .../p2p/{sync => synclegacy}/client.go | 2 +- .../p2p/{sync => synclegacy}/protocol.go | 8 +- .../p2p/{sync => synclegacy}/server.go | 2 +- 8 files changed, 145 insertions(+), 39 deletions(-) create mode 100644 .changelog/5751.feature.md rename go/worker/storage/p2p/{sync => synclegacy}/client.go (99%) rename go/worker/storage/p2p/{sync => synclegacy}/protocol.go (90%) rename go/worker/storage/p2p/{sync => synclegacy}/server.go (99%) diff --git a/.changelog/5751.feature.md b/.changelog/5751.feature.md new file mode 100644 index 00000000000..766413d7ce9 --- /dev/null +++ b/.changelog/5751.feature.md @@ -0,0 +1,12 @@ +go: Split storage sync p2p protocol + +Storage sync protocol was split into two independent protocols (checkpoint +and diff sync). + +This change was made since there may be fewer nodes that expose checkpoints +than storage diff. Previously, this could lead to issues with state sync +when a node was connected with peers that supported storage sync protocol +but had no checkpoints available. + +This was done in backwards compatible manner, so that both protocols are still +advertised and used. Eventually, we plan to remove legacy protocol. diff --git a/go/oasis-node/cmd/debug/byzantine/node.go b/go/oasis-node/cmd/debug/byzantine/node.go index 447def2d766..3920f96f11e 100644 --- a/go/oasis-node/cmd/debug/byzantine/node.go +++ b/go/oasis-node/cmd/debug/byzantine/node.go @@ -22,7 +22,7 @@ import ( scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/client" - storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) type byzantine struct { @@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode( if err != nil { return nil, fmt.Errorf("initializing storage node failed: %w", err) } - b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage)) + b.p2p.service.RegisterProtocolServer(synclegacy.NewServer(b.chainContext, b.runtimeID, storage)) b.storage = storage // Wait for activation epoch. diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index c9236f4cada..ad553272a90 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -11,9 +11,11 @@ import ( "sync" "time" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) const ( @@ -55,7 +57,7 @@ type chunk struct { *checkpoint.ChunkMetadata // checkpoint points to the checkpoint this chunk originated from. - checkpoint *storageSync.Checkpoint + checkpoint *checkpointsync.Checkpoint } type chunkHeap struct { @@ -101,12 +103,7 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{ - Version: chunk.Version, - Root: chunk.Root, - Index: chunk.Index, - Digest: chunk.Digest, - }, chunk.checkpoint) + rsp, pf, err := n.fetchChunk(chunkCtx, chunk) if err != nil { n.logger.Error("failed to fetch chunk from peers", "err", err, @@ -117,7 +114,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. - done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk)) + done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) cancel() switch { @@ -157,7 +154,47 @@ func (n *Node) checkpointChunkFetcher( } } -func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { +// fetchChunk fetches chunk using checkpoint sync p2p protocol client. +// +// In case of no peers or error, it fallbacks to the legacy storage sync protocol. +func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { + rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( + ctx, + &checkpointsync.GetCheckpointChunkRequest{ + Version: chunk.Version, + Root: chunk.Root, + Index: chunk.Index, + Digest: chunk.Digest, + }, + &checkpointsync.Checkpoint{ + Metadata: chunk.checkpoint.Metadata, + Peers: chunk.checkpoint.Peers, + }, + ) + if err == nil { // if NO error + return rsp1.Chunk, pf, nil + } + + rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( + ctx, + &synclegacy.GetCheckpointChunkRequest{ + Version: chunk.Version, + Root: chunk.Root, + Index: chunk.Index, + Digest: chunk.Digest, + }, + &synclegacy.Checkpoint{ + Metadata: chunk.checkpoint.Metadata, + Peers: chunk.checkpoint.Peers, + }, + ) + if err != nil { + return nil, nil, err + } + return rsp2.Chunk, pf, nil +} + +func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. @@ -276,13 +313,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques } } -func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { +func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) defer cancel() - list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{ - Version: 1, - }) + list, err := n.fetchCheckpoints(ctx) if err != nil { n.logger.Error("failed to retrieve any checkpoints", "err", err, @@ -296,9 +331,36 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { return list, nil } +// fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. +// +// In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. +func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ + Version: 1, + }) + if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint + return list1, nil + } + + list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + Version: 1, + }) + if err != nil { + return nil, err + } + var cps []*checkpointsync.Checkpoint + for _, cp := range list2 { + cps = append(cps, &checkpointsync.Checkpoint{ + Metadata: cp.Metadata, + Peers: cp.Peers, + }) + } + return cps, nil +} + // sortCheckpoints sorts the slice in-place (descending by version, peers, hash). -func sortCheckpoints(s []*storageSync.Checkpoint) { - slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int { +func sortCheckpoints(s []*checkpointsync.Checkpoint) { + slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int { return cmp.Or( cmp.Compare(b.Root.Version, a.Root.Version), cmp.Compare(len(b.Peers), len(a.Peers)), @@ -307,7 +369,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { +func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { namespace := n.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. @@ -357,7 +419,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // If we only want the genesis checkpoint, filter it out. if wantOnlyGenesis && len(cps) > 0 { - var filteredCps []*storageSync.Checkpoint + var filteredCps []*checkpointsync.Checkpoint for _, cp := range cps { if cp.Root.Version == genesisRound { filteredCps = append(filteredCps, cp) diff --git a/go/worker/storage/committee/checkpoint_sync_test.go b/go/worker/storage/committee/checkpoint_sync_test.go index 2e0ff2c206d..d39e50f3239 100644 --- a/go/worker/storage/committee/checkpoint_sync_test.go +++ b/go/worker/storage/committee/checkpoint_sync_test.go @@ -8,11 +8,11 @@ import ( "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) func TestSortCheckpoints(t *testing.T) { - cp1 := &sync.Checkpoint{ + cp1 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -20,7 +20,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp2 := &sync.Checkpoint{ + cp2 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -28,7 +28,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - cp3 := &sync.Checkpoint{ + cp3 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -36,7 +36,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp4 := &sync.Checkpoint{ + cp4 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) { Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - s := []*sync.Checkpoint{cp2, cp3, cp4, cp1} + s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1} sortCheckpoints(s) - assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4}) + assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4}) } diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 9832cb98648..dfa8dbff18f 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -32,8 +32,10 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) var ( @@ -128,7 +130,9 @@ type Node struct { // nolint: maligned localStorage storageApi.LocalBackend - storageSync storageSync.Client + diffSync diffsync.Client + checkpointSync checkpointsync.Client + legacyStorageSync synclegacy.Client undefinedRound uint64 @@ -272,15 +276,21 @@ func NewNode( node: n, }) - // Register storage sync service. - commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - - // Register storage pub service if configured. + // Advertise and serve p2p protocols. + commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + if checkInterval != checkpoint.CheckIntervalDisabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } if rpcRoleProvider != nil { commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } + // Create p2p protocol clients. + n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + return n, nil } @@ -430,13 +440,29 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { ctx, cancel := context.WithCancel(n.ctx) defer cancel() - rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) if err != nil { result.err = err return } result.pf = pf - result.writeLog = rsp.WriteLog + result.writeLog = wl +} + +// getDiff fetches writelog using diff sync p2p protocol client. +// +// In case of no peers or error, it fallbacks to the legacy storage sync protocol. +func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err == nil { // if NO error + return rsp1.WriteLog, pf, nil + } + + rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err != nil { + return nil, nil, err + } + return rsp2.WriteLog, pf, nil } func (n *Node) finalize(summary *blockSummary) { diff --git a/go/worker/storage/p2p/sync/client.go b/go/worker/storage/p2p/synclegacy/client.go similarity index 99% rename from go/worker/storage/p2p/sync/client.go rename to go/worker/storage/p2p/synclegacy/client.go index fdb8ed7379d..2058b2dae53 100644 --- a/go/worker/storage/p2p/sync/client.go +++ b/go/worker/storage/p2p/synclegacy/client.go @@ -1,4 +1,4 @@ -package sync +package synclegacy import ( "context" diff --git a/go/worker/storage/p2p/sync/protocol.go b/go/worker/storage/p2p/synclegacy/protocol.go similarity index 90% rename from go/worker/storage/p2p/sync/protocol.go rename to go/worker/storage/p2p/synclegacy/protocol.go index 7dca895e014..e32d764bca6 100644 --- a/go/worker/storage/p2p/sync/protocol.go +++ b/go/worker/storage/p2p/synclegacy/protocol.go @@ -1,4 +1,10 @@ -package sync +// Package synclegacy defines wire protocol together with client/server +// implementations for the legacy storage sync protocol, used for runtime block sync. +// +// The protocol was split into storage diff and checkpoints protocol. +// +// TODO: Remove it: https://github.com/oasisprotocol/oasis-core/issues/6261 +package synclegacy import ( "time" diff --git a/go/worker/storage/p2p/sync/server.go b/go/worker/storage/p2p/synclegacy/server.go similarity index 99% rename from go/worker/storage/p2p/sync/server.go rename to go/worker/storage/p2p/synclegacy/server.go index 22b6731465a..a09c342bcaf 100644 --- a/go/worker/storage/p2p/sync/server.go +++ b/go/worker/storage/p2p/synclegacy/server.go @@ -1,4 +1,4 @@ -package sync +package synclegacy import ( "bytes" From 490a54d8c14f81f12a9f3d43f033bbff53dbee5e Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 24 Jul 2025 00:22:40 +0200 Subject: [PATCH 4/5] go/worker: Use ProtocolID helper to avoid duplication --- go/worker/common/p2p/txsync/client.go | 3 +-- go/worker/common/p2p/txsync/protocol.go | 2 +- go/worker/keymanager/p2p/client.go | 3 +-- go/worker/keymanager/p2p/protocol.go | 2 +- go/worker/storage/p2p/checkpointsync/client.go | 3 +-- go/worker/storage/p2p/checkpointsync/protocol.go | 2 +- go/worker/storage/p2p/diffsync/client.go | 3 +-- go/worker/storage/p2p/diffsync/protocol.go | 2 +- go/worker/storage/p2p/pub/client.go | 3 +-- go/worker/storage/p2p/pub/protocol.go | 2 +- go/worker/storage/p2p/synclegacy/client.go | 3 +-- go/worker/storage/p2p/synclegacy/protocol.go | 2 +- 12 files changed, 12 insertions(+), 18 deletions(-) diff --git a/go/worker/common/p2p/txsync/client.go b/go/worker/common/p2p/txsync/client.go index 54d69f6242c..f293986b123 100644 --- a/go/worker/common/p2p/txsync/client.go +++ b/go/worker/common/p2p/txsync/client.go @@ -5,7 +5,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -82,7 +81,7 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes // NewClient creates a new transaction sync protocol client. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/common/p2p/txsync/protocol.go b/go/worker/common/p2p/txsync/protocol.go index e5aef20bdd7..5d1e05f9676 100644 --- a/go/worker/common/p2p/txsync/protocol.go +++ b/go/worker/common/p2p/txsync/protocol.go @@ -47,7 +47,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, TxSyncProtocolID, TxSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/keymanager/p2p/client.go b/go/worker/keymanager/p2p/client.go index b7972d1168e..0eb0a3194ca 100644 --- a/go/worker/keymanager/p2p/client.go +++ b/go/worker/keymanager/p2p/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" p2p "github.com/oasisprotocol/oasis-core/go/p2p/api" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -46,7 +45,7 @@ func (c *client) CallEnclave(ctx context.Context, request *CallEnclaveRequest, p // NewClient creates a new keymanager protocol client. func NewClient(p2p p2p.Service, chainContext string, keymanagerID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, keymanagerID, KeyManagerProtocolID, KeyManagerProtocolVersion) + pid := ProtocolID(chainContext, keymanagerID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/keymanager/p2p/protocol.go b/go/worker/keymanager/p2p/protocol.go index f7dfda7f407..21d7dd383f0 100644 --- a/go/worker/keymanager/p2p/protocol.go +++ b/go/worker/keymanager/p2p/protocol.go @@ -50,7 +50,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, KeyManagerProtocolID, KeyManagerProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/checkpointsync/client.go b/go/worker/storage/p2p/checkpointsync/client.go index 64edb9ecc0a..e364a8a54d7 100644 --- a/go/worker/storage/p2p/checkpointsync/client.go +++ b/go/worker/storage/p2p/checkpointsync/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) @@ -111,7 +110,7 @@ func (c *client) GetCheckpointChunk( // // Moreover, it ensures underlying p2p service starts tracking protocol peers. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) rc := rpc.NewClient(p2p.Host(), pid) mgr := rpc.NewPeerManager(p2p, pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/checkpointsync/protocol.go b/go/worker/storage/p2p/checkpointsync/protocol.go index 140f5e3d618..2b07d38e917 100644 --- a/go/worker/storage/p2p/checkpointsync/protocol.go +++ b/go/worker/storage/p2p/checkpointsync/protocol.go @@ -71,7 +71,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go index 6fcfc6d111b..4c0363fe21d 100644 --- a/go/worker/storage/p2p/diffsync/client.go +++ b/go/worker/storage/p2p/diffsync/client.go @@ -4,7 +4,6 @@ import ( "context" "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -44,7 +43,7 @@ func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiff // // Moreover, it ensures underlying p2p service starts tracking protocol peers. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) rc := rpc.NewClient(p2p.Host(), pid) mgr := rpc.NewPeerManager(p2p, pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/diffsync/protocol.go b/go/worker/storage/p2p/diffsync/protocol.go index c53304d5666..e2949504039 100644 --- a/go/worker/storage/p2p/diffsync/protocol.go +++ b/go/worker/storage/p2p/diffsync/protocol.go @@ -52,7 +52,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, DiffSyncProtocolID, DiffSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/pub/client.go b/go/worker/storage/p2p/pub/client.go index 4b75b8f5d32..1590df41cb2 100644 --- a/go/worker/storage/p2p/pub/client.go +++ b/go/worker/storage/p2p/pub/client.go @@ -4,7 +4,6 @@ import ( "context" "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -64,7 +63,7 @@ func (c *client) Iterate(ctx context.Context, request *IterateRequest) (*ProofRe // NewClient creates a new storage pub protocol client. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/pub/protocol.go b/go/worker/storage/p2p/pub/protocol.go index e3641b77b78..48ac31eff0e 100644 --- a/go/worker/storage/p2p/pub/protocol.go +++ b/go/worker/storage/p2p/pub/protocol.go @@ -58,7 +58,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, StoragePubProtocolID, StoragePubProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/synclegacy/client.go b/go/worker/storage/p2p/synclegacy/client.go index 2058b2dae53..702fea0fdad 100644 --- a/go/worker/storage/p2p/synclegacy/client.go +++ b/go/worker/storage/p2p/synclegacy/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) @@ -129,7 +128,7 @@ func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Cli // Use two separate clients and managers for the same protocol. This is to make sure that peers // are scored differently between the two use cases (syncing diffs vs. syncing checkpoints). We // could consider separating this into two protocols in the future. - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) rcC := rpc.NewClient(p2p.Host(), pid) mgrC := rpc.NewPeerManager(p2p, pid) diff --git a/go/worker/storage/p2p/synclegacy/protocol.go b/go/worker/storage/p2p/synclegacy/protocol.go index e32d764bca6..e97a05efa09 100644 --- a/go/worker/storage/p2p/synclegacy/protocol.go +++ b/go/worker/storage/p2p/synclegacy/protocol.go @@ -92,7 +92,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, StorageSyncProtocolID, StorageSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols From c02ca0c1711574ca2c8316d69bfffa5821c2f97b Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 30 Jul 2025 21:56:54 +0200 Subject: [PATCH 5/5] go/worker/storage/committee: Factor out checkpointer creation --- go/worker/storage/committee/node.go | 66 +++++++++++++++-------------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index dfa8dbff18f..c4f6a890b4d 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -212,8 +212,38 @@ func NewNode( n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - // Create a new checkpointer. Always create a checkpointer, even if checkpointing is disabled - // in configuration so we can ensure that the genesis checkpoint is available. + // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. + checkpointer, err := n.newCheckpointer(n.ctx, commonNode, localStorage) + if err != nil { + return nil, fmt.Errorf("failed to create checkpointer: %w", err) + } + n.checkpointer = checkpointer + + // Register prune handler. + commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ + logger: n.logger, + node: n, + }) + + // Advertise and serve p2p protocols. + commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + if config.GlobalConfig.Storage.Checkpointer.Enabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + if rpcRoleProvider != nil { + commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + + // Create p2p protocol clients. + n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + + return n, nil +} + +func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { checkInterval := checkpoint.CheckIntervalDisabled if config.GlobalConfig.Storage.Checkpointer.Enabled { checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval @@ -259,39 +289,13 @@ func NewNode( return blk.Header.StorageRoots(), nil }, } - var err error - n.checkpointer, err = checkpoint.NewCheckpointer( - n.ctx, + + return checkpoint.NewCheckpointer( + ctx, localStorage.NodeDB(), localStorage.Checkpointer(), checkpointerCfg, ) - if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) - } - - // Register prune handler. - commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, - }) - - // Advertise and serve p2p protocols. - commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - if checkInterval != checkpoint.CheckIntervalDisabled { - commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - if rpcRoleProvider != nil { - commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - - // Create p2p protocol clients. - n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - - return n, nil } // Service interface.