Skip to content

Commit 051ea08

Browse files
committed
go/worker/storage/p2p: Split storage sync protocol
1 parent 8a75690 commit 051ea08

File tree

8 files changed

+882
-0
lines changed

8 files changed

+882
-0
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package checkpointsync
2+
3+
import (
4+
"context"
5+
6+
"github.com/libp2p/go-libp2p/core"
7+
8+
"github.com/oasisprotocol/oasis-core/go/common"
9+
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
10+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
11+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
12+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
13+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
14+
)
15+
16+
const (
17+
// minProtocolPeers is the minimum number of peers from the registry we want to have connected
18+
// for StorageSync protocol.
19+
minProtocolPeers = 5
20+
21+
// totalProtocolPeers is the number of peers we want to have connected for StorageSync protocol.
22+
totalProtocolPeers = 10
23+
)
24+
25+
// Client is a checkpoints sync protocol client.
26+
type Client interface {
27+
// GetCheckpoints returns a list of checkpoint metadata for all known checkpoints.
28+
GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error)
29+
30+
// GetCheckpointChunk requests a specific checkpoint chunk.
31+
GetCheckpointChunk(
32+
ctx context.Context,
33+
request *GetCheckpointChunkRequest,
34+
cp *Checkpoint,
35+
) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error)
36+
}
37+
38+
// Checkpoint contains checkpoint metadata together with peer information.
39+
type Checkpoint struct {
40+
*checkpoint.Metadata
41+
42+
// Peers are the feedback structures of all the peers that have advertised this checkpoint.
43+
Peers []rpc.PeerFeedback
44+
}
45+
46+
type client struct {
47+
rc rpc.Client
48+
mgr rpc.PeerManager
49+
fallbackMgr rpc.PeerManager
50+
}
51+
52+
func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) {
53+
var rsp GetCheckpointsResponse
54+
rsps, pfs, err := c.rc.CallMulti(ctx, c.getBestPeers(), MethodGetCheckpoints, request, rsp)
55+
if err != nil {
56+
return nil, err
57+
}
58+
59+
// Combine deduplicated results into a single result.
60+
var checkpoints []*Checkpoint
61+
cps := make(map[hash.Hash]*Checkpoint)
62+
for i, peerRsp := range rsps {
63+
peerCps := peerRsp.(*GetCheckpointsResponse).Checkpoints
64+
65+
for _, cpMeta := range peerCps {
66+
h := cpMeta.EncodedHash()
67+
cp := cps[h]
68+
if cp == nil {
69+
cp = &Checkpoint{
70+
Metadata: cpMeta,
71+
}
72+
cps[h] = cp
73+
checkpoints = append(checkpoints, cp)
74+
}
75+
cp.Peers = append(cp.Peers, pfs[i])
76+
}
77+
78+
// Record success for a peer if it returned at least one checkpoint.
79+
if len(peerCps) > 0 {
80+
pfs[i].RecordSuccess()
81+
}
82+
}
83+
return checkpoints, nil
84+
}
85+
86+
func (c *client) GetCheckpointChunk(
87+
ctx context.Context,
88+
request *GetCheckpointChunkRequest,
89+
cp *Checkpoint,
90+
) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error) {
91+
var opts []rpc.BestPeersOption
92+
// When a checkpoint is passed, we limit requests to only those peers that actually advertised
93+
// having the checkpoint in question to avoid needless requests.
94+
if cp != nil {
95+
peers := make([]core.PeerID, 0, len(cp.Peers))
96+
for _, pf := range cp.Peers {
97+
peers = append(peers, pf.PeerID())
98+
}
99+
opts = append(opts, rpc.WithLimitPeers(peers))
100+
}
101+
102+
var rsp GetCheckpointChunkResponse
103+
peers := c.getBestPeers(opts...)
104+
pf, err := c.rc.CallOne(ctx, peers, MethodGetCheckpointChunk, request, &rsp,
105+
rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime),
106+
)
107+
if err != nil {
108+
return nil, nil, err
109+
}
110+
return &rsp, pf, nil
111+
}
112+
113+
func (c *client) getBestPeers(opts ...rpc.BestPeersOption) []core.PeerID {
114+
return append(c.mgr.GetBestPeers(opts...), c.fallbackMgr.GetBestPeers(opts...)...)
115+
}
116+
117+
// NewClient creates a new checkpoint sync protocol client.
118+
//
119+
// Previously, it was part of the storage sync protocol. To enable seamless rolling
120+
// upgrades of the network, this client has a fallback to the old legacy protocol.
121+
// The new protocol is prioritized.
122+
//
123+
// Warning: This client only registers the checkpoint sync protocol with the P2P
124+
// service. To enable advertisement of the legacy protocol, it must be registered
125+
// separately.
126+
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
127+
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion)
128+
fPid := sync.GetStorageSyncProtocolID(chainContext, runtimeID)
129+
rc := rpc.NewClient(p2p.Host(), pid, fPid)
130+
mgr := rpc.NewPeerManager(p2p, pid)
131+
rc.RegisterListener(mgr)
132+
133+
// Fallback protocol requires a separate manager to manage peers that also support legacy protocol.
134+
fMgr := rpc.NewPeerManager(p2p, fPid)
135+
rc.RegisterListener(fMgr)
136+
137+
p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers)
138+
139+
return &client{
140+
rc: rc,
141+
mgr: mgr,
142+
fallbackMgr: fMgr,
143+
}
144+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Package checkpointsync defines wire protocol together with client/server
2+
// implementations for the checkpoints sync protocol, used for runtime state sync.
3+
package checkpointsync
4+
5+
import (
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p/core"
9+
10+
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
11+
"github.com/oasisprotocol/oasis-core/go/common/node"
12+
"github.com/oasisprotocol/oasis-core/go/common/version"
13+
"github.com/oasisprotocol/oasis-core/go/p2p/peermgmt"
14+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
15+
"github.com/oasisprotocol/oasis-core/go/storage/api"
16+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
17+
)
18+
19+
// CheckpointSyncProtocolID is a unique protocol identifier for the checkpoint sync protocol.
20+
const CheckpointSyncProtocolID = "checkpointsync"
21+
22+
// CheckpointSyncProtocolVersion is the supported version of the checkpoint sync protocol.
23+
var CheckpointSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0}
24+
25+
// Constants related to the GetCheckpoints method.
26+
const (
27+
MethodGetCheckpoints = "GetCheckpoints"
28+
)
29+
30+
// GetCheckpointsRequest is a GetCheckpoints request.
31+
type GetCheckpointsRequest struct {
32+
Version uint16 `json:"version"`
33+
}
34+
35+
// GetCheckpointsResponse is a response to a GetCheckpoints request.
36+
type GetCheckpointsResponse struct {
37+
Checkpoints []*checkpoint.Metadata `json:"checkpoints,omitempty"`
38+
}
39+
40+
// Constants related to the GetCheckpointChunk method.
41+
const (
42+
MethodGetCheckpointChunk = "GetCheckpointChunk"
43+
MaxGetCheckpointChunkResponseTime = 60 * time.Second
44+
)
45+
46+
// GetCheckpointChunkRequest is a GetCheckpointChunk request.
47+
type GetCheckpointChunkRequest struct {
48+
Version uint16 `json:"version"`
49+
Root api.Root `json:"root"`
50+
Index uint64 `json:"index"`
51+
Digest hash.Hash `json:"digest"`
52+
}
53+
54+
// GetCheckpointChunkResponse is a response to a GetCheckpointChunk request.
55+
type GetCheckpointChunkResponse struct {
56+
Chunk []byte `json:"chunk,omitempty"`
57+
}
58+
59+
func init() {
60+
peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{
61+
ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID {
62+
if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) {
63+
return []core.ProtocolID{}
64+
}
65+
66+
protocols := make([]core.ProtocolID, len(n.Runtimes))
67+
for i, rt := range n.Runtimes {
68+
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion)
69+
}
70+
71+
return protocols
72+
},
73+
})
74+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package checkpointsync
2+
3+
import (
4+
"bytes"
5+
"context"
6+
7+
"github.com/oasisprotocol/oasis-core/go/common"
8+
"github.com/oasisprotocol/oasis-core/go/common/cbor"
9+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
10+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
11+
"github.com/oasisprotocol/oasis-core/go/storage/api"
12+
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
13+
)
14+
15+
type service struct {
16+
backend api.Backend
17+
}
18+
19+
func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) {
20+
switch method {
21+
case MethodGetCheckpoints:
22+
var rq GetCheckpointsRequest
23+
if err := cbor.Unmarshal(body, &rq); err != nil {
24+
return nil, rpc.ErrBadRequest
25+
}
26+
27+
return s.handleGetCheckpoints(ctx, &rq)
28+
case MethodGetCheckpointChunk:
29+
var rq GetCheckpointChunkRequest
30+
if err := cbor.Unmarshal(body, &rq); err != nil {
31+
return nil, rpc.ErrBadRequest
32+
}
33+
34+
return s.handleGetCheckpointChunk(ctx, &rq)
35+
default:
36+
return nil, rpc.ErrMethodNotSupported
37+
}
38+
}
39+
40+
func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) {
41+
cps, err := s.backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{
42+
Version: request.Version,
43+
})
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
return &GetCheckpointsResponse{
49+
Checkpoints: cps,
50+
}, nil
51+
}
52+
53+
func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) {
54+
// TODO: Use stream resource manager to track buffer use.
55+
var buf bytes.Buffer
56+
err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{
57+
Version: request.Version,
58+
Root: request.Root,
59+
Index: request.Index,
60+
Digest: request.Digest,
61+
}, &buf)
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
return &GetCheckpointChunkResponse{
67+
Chunk: buf.Bytes(),
68+
}, nil
69+
}
70+
71+
// NewServer creates a new checkpoints protocol server.
72+
func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server {
73+
return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion), &service{backend})
74+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package diffsync
2+
3+
import (
4+
"context"
5+
6+
"github.com/oasisprotocol/oasis-core/go/common"
7+
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
8+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
9+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
10+
)
11+
12+
const (
13+
// minProtocolPeers is the minimum number of peers from the registry we want to have connected
14+
// for storage diff protocol.
15+
minProtocolPeers = 5
16+
17+
// totalProtocolPeers is the number of peers we want to have connected for storage diff protocol.
18+
totalProtocolPeers = 10
19+
)
20+
21+
// Client is a diff sync protocol client.
22+
type Client interface {
23+
// GetDiff requests a write log of entries that must be applied to get from the first given root
24+
// to the second one.
25+
GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error)
26+
}
27+
28+
type client struct {
29+
rc rpc.Client
30+
mgr rpc.PeerManager
31+
fallbackMgr rpc.PeerManager
32+
}
33+
34+
func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) {
35+
var rsp GetDiffResponse
36+
peers := append(c.mgr.GetBestPeers(), c.fallbackMgr.GetBestPeers()...)
37+
pf, err := c.rc.CallOne(ctx, peers, MethodGetDiff, request, &rsp,
38+
rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime),
39+
)
40+
if err != nil {
41+
return nil, nil, err
42+
}
43+
return &rsp, pf, nil
44+
}
45+
46+
// NewClient creates a new diff sync protocol client.
47+
//
48+
// Previously, it was part of the storage sync protocol. To enable seamless rolling
49+
// upgrades of the network, this client has a fallback to the old legacy protocol.
50+
// The new protocol is prioritized.
51+
//
52+
// Warning: This client only registers the diff sync protocol with the P2P
53+
// service. To enable advertisement of the legacy protocol, it must be registered
54+
// separately.
55+
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
56+
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion)
57+
fPid := sync.GetStorageSyncProtocolID(chainContext, runtimeID)
58+
rc := rpc.NewClient(p2p.Host(), pid, fPid)
59+
mgr := rpc.NewPeerManager(p2p, pid)
60+
rc.RegisterListener(mgr)
61+
62+
// Fallback protocol requires a separate manager to manage peers that also support legacy protocol.
63+
fMgr := rpc.NewPeerManager(p2p, fPid)
64+
rc.RegisterListener(fMgr)
65+
66+
p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers)
67+
68+
return &client{
69+
rc: rc,
70+
mgr: mgr,
71+
fallbackMgr: fMgr,
72+
}
73+
}

0 commit comments

Comments
 (0)