Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/sptool/toolbox_deal_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ var mk20PDPDealCmd = &cli.Command{
removeRoot := cctx.Bool("remove-piece")
removeProofset := cctx.Bool("remove-dataset")
recordKeeper := cctx.String("record-keeper")
rootIDSet := cctx.IsSet("piece-id")
rootIDs := cctx.Uint64Slice("piece-id")
proofSetSet := cctx.IsSet("dataset-id")
proofsetID := cctx.Uint64("dataset-id")
Expand All @@ -2240,6 +2241,9 @@ var mk20PDPDealCmd = &cli.Command{
}

if addRoot {
if !proofSetSet {
return xerrors.Errorf("proofset-id must be set when adding a piece")
}
commp := cctx.String("pcidv2")
pieceCid, err := cid.Parse(commp)
if err != nil {
Expand Down Expand Up @@ -2313,7 +2317,7 @@ var mk20PDPDealCmd = &cli.Command{
return err
}
}
id, err := pclient.AddPieceWithAggregate(ctx, walletAddr.String(), recordKeeper, nil, &proofsetID, pieceCid, true, false, mk20.AggregateTypeNone, pieces)
id, err := pclient.AddPieceWithAggregate(ctx, walletAddr.String(), recordKeeper, nil, &proofsetID, pieceCid, true, false, mk20.AggregateTypeV1, pieces)
if err != nil {
return xerrors.Errorf("failed to add piece: %w", err)
}
Expand Down Expand Up @@ -2354,6 +2358,11 @@ var mk20PDPDealCmd = &cli.Command{
if !proofSetSet {
return xerrors.Errorf("proofset-id must be set when removing a root")
}

if !rootIDSet {
return xerrors.Errorf("piece-id must be set when removing a root")
}

id, err := pclient.RemovePiece(ctx, walletAddr.String(), recordKeeper, nil, &proofsetID, rootIDs)
if err != nil {
return xerrors.Errorf("failed to remove piece: %w", err)
Expand Down
73 changes: 53 additions & 20 deletions lib/cachedreader/cachedreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *cachedSectionReader) Close() error {
return nil
}

func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Context, pieceCidV2 cid.Cid) (storiface.Reader, uint64, error) {
func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Context, pieceCidV2 cid.Cid, retrieval bool) (storiface.Reader, uint64, error) {
// Get all deals containing this piece

commp, err := commcidv2.CommPFromPCidV2(pieceCidV2)
Expand Down Expand Up @@ -175,7 +175,14 @@ func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Cont
}

if len(deals) == 0 {
return nil, 0, fmt.Errorf("piece cid %s: %w", pieceCid, ErrNoDeal)
if retrieval {
return nil, 0, fmt.Errorf("piece cid %s: %w", pieceCid, ErrNoDeal)
}
reader, rawSize, err := cpr.getPieceReaderFromPiecePark(ctx, nil, &pieceCid, &pieceSize)
if err != nil {
return nil, 0, fmt.Errorf("failed to read piece from piece park: %w", err)
}
return reader, rawSize, nil
}

// For each deal, try to read an unsealed copy of the data from the sector
Expand Down Expand Up @@ -204,7 +211,8 @@ func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Cont

if dl.PieceRef.Valid {
// This is a MK20 deal, get from piece park
reader, rawSize, err := cpr.getPieceReaderFromPiecePark(ctx, dl.PieceRef.Int64)
ref := dl.PieceRef.Int64
reader, rawSize, err := cpr.getPieceReaderFromPiecePark(ctx, &ref, nil, nil)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("failed to read piece from piece park: %w", err))
continue
Expand All @@ -217,42 +225,67 @@ func (cpr *CachedPieceReader) getPieceReaderFromMarketPieceDeal(ctx context.Cont
return nil, 0, merr
}

func (cpr *CachedPieceReader) getPieceReaderFromPiecePark(ctx context.Context, piece_ref int64) (storiface.Reader, uint64, error) {
// Query parked_pieces and parked_piece_refs in one go
var pieceData []struct {
func (cpr *CachedPieceReader) getPieceReaderFromPiecePark(ctx context.Context, pieceRef *int64, pieceCid *cid.Cid, pieceSize *abi.PaddedPieceSize) (storiface.Reader, uint64, error) {
type pieceData struct {
ID int64 `db:"id"`
PieceCid string `db:"piece_cid"`
PieceRawSize int64 `db:"piece_raw_size"`
}

err := cpr.db.Select(ctx, &pieceData, `
var pd []pieceData

if pieceRef != nil {
var pdr []pieceData
err := cpr.db.Select(ctx, &pdr, `
SELECT
pp.id,
pp.piece_cid,
pp.piece_raw_size
FROM parked_piece_refs pr
JOIN parked_pieces pp ON pp.id = pr.piece_id
WHERE pr.ref_id = $1 AND pp.complete = TRUE and pp.long_term = TRUE;
`, piece_ref)
if err != nil {
return nil, 0, fmt.Errorf("failed to query parked_pieces and parked_piece_refs for piece_ref %d: %w", piece_ref, err)
`, pieceRef)
if err != nil {
return nil, 0, fmt.Errorf("failed to query parked_pieces and parked_piece_refs for piece_ref %d: %w", pieceRef, err)
}
if len(pdr) > 0 {
pd = append(pd, pdr...)
}
}

if pieceCid != nil && pieceSize != nil {
pcid := *pieceCid
var pdc []pieceData
err := cpr.db.Select(ctx, &pdc, `
SELECT
id,
piece_cid,
piece_raw_size
FROM parked_pieces
WHERE piece_cid = $1 AND piece_padded_size = $2;`, pcid.String(), *pieceSize)
if err != nil {
return nil, 0, fmt.Errorf("failed to query parked_pieces and parked_piece_refs for piece_ref %d: %w", pieceRef, err)
}
if len(pdc) > 0 {
pd = append(pd, pdc...)
}
}

if len(pieceData) == 0 {
return nil, 0, fmt.Errorf("failed to find piece in parked_pieces for piece_ref %d", piece_ref)
if len(pd) == 0 {
return nil, 0, fmt.Errorf("failed to find piece in parked_pieces for piece_ref %d", pieceRef)
}

pcid, err := cid.Parse(pieceData[0].PieceCid)
pcid, err := cid.Parse(pd[0].PieceCid)
if err != nil {
return nil, 0, fmt.Errorf("failed to parse piece cid: %w", err)
}

reader, err := cpr.pieceParkReader.ReadPiece(ctx, storiface.PieceNumber(pieceData[0].ID), pieceData[0].PieceRawSize, pcid)
reader, err := cpr.pieceParkReader.ReadPiece(ctx, storiface.PieceNumber(pd[0].ID), pd[0].PieceRawSize, pcid)
if err != nil {
return nil, 0, fmt.Errorf("failed to read piece from piece park: %w", err)
}

return reader, uint64(pieceData[0].PieceRawSize), nil
return reader, uint64(pd[0].PieceRawSize), nil
}

type SubPieceReader struct {
Expand All @@ -276,7 +309,7 @@ func (s SubPieceReader) ReadAt(p []byte, off int64) (n int, err error) {
return s.sr.ReadAt(p, off)
}

func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, pieceCidV2 cid.Cid) (storiface.Reader, uint64, error) {
func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, pieceCidV2 cid.Cid, retrieval bool) (storiface.Reader, uint64, error) {
pieces, err := cpr.idxStor.FindPieceInAggregate(ctx, pieceCidV2)
if err != nil {
return nil, 0, fmt.Errorf("failed to find piece in aggregate: %w", err)
Expand All @@ -294,7 +327,7 @@ func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, p
var merr error

for _, p := range pieces {
reader, _, err := cpr.getPieceReaderFromMarketPieceDeal(ctx, p.Cid)
reader, _, err := cpr.getPieceReaderFromMarketPieceDeal(ctx, p.Cid, retrieval)
if err != nil {
merr = multierror.Append(merr, err)
continue
Expand All @@ -307,7 +340,7 @@ func (cpr *CachedPieceReader) getPieceReaderFromAggregate(ctx context.Context, p
return nil, 0, fmt.Errorf("failed to find piece in aggregate: %w", merr)
}

func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCidV2 cid.Cid) (storiface.Reader, uint64, error) {
func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCidV2 cid.Cid, retrieval bool) (storiface.Reader, uint64, error) {
// Check if this is PieceCidV1 and try to convert to v2 if possible
yes := commcidv2.IsPieceCidV2(pieceCidV2)
if !yes {
Expand Down Expand Up @@ -386,13 +419,13 @@ func (cpr *CachedPieceReader) GetSharedPieceReader(ctx context.Context, pieceCid
readerCtx, readerCtxCancel := context.WithCancel(context.Background())
defer close(r.ready)

reader, size, err := cpr.getPieceReaderFromAggregate(readerCtx, pieceCidV2)
reader, size, err := cpr.getPieceReaderFromAggregate(readerCtx, pieceCidV2, retrieval)
if err != nil {
log.Debugw("failed to get piece reader from aggregate", "piececid", pieceCidV2.String(), "err", err)

aerr := err

reader, size, err = cpr.getPieceReaderFromMarketPieceDeal(readerCtx, pieceCidV2)
reader, size, err = cpr.getPieceReaderFromMarketPieceDeal(readerCtx, pieceCidV2, retrieval)
if err != nil {
log.Errorw("failed to get piece reader", "piececid", pieceCid, "piece size", pieceSize, "err", err)
finalErr := fmt.Errorf("failed to get piece reader from aggregate, sector or piece park: %w, %w", aerr, err)
Expand Down
2 changes: 1 addition & 1 deletion market/ipni/chunker/serve-chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (p *ServeChunker) reconstructChunkFromCar(ctx context.Context, chunk, piece

pi := commp.PieceInfo()

reader, _, err := p.cpr.GetSharedPieceReader(ctx, piecev2)
reader, _, err := p.cpr.GetSharedPieceReader(ctx, piecev2, false)
defer func(reader storiface.Reader) {
_ = reader.Close()
}(reader)
Expand Down
5 changes: 4 additions & 1 deletion market/mk20/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/oklog/ulid"
"github.com/samber/lo"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -172,7 +173,9 @@ func (c *Client) CreateDataSource(pieceCID cid.Cid, car, raw, aggregate, index,
return nil, xerrors.Errorf("only one data format is supported")
}

if !car && (index || withCDN) {
acar := !lo.SomeBy(sub, func(s mk20.DataSource) bool { return s.Format.Car == nil })

if (!car && !acar) && (index || withCDN) {
return nil, xerrors.Errorf("only car data format supports IPFS style CDN retrievals")
}

Expand Down
2 changes: 1 addition & 1 deletion market/retrieval/piecehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (rp *Provider) handleByPieceCid(w http.ResponseWriter, r *http.Request) {
}

// Get a reader over the piece
reader, size, err := rp.cpr.GetSharedPieceReader(ctx, pieceCid)
reader, size, err := rp.cpr.GetSharedPieceReader(ctx, pieceCid, true)
if err != nil {
log.Errorf("server error getting content for piece CID %s: %s", pieceCid, err)
if errors.Is(err, cachedreader.ErrNoDeal) {
Expand Down
2 changes: 1 addition & 1 deletion market/retrieval/remoteblockstore/remoteblockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
var merr error
for _, piece := range pieces {
data, err := func() ([]byte, error) {
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2)
reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2, true)
if err != nil {
return nil, fmt.Errorf("getting piece reader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tasks/indexing/task_indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
var reader storiface.Reader

if task.Mk20 {
reader, _, err = i.cpr.GetSharedPieceReader(ctx, pc2)
reader, _, err = i.cpr.GetSharedPieceReader(ctx, pc2, false)

if err != nil {
return false, xerrors.Errorf("getting piece reader: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion tasks/indexing/task_ipni.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
if err != nil {
serr := err
// Try to read piece (mk20 deal)
reader, _, err = I.cpr.GetSharedPieceReader(ctx, pcid2)
reader, _, err = I.cpr.GetSharedPieceReader(ctx, pcid2, false)
if err != nil {
return false, xerrors.Errorf("getting piece reader from sector and piece park: %w, %w", serr, err)
}
Expand Down
2 changes: 1 addition & 1 deletion tasks/indexing/task_pdp_indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (P *PDPIndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return true, nil
}

reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2)
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2, false)

if err != nil {
return false, xerrors.Errorf("getting piece reader: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion tasks/indexing/task_pdp_ipni.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (P *PDPIPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
var lnk ipld.Link

if pinfo.Payload {
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2)
reader, _, err := P.cpr.GetSharedPieceReader(ctx, pcid2, false)
if err != nil {
return false, xerrors.Errorf("getting piece reader from piece park: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions tasks/pdp/task_prove.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
// If piece is less than 100 MiB, let's generate proof directly without using cache
if pi.RawSize < MinSizeForCache {
// Get original file reader
reader, _, err := p.cpr.GetSharedPieceReader(ctx, pcid)
reader, _, err := p.cpr.GetSharedPieceReader(ctx, pcid, false)
if err != nil {
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to get piece reader: %w", err)
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
subrootSize := padreader.PaddedSize(uint64(length)).Padded()

// Get original file reader
reader, reportedSize, err := p.cpr.GetSharedPieceReader(ctx, pcid)
reader, reportedSize, err := p.cpr.GetSharedPieceReader(ctx, pcid, false)
if err != nil {
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to get reader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion tasks/pdp/task_save_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (t *TaskPDPSaveCache) Do(taskID harmonytask.TaskID, stillOwned func() bool)

if !has {
cp := savecache.NewCommPWithSize(pi.RawSize)
reader, _, err := t.cpr.GetSharedPieceReader(ctx, pcid)
reader, _, err := t.cpr.GetSharedPieceReader(ctx, pcid, false)
if err != nil {
return false, xerrors.Errorf("failed to get shared piece reader: %w", err)
}
Expand Down
Loading