Skip to content

Commit 02e387a

Browse files
committed
go/worker/storage: Remove legacy storagy sync p2p protocol
1 parent 5a909da commit 02e387a

File tree

8 files changed

+26
-573
lines changed

8 files changed

+26
-573
lines changed

.changelog/6261.internal.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
go/worker/storage: Remove legacy storage sync p2p protocol
2+
3+
The node will now only server new checkpoint and diff sync
4+
p2p protocols. This is backward compatible since legacy
5+
clients still serve both legacy and new protocols.

go/oasis-test-runner/scenario/e2e/runtime/checkpoint_sync.go

Lines changed: 0 additions & 118 deletions
This file was deleted.

go/oasis-test-runner/scenario/e2e/runtime/scenario.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ func RegisterScenarios() error {
342342
StorageSyncFromRegistered,
343343
StorageSyncInconsistent,
344344
StorageEarlyStateSync,
345-
CheckpointSync,
346345
// Sentry test.
347346
Sentry,
348347
// Keymanager tests.

go/worker/storage/committee/checkpoint_sync.go

Lines changed: 17 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ import (
1111
"sync"
1212
"time"
1313

14-
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1514
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
1615
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
1716
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
18-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
1917
)
2018

2119
const (
@@ -103,7 +101,19 @@ func (n *Node) checkpointChunkFetcher(
103101
defer cancel()
104102

105103
// Fetch chunk from peers.
106-
rsp, pf, err := n.fetchChunk(chunkCtx, chunk)
104+
rsp, pf, err := n.checkpointSync.GetCheckpointChunk(
105+
chunkCtx,
106+
&checkpointsync.GetCheckpointChunkRequest{
107+
Version: chunk.Version,
108+
Root: chunk.Root,
109+
Index: chunk.Index,
110+
Digest: chunk.Digest,
111+
},
112+
&checkpointsync.Checkpoint{
113+
Metadata: chunk.checkpoint.Metadata,
114+
Peers: chunk.checkpoint.Peers,
115+
},
116+
)
107117
if err != nil {
108118
n.logger.Error("failed to fetch chunk from peers",
109119
"err", err,
@@ -114,7 +124,7 @@ func (n *Node) checkpointChunkFetcher(
114124
}
115125

116126
// Restore fetched chunk.
117-
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp))
127+
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk))
118128
cancel()
119129

120130
switch {
@@ -154,46 +164,6 @@ func (n *Node) checkpointChunkFetcher(
154164
}
155165
}
156166

157-
// fetchChunk fetches chunk using checkpoint sync p2p protocol client.
158-
//
159-
// In case of no peers or error, it fallbacks to the legacy storage sync protocol.
160-
func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) {
161-
rsp1, pf, err := n.checkpointSync.GetCheckpointChunk(
162-
ctx,
163-
&checkpointsync.GetCheckpointChunkRequest{
164-
Version: chunk.Version,
165-
Root: chunk.Root,
166-
Index: chunk.Index,
167-
Digest: chunk.Digest,
168-
},
169-
&checkpointsync.Checkpoint{
170-
Metadata: chunk.checkpoint.Metadata,
171-
Peers: chunk.checkpoint.Peers,
172-
},
173-
)
174-
if err == nil { // if NO error
175-
return rsp1.Chunk, pf, nil
176-
}
177-
178-
rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk(
179-
ctx,
180-
&synclegacy.GetCheckpointChunkRequest{
181-
Version: chunk.Version,
182-
Root: chunk.Root,
183-
Index: chunk.Index,
184-
Digest: chunk.Digest,
185-
},
186-
&synclegacy.Checkpoint{
187-
Metadata: chunk.checkpoint.Metadata,
188-
Peers: chunk.checkpoint.Peers,
189-
},
190-
)
191-
if err != nil {
192-
return nil, nil, err
193-
}
194-
return rsp2.Chunk, pf, nil
195-
}
196-
197167
func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
198168
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
199169
// Any previous restores were already aborted by the driver up the call stack, so
@@ -317,7 +287,9 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
317287
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
318288
defer cancel()
319289

320-
list, err := n.fetchCheckpoints(ctx)
290+
list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
291+
Version: 1,
292+
})
321293
if err != nil {
322294
n.logger.Error("failed to retrieve any checkpoints",
323295
"err", err,
@@ -331,33 +303,6 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
331303
return list, nil
332304
}
333305

334-
// fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client.
335-
//
336-
// In case of no peers or error, it fallbacks to the legacy storage sync protocol.
337-
func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) {
338-
list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
339-
Version: 1,
340-
})
341-
if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint
342-
return list1, nil
343-
}
344-
345-
list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{
346-
Version: 1,
347-
})
348-
if err != nil {
349-
return nil, err
350-
}
351-
var cps []*checkpointsync.Checkpoint
352-
for _, cp := range list2 {
353-
cps = append(cps, &checkpointsync.Checkpoint{
354-
Metadata: cp.Metadata,
355-
Peers: cp.Peers,
356-
})
357-
}
358-
return cps, nil
359-
}
360-
361306
// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
362307
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
363308
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {

go/worker/storage/committee/node.go

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
3636
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync"
3737
storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub"
38-
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
3938
)
4039

4140
var (
@@ -130,9 +129,8 @@ type Node struct { // nolint: maligned
130129

131130
localStorage storageApi.LocalBackend
132131

133-
diffSync diffsync.Client
134-
checkpointSync checkpointsync.Client
135-
legacyStorageSync synclegacy.Client
132+
diffSync diffsync.Client
133+
checkpointSync checkpointsync.Client
136134

137135
undefinedRound uint64
138136

@@ -226,7 +224,6 @@ func NewNode(
226224
})
227225

228226
// Advertise and serve p2p protocols.
229-
commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
230227
commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
231228
if config.GlobalConfig.Storage.Checkpointer.Enabled {
232229
commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
@@ -236,7 +233,6 @@ func NewNode(
236233
}
237234

238235
// Create p2p protocol clients.
239-
n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
240236
n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
241237
n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID())
242238

@@ -444,29 +440,13 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) {
444440
ctx, cancel := context.WithCancel(n.ctx)
445441
defer cancel()
446442

447-
wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot)
443+
rsp, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
448444
if err != nil {
449445
result.err = err
450446
return
451447
}
452448
result.pf = pf
453-
result.writeLog = wl
454-
}
455-
456-
// getDiff fetches writelog using diff sync p2p protocol client.
457-
//
458-
// In case of no peers or error, it fallbacks to the legacy storage sync protocol.
459-
func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) {
460-
rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
461-
if err == nil { // if NO error
462-
return rsp1.WriteLog, pf, nil
463-
}
464-
465-
rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot})
466-
if err != nil {
467-
return nil, nil, err
468-
}
469-
return rsp2.WriteLog, pf, nil
449+
result.writeLog = rsp.WriteLog
470450
}
471451

472452
func (n *Node) finalize(summary *blockSummary) {

0 commit comments

Comments
 (0)