Skip to content

Commit d8d0e69

Browse files
Merge pull request #6277 from oasisprotocol/martin/feature/split-storagesync-p2p-v2
Split storage sync P2P protocol (manual fallback)
2 parents 7e6e3a2 + c02ca0c commit d8d0e69

File tree

21 files changed

+637
-71
lines changed

21 files changed

+637
-71
lines changed

.changelog/5751.feature.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
go: Split storage sync p2p protocol
2+
3+
Storage sync protocol was split into two independent protocols (checkpoint
4+
and diff sync).
5+
6+
This change was made since there may be fewer nodes that expose checkpoints
7+
than storage diff. Previously, this could lead to issues with state sync
8+
when a node was connected with peers that supported storage sync protocol
9+
but had no checkpoints available.
10+
11+
This was done in backwards compatible manner, so that both protocols are still
12+
advertised and used. Eventually, we plan to remove legacy protocol.

go/oasis-node/cmd/debug/byzantine/node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
2323
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
2424
"github.com/oasisprotocol/oasis-core/go/worker/client"
25-
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
25+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
2626
)
2727

2828
type byzantine struct {
@@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode(
154154
if err != nil {
155155
return nil, fmt.Errorf("initializing storage node failed: %w", err)
156156
}
157-
b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage))
157+
b.p2p.service.RegisterProtocolServer(synclegacy.NewServer(b.chainContext, b.runtimeID, storage))
158158
b.storage = storage
159159

160160
// Wait for activation epoch.

go/p2p/rpc/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,10 @@ func (c *client) CallMulti(
364364
) ([]any, []PeerFeedback, error) {
365365
c.logger.Debug("call multiple", "method", method)
366366

367+
if len(peers) == 0 {
368+
return nil, nil, fmt.Errorf("no peers given to service the request")
369+
}
370+
367371
co := NewCallMultiOptions(opts...)
368372

369373
// Prepare the request.

go/worker/common/p2p/txsync/client.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/oasisprotocol/oasis-core/go/common"
77
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
8-
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
98
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
109
)
1110

@@ -82,7 +81,7 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes
8281

8382
// NewClient creates a new transaction sync protocol client.
8483
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
85-
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion)
84+
pid := ProtocolID(chainContext, runtimeID)
8685
mgr := rpc.NewPeerManager(p2p, pid)
8786
rc := rpc.NewClient(p2p.Host(), pid)
8887
rc.RegisterListener(mgr)

go/worker/common/p2p/txsync/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func init() {
4747

4848
protocols := make([]core.ProtocolID, len(n.Runtimes))
4949
for i, rt := range n.Runtimes {
50-
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, TxSyncProtocolID, TxSyncProtocolVersion)
50+
protocols[i] = ProtocolID(chainContext, rt.ID)
5151
}
5252

5353
return protocols

go/worker/keymanager/p2p/client.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/oasisprotocol/oasis-core/go/common"
99
p2p "github.com/oasisprotocol/oasis-core/go/p2p/api"
10-
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
1110
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1211
)
1312

@@ -46,7 +45,7 @@ func (c *client) CallEnclave(ctx context.Context, request *CallEnclaveRequest, p
4645

4746
// NewClient creates a new keymanager protocol client.
4847
func NewClient(p2p p2p.Service, chainContext string, keymanagerID common.Namespace) Client {
49-
pid := protocol.NewRuntimeProtocolID(chainContext, keymanagerID, KeyManagerProtocolID, KeyManagerProtocolVersion)
48+
pid := ProtocolID(chainContext, keymanagerID)
5049
mgr := rpc.NewPeerManager(p2p, pid)
5150
rc := rpc.NewClient(p2p.Host(), pid)
5251
rc.RegisterListener(mgr)

go/worker/keymanager/p2p/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func init() {
5050

5151
protocols := make([]core.ProtocolID, len(n.Runtimes))
5252
for i, rt := range n.Runtimes {
53-
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, KeyManagerProtocolID, KeyManagerProtocolVersion)
53+
protocols[i] = ProtocolID(chainContext, rt.ID)
5454
}
5555

5656
return protocols

go/worker/storage/committee/checkpoint_sync.go

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1415
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
1516
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
16-
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
17+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
18+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
1719
)
1820

1921
const (
@@ -55,7 +57,7 @@ type chunk struct {
5557
*checkpoint.ChunkMetadata
5658

5759
// checkpoint points to the checkpoint this chunk originated from.
58-
checkpoint *storageSync.Checkpoint
60+
checkpoint *checkpointsync.Checkpoint
5961
}
6062

6163
type chunkHeap struct {
@@ -101,12 +103,7 @@ func (n *Node) checkpointChunkFetcher(
101103
defer cancel()
102104

103105
// Fetch chunk from peers.
104-
rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{
105-
Version: chunk.Version,
106-
Root: chunk.Root,
107-
Index: chunk.Index,
108-
Digest: chunk.Digest,
109-
}, chunk.checkpoint)
106+
rsp, pf, err := n.fetchChunk(chunkCtx, chunk)
110107
if err != nil {
111108
n.logger.Error("failed to fetch chunk from peers",
112109
"err", err,
@@ -117,7 +114,7 @@ func (n *Node) checkpointChunkFetcher(
117114
}
118115

119116
// Restore fetched chunk.
120-
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk))
117+
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp))
121118
cancel()
122119

123120
switch {
@@ -157,7 +154,47 @@ func (n *Node) checkpointChunkFetcher(
157154
}
158155
}
159156

160-
func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
157+
// fetchChunk fetches chunk using checkpoint sync p2p protocol client.
158+
//
159+
// In case of no peers or error, it fallbacks to the legacy storage sync protocol.
160+
func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) {
161+
rsp1, pf, err := n.checkpointSync.GetCheckpointChunk(
162+
ctx,
163+
&checkpointsync.GetCheckpointChunkRequest{
164+
Version: chunk.Version,
165+
Root: chunk.Root,
166+
Index: chunk.Index,
167+
Digest: chunk.Digest,
168+
},
169+
&checkpointsync.Checkpoint{
170+
Metadata: chunk.checkpoint.Metadata,
171+
Peers: chunk.checkpoint.Peers,
172+
},
173+
)
174+
if err == nil { // if NO error
175+
return rsp1.Chunk, pf, nil
176+
}
177+
178+
rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk(
179+
ctx,
180+
&synclegacy.GetCheckpointChunkRequest{
181+
Version: chunk.Version,
182+
Root: chunk.Root,
183+
Index: chunk.Index,
184+
Digest: chunk.Digest,
185+
},
186+
&synclegacy.Checkpoint{
187+
Metadata: chunk.checkpoint.Metadata,
188+
Peers: chunk.checkpoint.Peers,
189+
},
190+
)
191+
if err != nil {
192+
return nil, nil, err
193+
}
194+
return rsp2.Chunk, pf, nil
195+
}
196+
197+
func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
161198
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
162199
// Any previous restores were already aborted by the driver up the call stack, so
163200
// things should have been going smoothly here; bail.
@@ -276,13 +313,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques
276313
}
277314
}
278315

