@@ -11,11 +11,9 @@ import (
1111 "sync"
1212 "time"
1313
14- "github.com/oasisprotocol/oasis-core/go/p2p/rpc"
1514 storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
1615 "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
1716 "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
18- "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
1917)
2018
2119const (
@@ -103,7 +101,19 @@ func (n *Node) checkpointChunkFetcher(
103101 defer cancel ()
104102
105103 // Fetch chunk from peers.
106- rsp , pf , err := n .fetchChunk (chunkCtx , chunk )
104+ rsp , pf , err := n .checkpointSync .GetCheckpointChunk (
105+ chunkCtx ,
106+ & checkpointsync.GetCheckpointChunkRequest {
107+ Version : chunk .Version ,
108+ Root : chunk .Root ,
109+ Index : chunk .Index ,
110+ Digest : chunk .Digest ,
111+ },
112+ & checkpointsync.Checkpoint {
113+ Metadata : chunk .checkpoint .Metadata ,
114+ Peers : chunk .checkpoint .Peers ,
115+ },
116+ )
107117 if err != nil {
108118 n .logger .Error ("failed to fetch chunk from peers" ,
109119 "err" , err ,
@@ -114,7 +124,7 @@ func (n *Node) checkpointChunkFetcher(
114124 }
115125
116126 // Restore fetched chunk.
117- done , err := n .localStorage .Checkpointer ().RestoreChunk (chunkCtx , chunk .Index , bytes .NewBuffer (rsp ))
127+ done , err := n .localStorage .Checkpointer ().RestoreChunk (chunkCtx , chunk .Index , bytes .NewBuffer (rsp . Chunk ))
118128 cancel ()
119129
120130 switch {
@@ -154,46 +164,6 @@ func (n *Node) checkpointChunkFetcher(
154164 }
155165}
156166
157- // fetchChunk fetches chunk using checkpoint sync p2p protocol client.
158- //
159- // In case of no peers or error, it fallbacks to the legacy storage sync protocol.
160- func (n * Node ) fetchChunk (ctx context.Context , chunk * chunk ) ([]byte , rpc.PeerFeedback , error ) {
161- rsp1 , pf , err := n .checkpointSync .GetCheckpointChunk (
162- ctx ,
163- & checkpointsync.GetCheckpointChunkRequest {
164- Version : chunk .Version ,
165- Root : chunk .Root ,
166- Index : chunk .Index ,
167- Digest : chunk .Digest ,
168- },
169- & checkpointsync.Checkpoint {
170- Metadata : chunk .checkpoint .Metadata ,
171- Peers : chunk .checkpoint .Peers ,
172- },
173- )
174- if err == nil { // if NO error
175- return rsp1 .Chunk , pf , nil
176- }
177-
178- rsp2 , pf , err := n .legacyStorageSync .GetCheckpointChunk (
179- ctx ,
180- & synclegacy.GetCheckpointChunkRequest {
181- Version : chunk .Version ,
182- Root : chunk .Root ,
183- Index : chunk .Index ,
184- Digest : chunk .Digest ,
185- },
186- & synclegacy.Checkpoint {
187- Metadata : chunk .checkpoint .Metadata ,
188- Peers : chunk .checkpoint .Peers ,
189- },
190- )
191- if err != nil {
192- return nil , nil , err
193- }
194- return rsp2 .Chunk , pf , nil
195- }
196-
197167func (n * Node ) handleCheckpoint (check * checkpointsync.Checkpoint , maxParallelRequests uint ) (cpStatus int , rerr error ) {
198168 if err := n .localStorage .Checkpointer ().StartRestore (n .ctx , check .Metadata ); err != nil {
199169 // Any previous restores were already aborted by the driver up the call stack, so
@@ -317,7 +287,9 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
317287 ctx , cancel := context .WithTimeout (n .ctx , cpListsTimeout )
318288 defer cancel ()
319289
320- list , err := n .fetchCheckpoints (ctx )
290+ list , err := n .checkpointSync .GetCheckpoints (ctx , & checkpointsync.GetCheckpointsRequest {
291+ Version : 1 ,
292+ })
321293 if err != nil {
322294 n .logger .Error ("failed to retrieve any checkpoints" ,
323295 "err" , err ,
@@ -331,33 +303,6 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
331303 return list , nil
332304}
333305
334- // fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client.
335- //
336- // In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol.
337- func (n * Node ) fetchCheckpoints (ctx context.Context ) ([]* checkpointsync.Checkpoint , error ) {
338- list1 , err := n .checkpointSync .GetCheckpoints (ctx , & checkpointsync.GetCheckpointsRequest {
339- Version : 1 ,
340- })
341- if err == nil && len (list1 ) > 0 { // if NO error and at least one checkpoint
342- return list1 , nil
343- }
344-
345- list2 , err := n .legacyStorageSync .GetCheckpoints (ctx , & synclegacy.GetCheckpointsRequest {
346- Version : 1 ,
347- })
348- if err != nil {
349- return nil , err
350- }
351- var cps []* checkpointsync.Checkpoint
352- for _ , cp := range list2 {
353- cps = append (cps , & checkpointsync.Checkpoint {
354- Metadata : cp .Metadata ,
355- Peers : cp .Peers ,
356- })
357- }
358- return cps , nil
359- }
360-
361306// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
362307func sortCheckpoints (s []* checkpointsync.Checkpoint ) {
363308 slices .SortFunc (s , func (a , b * checkpointsync.Checkpoint ) int {
0 commit comments