Skip to content

Commit f32e6a8

Browse files
authored
minor Mk20 fixes (#672)
* minor Mk20 fixes * client fix * use lo instead of for loop
1 parent 58cebe6 commit f32e6a8

File tree

15 files changed

+140
-97
lines changed

15 files changed

+140
-97
lines changed

cmd/sptool/toolbox_deal_client.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2228,6 +2228,7 @@ var mk20PDPDealCmd = &cli.Command{
22282228
removeRoot := cctx.Bool("remove-piece")
22292229
removeProofset := cctx.Bool("remove-dataset")
22302230
recordKeeper := cctx.String("record-keeper")
2231+
rootIDSet := cctx.IsSet("piece-id")
22312232
rootIDs := cctx.Uint64Slice("piece-id")
22322233
proofSetSet := cctx.IsSet("dataset-id")
22332234
proofsetID := cctx.Uint64("dataset-id")
@@ -2240,6 +2241,9 @@ var mk20PDPDealCmd = &cli.Command{
22402241
}
22412242

22422243
if addRoot {
2244+
if !proofSetSet {
2245+
return xerrors.Errorf("proofset-id must be set when adding a piece")
2246+
}
22432247
commp := cctx.String("pcidv2")
22442248
pieceCid, err := cid.Parse(commp)
22452249
if err != nil {
@@ -2313,7 +2317,7 @@ var mk20PDPDealCmd = &cli.Command{
23132317
return err
23142318
}
23152319
}
2316-
id, err := pclient.AddPieceWithAggregate(ctx, walletAddr.String(), recordKeeper, nil, &proofsetID, pieceCid, true, false, mk20.AggregateTypeNone, pieces)
2320+
id, err := pclient.AddPieceWithAggregate(ctx, walletAddr.String(), recordKeeper, nil, &proofsetID, pieceCid, true, false, mk20.AggregateTypeV1, pieces)
23172321
if err != nil {
23182322
return xerrors.Errorf("failed to add piece: %w", err)
23192323
}
@@ -2354,6 +2358,11 @@ var mk20PDPDealCmd = &cli.Command{
23542358
if !proofSetSet {
23552359
return xerrors.Errorf("proofset-id must be set when removing a root")
23562360
}
2361+
2362+
if !rootIDSet {
2363+
return xerrors.Errorf("piece-id must be set when removing a root")
2364+
}
2365+
23572366
id, err := pclient.RemovePiece(ctx, walletAddr.String(), recordKeeper, nil, &proofsetID, rootIDs)
23582367
if err != nil {
23592368
return xerrors.Errorf("failed to remove piece: %w", err)

lib/cachedreader/cachedreader.go

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (r *cachedSectionReader) Close() error {
133133
return nil
134134
}
135135

136-
func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Context, pieceCidV2 cid.Cid) (storiface.Reader, uint64, error) {
136+
func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Context, pieceCidV2 cid.Cid, retrieval bool) (storiface.Reader, uint64, error) {
137137
// Get all deals containing this piece
138138

139139
commp, err := commcidv2.CommPFromPCidV2(pieceCidV2)
@@ -175,7 +175,14 @@ func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Cont
175175
}
176176

177177
if len(deals) == 0 {
178-
return nil, 0, fmt.Errorf("piece cid %s: %w", pieceCid, ErrNoDeal)
178+
if retrieval {
179+
return nil, 0, fmt.Errorf("piece cid %s: %w", pieceCid, ErrNoDeal)
180+
}
181+
reader, rawSize, err := cpr.getPieceReaderFromPiecePark(ctx, nil, &pieceCid, &pieceSize)
182+
if err != nil {
183+
return nil, 0, fmt.Errorf("failed to read piece from piece park: %w", err)
184+
}
185+
return reader, rawSize, nil
179186
}
180187

181188
// For each deal, try to read an unsealed copy of the data from the sector
@@ -204,7 +211,8 @@ func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Cont
204211

205212
if dl.PieceRef.Valid {
206213
// This is a MK20 deal, get from piece park
207-
reader, rawSize, err := cpr.getPieceReaderFromPiecePark(ctx, dl.PieceRef.Int64)
214+
ref := dl.PieceRef.Int64
215+
reader, rawSize, err := cpr.getPieceReaderFromPiecePark(ctx, &ref, nil, nil)
208216
if err != nil {
209217
merr = multierror.Append(merr, xerrors.Errorf("failed to read piece from piece park: %w", err))
210218
continue
@@ -217,42 +225,67 @@ func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Cont
217225
return nil, 0, merr
218226
}
219227

220-
func (cpr *CachedPieceReader) getPieceReaderFromPiecePark(ctx context.Context, piece_ref int64) (storiface.Reader, uint64, error) {
221-
// Query parked_pieces and parked_piece_refs in one go
222-
var pieceData []struct {
228+
func (cpr *CachedPieceReader) getPieceReaderFromPiecePark(ctx context.Context, pieceRef *int64, pieceCid *cid.Cid, pieceSize *abi.PaddedPieceSize) (storiface.Reader, uint64, error) {
229+
type pieceData struct {
223230
ID int64 `db:"id"`
224231
PieceCid string `db:"piece_cid"`
225232
PieceRawSize int64 `db:"piece_raw_size"`
226233
}
227234

228-
err := cpr.db.Select(ctx, &pieceData, `
235+
var pd []pieceData
236+
237+
if pieceRef != nil {
238+
var pdr []pieceData
239+
err := cpr.db.Select(ctx, &pdr, `
229240
SELECT
230241
pp.id,
231242
pp.piece_cid,
232243
pp.piece_raw_size
233244
FROM parked_piece_refs pr
234245
JOIN parked_pieces pp ON pp.id = pr.piece_id
235246
WHERE pr.ref_id = $1 AND pp.complete = TRUE and pp.long_term = TRUE;
236-
`, piece_ref)
237-
if err != nil {
238-
return nil, 0, fmt.Errorf("failed to query parked_pieces and parked_piece_refs for piece_ref %d: %w", piece_ref, err)
247+
`, pieceRef)
248+
if err != nil {
249+
return nil, 0, fmt.Errorf("failed to query parked_pieces and parked_piece_refs for piece_ref %d: %w", pieceRef, err)
250+
}
251+
if len(pdr) > 0 {
252+
pd = append(pd, pdr...)
253+
}
254+
}
255+
256+
if pieceCid != nil && pieceSize != nil {
257+
pcid := *pieceCid
258+
var pdc []pieceData
259+
err := cpr.db.Select(ctx, &pdc, `
260+
SELECT
261+
id,
262+
piece_cid,
263+
piece_raw_size
264+
FROM parked_pieces
265+
WHERE piece_cid = $1 AND piece_padded_size = $2;`, pcid.String(), *pieceSize)
266+
if err != nil {
267+
return nil, 0, fmt.Errorf("failed to query parked_pieces and parked_piece_refs for piece_ref %d: %w", pieceRef, err)
268+
}
269+
if len(pdc) > 0 {
270+
pd = append(pd, pdc...)
271+
}
239272
}
240273

241-
if len(pieceData) == 0 {
242-
return nil, 0, fmt.Errorf("failed to find piece in parked_pieces for piece_ref %d", piece_ref)
274+
if len(pd) == 0 {
275+
return nil, 0, fmt.Errorf("failed to find piece in parked_pieces for piece_ref %d", pieceRef)
243276
}
244277

245-
pcid, err := cid.Parse(pieceData[0].PieceCid)
278+
pcid, err := cid.Parse(pd[0].PieceCid)
246279
if err != nil {
247280
return nil, 0, fmt.Errorf("failed to parse piece cid: %w", err)
248281
}
249282

250-
reader, err := cpr.pieceParkReader.ReadPiece(ctx, storiface.PieceNumber(pieceData[0].ID), pieceData[0].PieceRawSize, pcid)
283+
reader, err := cpr.pieceParkReader.ReadPiece(ctx, storiface.PieceNumber(pd[0].ID), pd[0].PieceRawSize, pcid)
251284
if err != nil {
252285
return nil, 0, fmt.Errorf("failed to read piece from piece park: %w", err)
253286
}
254287

255-
return reader, uint64(pieceData[0].PieceRawSize), nil
288+
return reader, uint64(pd[0].PieceRawSize), nil
256289
}
257290

258291
type SubPieceReader struct {
@@ -276,7 +309,7 @@ func (s SubPieceReader) ReadAt(p []byte, off int64) (n int, err error) {
276309
return s.sr.ReadAt(p, off)
277310
}
278311

279-
func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, pieceCidV2 cid.Cid) (storiface.Reader, uint64, error) {
312+
func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, pieceCidV2 cid.Cid, retrieval bool) (storiface.Reader, uint64, error) {
280313
pieces, err := cpr.idxStor.FindPieceInAggregate(ctx, pieceCidV2)
281314
if err != nil {
282315
return nil, 0, fmt.Errorf("failed to find piece in aggregate: %w", err)
@@ -294,7 +327,7 @@ func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, p
294327
var merr error
295328

296329
for _, p := range pieces {
297-
reader, _, err := cpr.getPieceReaderFromMarketPieceDeal(ctx, p.Cid)
330+
reader, _, err := cpr.getPieceReaderFromMarketPieceDeal(ctx, p.Cid, retrieval)
298331
if err != nil {
299332
merr = multierror.Append(merr, err)
300333
continue
@@ -307,7 +340,7 @@ func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, p
307340
return nil, 0, fmt.Errorf("failed to find piece in aggregate: %w", merr)
308341
}
309342

310-
func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCidV2 cid.Cid) (storiface.Reader, uint64, error) {
343+
func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCidV2 cid.Cid, retrieval bool) (storiface.Reader, uint64, error) {
311344
// Check if this is PieceCidV1 and try to convert to v2 if possible
312345
yes := commcidv2.IsPieceCidV2(pieceCidV2)
313346
if !yes {
@@ -386,13 +419,13 @@ func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCid
386419
readerCtx, readerCtxCancel := context.WithCancel(context.Background())
387420
defer close(r.ready)
388421

389-
reader, size, err := cpr.getPieceReaderFromAggregate(readerCtx, pieceCidV2)
422+
reader, size, err := cpr.getPieceReaderFromAggregate(readerCtx, pieceCidV2, retrieval)
390423
if err != nil {
391424
log.Debugw("failed to get piece reader from aggregate", "piececid", pieceCidV2.String(), "err", err)
392425

393426
aerr := err
394427

395-
reader, size, err = cpr.getPieceReaderFromMarketPieceDeal(readerCtx, pieceCidV2)
428+
reader, size, err = cpr.getPieceReaderFromMarketPieceDeal(readerCtx, pieceCidV2, retrieval)
396429
if err != nil {
397430
log.Errorw("failed to get piece reader", "piececid", pieceCid, "piece size", pieceSize, "err", err)
398431
finalErr := fmt.Errorf("failed to get piece reader from aggregate, sector or piece park: %w, %w", aerr, err)

market/ipni/chunker/serve-chunker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (p *ServeChunker) reconstructChunkFromCar(ctx context.Context, chunk, piece
276276

277277
pi := commp.PieceInfo()
278278

279-
reader, _, err := p.cpr.GetSharedPieceReader(ctx, piecev2)
279+
reader, _, err := p.cpr.GetSharedPieceReader(ctx, piecev2, false)
280280
defer func(reader storiface.Reader) {
281281
_ = reader.Close()
282282
}(reader)

market/mk20/client/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ipfs/go-cid"
1010
logging "github.com/ipfs/go-log/v2"
1111
"github.com/oklog/ulid"
12+
"github.com/samber/lo"
1213
"golang.org/x/xerrors"
1314

1415
"github.com/filecoin-project/go-address"
@@ -172,7 +173,9 @@ func (c *Client) CreateDataSource(pieceCID cid.Cid, car, raw, aggregate, index,
172173
return nil, xerrors.Errorf("only one data format is supported")
173174
}
174175

175-
if !car && (index || withCDN) {
176+
acar := !lo.SomeBy(sub, func(s mk20.DataSource) bool { return s.Format.Car == nil })
177+
178+
if (!car && !acar) && (index || withCDN) {
176179
return nil, xerrors.Errorf("only car data format supports IPFS style CDN retrievals")
177180
}
178181

market/retrieval/piecehandler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (rp *Provider) handleByPieceCid(w http.ResponseWriter, r *http.Request) {
4545
}
4646

4747
// Get a reader over the piece
48-
reader, size, err := rp.cpr.GetSharedPieceReader(ctx, pieceCid)
48+
reader, size, err := rp.cpr.GetSharedPieceReader(ctx, pieceCid, true)
4949
if err != nil {
5050
log.Errorf("server error getting content for piece CID %s: %s", pieceCid, err)
5151
if errors.Is(err, cachedreader.ErrNoDeal) {

market/retrieval/remoteblockstore/remoteblockstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
115115
var merr error
116116
for _, piece := range pieces {
117117
data, err := func() ([]byte, error) {
118-
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2)
118+
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2, true)
119119
if err != nil {
120120
return nil, fmt.Errorf("getting piece reader: %w", err)
121121
}

tasks/indexing/task_indexing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
239239
var reader storiface.Reader
240240

241241
if task.Mk20 {
242-
reader, _, err = i.cpr.GetSharedPieceReader(ctx, pc2)
242+
reader, _, err = i.cpr.GetSharedPieceReader(ctx, pc2, false)
243243

244244
if err != nil {
245245
return false, xerrors.Errorf("getting piece reader: %w", err)

tasks/indexing/task_ipni.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
274274
if err != nil {
275275
serr := err
276276
// Try to read piece (mk20 deal)
277-
reader, _, err = I.cpr.GetSharedPieceReader(ctx, pcid2)
277+
reader, _, err = I.cpr.GetSharedPieceReader(ctx, pcid2, false)
278278
if err != nil {
279279
return false, xerrors.Errorf("getting piece reader from sector and piece park: %w, %w", serr, err)
280280
}

tasks/indexing/task_pdp_indexing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (P *PDPIndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
130130
return true, nil
131131
}
132132

133-
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2)
133+
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2, false)
134134

135135
if err != nil {
136136
return false, xerrors.Errorf("getting piece reader: %w", err)

tasks/indexing/task_pdp_ipni.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func (P *PDPIPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
244244
var lnk ipld.Link
245245

246246
if pinfo.Payload {
247-
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2)
247+
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2, false)
248248
if err != nil {
249249
return false, xerrors.Errorf("getting piece reader from piece park: %w", err)
250250
}

0 commit comments

Comments
 (0)