279-
func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
316+
func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
280317
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
281318
defer cancel()
282319

283-
list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{
284-
Version: 1,
285-
})
320+
list, err := n.fetchCheckpoints(ctx)
286321
if err != nil {
287322
n.logger.Error("failed to retrieve any checkpoints",
288323
"err", err,
@@ -296,9 +331,36 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
296331
return list, nil
297332
}
298333

334+
// fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client.
335+
//
336+
// In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol.
337+
func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) {
338+
list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
339+
Version: 1,
340+
})
341+
if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint
342+
return list1, nil
343+
}
344+
345+
list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{
346+
Version: 1,
347+
})
348+
if err != nil {
349+
return nil, err
350+
}
351+
var cps []*checkpointsync.Checkpoint
352+
for _, cp := range list2 {
353+
cps = append(cps, &checkpointsync.Checkpoint{
354+
Metadata: cp.Metadata,
355+
Peers: cp.Peers,
356+
})
357+
}
358+
return cps, nil
359+
}
360+
299361
// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
300-
func sortCheckpoints(s []*storageSync.Checkpoint) {
301-
slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int {
362+
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
363+
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {
302364
return cmp.Or(
303365
cmp.Compare(b.Root.Version, a.Root.Version),
304366
cmp.Compare(len(b.Peers), len(a.Peers)),
@@ -307,7 +369,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) {
307369
})
308370
}
309371

310-
func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
372+
func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
311373
namespace := n.commonNode.Runtime.ID()
312374
if !namespace.Equal(&cp.Root.Namespace) {
313375
// Not for the right runtime.
@@ -357,7 +419,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc
357419

358420
// If we only want the genesis checkpoint, filter it out.
359421
if wantOnlyGenesis && len(cps) > 0 {
360-
var filteredCps []*storageSync.Checkpoint
422+
var filteredCps []*checkpointsync.Checkpoint
361423
for _, cp := range cps {
362424
if cp.Root.Version == genesisRound {
363425
filteredCps = append(filteredCps, cp)

go/worker/storage/committee/checkpoint_sync_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,35 @@ import (
88
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
99
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
1010
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
11-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
11+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
1212
)
1313

1414
func TestSortCheckpoints(t *testing.T) {
15-
cp1 := &sync.Checkpoint{
15+
cp1 := &checkpointsync.Checkpoint{
1616
Metadata: &checkpoint.Metadata{
1717
Root: node.Root{
1818
Version: 2,
1919
},
2020
},
2121
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
2222
}
23-
cp2 := &sync.Checkpoint{
23+
cp2 := &checkpointsync.Checkpoint{
2424
Metadata: &checkpoint.Metadata{
2525
Root: node.Root{
2626
Version: 2,
2727
},
2828
},
2929
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
3030
}
31-
cp3 := &sync.Checkpoint{
31+
cp3 := &checkpointsync.Checkpoint{
3232
Metadata: &checkpoint.Metadata{
3333
Root: node.Root{
3434
Version: 1,
3535
},
3636
},
3737
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
3838
}
39-
cp4 := &sync.Checkpoint{
39+
cp4 := &checkpointsync.Checkpoint{
4040
Metadata: &checkpoint.Metadata{
4141
Root: node.Root{
4242
Version: 1,
@@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) {
4545
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
4646
}
4747

48-
s := []*sync.Checkpoint{cp2, cp3, cp4, cp1}
48+
s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1}
4949

5050
sortCheckpoints(s)
5151

52-
assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4})
52+
assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4})
5353
}

0 commit comments

Comments
 (0)