Skip to content

Commit 4109161

Browse files
committed
go/worker/storage: Integrate new p2p protocols
Legacy storage sync protocol is still advertised and served to enable seamless rolling upgrades of the network.
1 parent 1d5433d commit 4109161

File tree

12 files changed

+74
-39
lines changed

12 files changed

+74
-39
lines changed

.changelog/6262.feature.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
go: Split storage sync p2p protocol
2+
3+
Storage sync protocol was split into two independent protocols (checkpoint
4+
and diff sync).
5+
6+
This change was made since there may be fewer nodes that expose checkpoints
7+
than storage diff. Previously, this could lead to issues with state sync
8+
when a node was connected with peers that supported storage sync protocol
9+
but had no checkpoints available.
10+
11+
This was done in backwards compatible manner, so that both protocols are still
12+
advertised and used. Eventually, we plan to remove legacy protocol.

go/oasis-node/cmd/debug/byzantine/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
2323
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
2424
"github.com/oasisprotocol/oasis-core/go/worker/client"
25-
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
25+
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
2626
)
2727

2828
type byzantine struct {

go/worker/storage/committee/checkpoint_sync.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
1515
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
16-
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
16+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
1717
)
1818

1919
const (
@@ -55,7 +55,7 @@ type chunk struct {
5555
*checkpoint.ChunkMetadata
5656

5757
// checkpoint points to the checkpoint this chunk originated from.
58-
checkpoint *storageSync.Checkpoint
58+
checkpoint *checkpointsync.Checkpoint
5959
}
6060

6161
type chunkHeap struct {
@@ -101,7 +101,7 @@ func (n *Node) checkpointChunkFetcher(
101101
defer cancel()
102102

103103
// Fetch chunk from peers.
104-
rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{
104+
rsp, pf, err := n.checkpointSync.GetCheckpointChunk(chunkCtx, &checkpointsync.GetCheckpointChunkRequest{
105105
Version: chunk.Version,
106106
Root: chunk.Root,
107107
Index: chunk.Index,
@@ -157,7 +157,7 @@ func (n *Node) checkpointChunkFetcher(
157157
}
158158
}
159159

160-
func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
160+
func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
161161
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
162162
// Any previous restores were already aborted by the driver up the call stack, so
163163
// things should have been going smoothly here; bail.
@@ -276,11 +276,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques
276276
}
277277
}
278278

279-
func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
279+
func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
280280
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
281281
defer cancel()
282282

283-
list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{
283+
list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
284284
Version: 1,
285285
})
286286
if err != nil {
@@ -297,8 +297,8 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
297297
}
298298

299299
// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
300-
func sortCheckpoints(s []*storageSync.Checkpoint) {
301-
slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int {
300+
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
301+
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {
302302
return cmp.Or(
303303
cmp.Compare(b.Root.Version, a.Root.Version),
304304
cmp.Compare(len(b.Peers), len(a.Peers)),
@@ -307,7 +307,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) {
307307
})
308308
}
309309

310-
func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
310+
func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
311311
namespace := n.commonNode.Runtime.ID()
312312
if !namespace.Equal(&cp.Root.Namespace) {
313313
// Not for the right runtime.
@@ -357,7 +357,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc
357357

358358
// If we only want the genesis checkpoint, filter it out.
359359
if wantOnlyGenesis && len(cps) > 0 {
360-
var filteredCps []*storageSync.Checkpoint
360+
var filteredCps []*checkpointsync.Checkpoint
361361
for _, cp := range cps {
362362
if cp.Root.Version == genesisRound {
363363
filteredCps = append(filteredCps, cp)

go/worker/storage/committee/checkpoint_sync_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,35 @@ import (
88
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
99
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
1010
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
11-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
11+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
1212
)
1313

1414
func TestSortCheckpoints(t *testing.T) {
15-
cp1 := &sync.Checkpoint{
15+
cp1 := &checkpointsync.Checkpoint{
1616
Metadata: &checkpoint.Metadata{
1717
Root: node.Root{
1818
Version: 2,
1919
},
2020
},
2121
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
2222
}
23-
cp2 := &sync.Checkpoint{
23+
cp2 := &checkpointsync.Checkpoint{
2424
Metadata: &checkpoint.Metadata{
2525
Root: node.Root{
2626
Version: 2,
2727
},
2828
},
2929
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
3030
}
31-
cp3 := &sync.Checkpoint{
31+
cp3 := &checkpointsync.Checkpoint{
3232
Metadata: &checkpoint.Metadata{
3333
Root: node.Root{
3434
Version: 1,
3535
},
3636
},
3737
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
3838
}
39-
cp4 := &sync.Checkpoint{
39+
cp4 := &checkpointsync.Checkpoint{
4040
Metadata: &checkpoint.Metadata{
4141
Root: node.Root{
4242
Version: 1,
@@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) {
4545
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
4646
}
4747

48-
s := []*sync.Checkpoint{cp2, cp3, cp4, cp1}
48+
s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1}
4949

5050
sortCheckpoints(s)
5151

52-
assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4})
52+
assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4})
5353
}

go/worker/storage/committee/node.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ import (
3232
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
3333
"github.com/oasisprotocol/oasis-core/go/worker/registration"
3434
"github.com/oasisprotocol/oasis-core/go/worker/storage/api"
35+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
36+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync"
3537
storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub"
36-
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
38+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
3739
)
3840

3941
var (
@@ -128,7 +130,8 @@ type Node struct { // nolint: maligned
128130

129131
localStorage storageApi.LocalBackend
130132

131-
storageSync storageSync.Client
133+
diffSync diffsync.Client
134+
checkpointSync checkpointsync.Client
132135

133136
undefinedRound uint64
134137

@@ -272,9 +275,23 @@ func NewNode(
272275
node: n,
273276
})
274277

275-
// Register storage sync service.
276-
commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
277-
n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
278+
// Advertise and serve legacy storage sync protocol.
279+
// TODO #6270 will move advertisement part out of the protocol registration.
280+
commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
281+
commonNode.P2P.RegisterProtocol(synclegacy.GetStorageSyncProtocolID(commonNode.ChainContext, commonNode.Runtime.ID()), 5, 10)
282+
283+
// Register diff sync protocol server.
284+
commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
285+
// Register checkpoint sync protocol server if checkpoints are enabled.
286+
if checkInterval < checkpoint.CheckIntervalDisabled {
287+
commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
288+
}
289+
290+
// Create diff and checkpoint sync p2p protocol clients that have a fallback to the old legacy storage sync protocol.
291+
// TODO: This automatically starts advertising the given protocol even if the server is not registered.
292+
// #6270 wil fix that by moving advertisement out of the client creation.
293+
n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
294+
n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
278295

279296
// Register storage pub service if configured.
280297
if rpcRoleProvider != nil {
@@ -430,7 +447,7 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) {
430447
ctx, cancel := context.WithCancel(n.ctx)
431448
defer cancel()
432449

433-
rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
450+
rsp, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
434451
if err != nil {
435452
result.err = err
436453
return

go/worker/storage/p2p/checkpointsync/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
1111
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1212
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
13-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
13+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
1414
)
1515

1616
const (
@@ -125,7 +125,7 @@ func (c *client) getBestPeers(opts ...rpc.BestPeersOption) []core.PeerID {
125125
// separately.
126126
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
127127
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion)
128-
fallbackPid := sync.GetStorageSyncProtocolID(chainContext, runtimeID)
128+
fallbackPid := synclegacy.GetStorageSyncProtocolID(chainContext, runtimeID)
129129
rc := rpc.NewClient(p2p.Host(), pid, fallbackPid)
130130
mgr := rpc.NewPeerManager(p2p, pid)
131131
rc.RegisterListener(mgr)

go/worker/storage/p2p/checkpointsync/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Package checkpointsync defines wire protocol together with client/server
2-
// implementations for the checkpoints sync protocol, used for runtime state sync.
2+
// implementations for the checkpoint sync protocol, used for runtime state sync.
33
package checkpointsync
44

55
import (

go/worker/storage/p2p/diffsync/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/oasisprotocol/oasis-core/go/common"
77
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
88
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
9-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
9+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
1010
)
1111

1212
const (
@@ -54,7 +54,7 @@ func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiff
5454
// separately.
5555
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
5656
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion)
57-
fallbackPid := sync.GetStorageSyncProtocolID(chainContext, runtimeID)
57+
fallbackPid := synclegacy.GetStorageSyncProtocolID(chainContext, runtimeID)
5858
rc := rpc.NewClient(p2p.Host(), pid, fallbackPid)
5959
mgr := rpc.NewPeerManager(p2p, pid)
6060
rc.RegisterListener(mgr)

go/worker/storage/p2p/sync_test/interoperable_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/writelog"
3131
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
3232
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync"
33-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
33+
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
3434
)
3535

3636
var (
@@ -116,18 +116,18 @@ func test(t *testing.T, dataDir string, legacyHost bool, peerKind peerKind) {
116116
func testLegacyHostClient(ctx context.Context, t *testing.T, host p2pApi.Service, backend storageApi.Backend) {
117117
require := require.New(t)
118118

119-
client := sync.NewClient(host, chainContext, runtimeID)
119+
client := synclegacy.NewClient(host, chainContext, runtimeID)
120120
time.Sleep(2 * time.Second)
121121

122122
// Test diff part of the storagesync protocol.
123-
rsp, _, err := client.GetDiff(ctx, &sync.GetDiffRequest{})
123+
rsp, _, err := client.GetDiff(ctx, &synclegacy.GetDiffRequest{})
124124
require.NoError(err, "Fetch storage diff from p2p")
125125

126126
err = assertEqualGetDiffResponse(ctx, backend, rsp.WriteLog)
127127
require.NoError(err, "Assert expected storage diff response")
128128

129129
// Test checkpoints part of the storagesync protocol.
130-
cps, err := client.GetCheckpoints(ctx, &sync.GetCheckpointsRequest{
130+
cps, err := client.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{
131131
Version: 1,
132132
})
133133
require.NoError(err, "Fetch checkpoints from p2p")
@@ -136,7 +136,7 @@ func testLegacyHostClient(ctx context.Context, t *testing.T, host p2pApi.Service
136136
Namespace: runtimeID,
137137
})
138138
require.NoError(err, "Fetch expected storage diff from backend")
139-
getMeta := func(cp *sync.Checkpoint) *checkpoint.Metadata { return cp.Metadata }
139+
getMeta := func(cp *synclegacy.Checkpoint) *checkpoint.Metadata { return cp.Metadata }
140140
err = assertEqualCheckpoints(cps, want, getMeta)
141141
require.NoError(err, "Assert expected checkpoints response")
142142
}
@@ -278,10 +278,10 @@ func mustStartNewPeer(t *testing.T, dataDir string, id int, backend storageApi.B
278278

279279
switch kind {
280280
case legacy:
281-
serverLegacy := sync.NewServer(chainContext, runtimeID, backend)
281+
serverLegacy := synclegacy.NewServer(chainContext, runtimeID, backend)
282282
p2p.RegisterProtocolServer(serverLegacy)
283283
case all:
284-
serverLegacy := sync.NewServer(chainContext, runtimeID, backend)
284+
serverLegacy := synclegacy.NewServer(chainContext, runtimeID, backend)
285285
p2p.RegisterProtocolServer(serverLegacy)
286286
diff := diffsync.NewServer(chainContext, runtimeID, backend)
287287
p2p.RegisterProtocolServer(diff)

go/worker/storage/p2p/sync/client.go renamed to go/worker/storage/p2p/synclegacy/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package sync
1+
package synclegacy
22

33
import (
44
"context"

0 commit comments

Comments
 (0)