Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
pdpCache := pdp.NewTaskPDPSaveCache(db, dependencies.CachedPieceReader, iStore)
commPTask := pdp.NewPDPCommpTask(db, sc, cfg.Subsystems.CommPMaxTasks)

activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, commPTask, pdpAddRoot, addProofSetTask, pdpAggregateTask, pdpCache, pdpDelRoot, pdpDelProofSetTask)
pdpSync := pdp.NewPDPSyncTask(db, ethClient)

activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, commPTask, pdpAddRoot, addProofSetTask, pdpAggregateTask, pdpCache, pdpDelRoot, pdpDelProofSetTask, pdpSync)
}

idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)
Expand Down
2 changes: 1 addition & 1 deletion extern/filecoin-ffi
8 changes: 3 additions & 5 deletions tasks/pdp/dataset_delete_root_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,15 @@ func processDataSetPieceDelete(ctx context.Context, db *harmonydb.DB, psd DataSe
}

comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
n, err := tx.Exec(`UPDATE pdp_dataset_piece SET removed = TRUE,
_, err = tx.Exec(`UPDATE pdp_dataset_piece SET removed = TRUE,
remove_deal_id = $1,
remove_message_hash = $2
WHERE data_set_id = $3 AND piece = ANY($4)`, psd.ID, psd.Hash, psd.DataSet, psd.Pieces)
if err != nil {
return false, xerrors.Errorf("failed to update pdp_dataset_piece: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected 1 row to be updated, got %d", n)
}
n, err = tx.Exec(`UPDATE market_mk20_deal

n, err := tx.Exec(`UPDATE market_mk20_deal
SET pdp_v1 = jsonb_set(pdp_v1, '{complete}', 'true'::jsonb, true)
WHERE id = $1;`, psd.ID)
if err != nil {
Expand Down
183 changes: 183 additions & 0 deletions tasks/pdp/pdp_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package pdp

import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/pdp/contract"
)

type PDPSyncTask struct {
db *harmonydb.DB
ethClient *ethclient.Client
}

func NewPDPSyncTask(db *harmonydb.DB, ethClient *ethclient.Client) *PDPSyncTask {
return &PDPSyncTask{
db: db,
ethClient: ethClient,
}
}

func (P *PDPSyncTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()

// Fetch all proving pieces from DB
var provingPieces []struct {
ID int64 `db:"id"`
Pieces []int64 `db:"pieces"`
}

err = P.db.Select(ctx, &provingPieces, `SELECT
d.id AS data_set_id,
array_agg(p.piece ORDER BY p.piece) AS pieces
FROM pdp_data_set d
JOIN pdp_dataset_piece p
ON p.data_set_id = d.id
WHERE COALESCE(d.removed, FALSE) = FALSE
AND COALESCE(p.removed, FALSE) = FALSE
GROUP BY d.id;`)
if err != nil {
return false, xerrors.Errorf("failed to get proving pieces: %w", err)
}

verifier, err := contract.NewPDPVerifier(contract.ContractAddresses().PDPVerifier, P.ethClient)
if err != nil {
return false, xerrors.Errorf("failed to instantiate PDPVerifier contract: %w", err)
}

var removedDataSetIDs []int64
var provingDataSetIDs []int

// Check if the data set is live
for i, pp := range provingPieces {
did := big.NewInt(pp.ID)
live, err := verifier.DataSetLive(&bind.CallOpts{Context: ctx}, did)
if err != nil {
return false, xerrors.Errorf("failed to check if data set %d is live: %w", pp.ID, err)
}
if !live {
removedDataSetIDs = append(removedDataSetIDs, pp.ID)
} else {
provingDataSetIDs = append(provingDataSetIDs, i)
}
}

// Check if the pieces are live in the data set which are still live
removedPieces := make(map[int64][]int64)

for _, pp := range provingDataSetIDs {
for _, piece := range provingPieces[pp].Pieces {
did := big.NewInt(piece)
live, err := verifier.PieceLive(&bind.CallOpts{Context: ctx}, did, big.NewInt(piece))
if err != nil {
return false, xerrors.Errorf("failed to check if piece %d is live in data set %d: %w", piece, provingPieces[pp].ID, err)
}
if !live {
removedPieces[provingPieces[pp].ID] = append(removedPieces[provingPieces[pp].ID], piece)
}
}
}

// Update in DB
comm, err := P.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// Mark the data set as removed
if len(removedPieces) > 0 {
_, err = tx.Exec(`UPDATE pdp_data_set SET removed = TRUE,
remove_deal_id = $1,
remove_message_hash = $2
WHERE id = ANY($3)`, "Terminated On Chain", "Terminated On Chain", removedDataSetIDs) // TODO: Figure out how can we reliably get txHash for this from events
if err != nil {
return false, xerrors.Errorf("failed to update pdp_data_set: %w", err)
}

// Start piece cleanup tasks
_, err = tx.Exec(`INSERT INTO piece_cleanup (id, piece_cid_v2, pdp, sp_id, sector_number, piece_ref)
SELECT p.add_deal_id, p.piece_cid_v2, TRUE, -1, -1, p.piece_ref
FROM pdp_dataset_piece AS p
WHERE p.data_set_id = ANY($1)
AND p.removed = FALSE
ON CONFLICT (id, pdp) DO NOTHING;`, removedDataSetIDs)
if err != nil {
return false, xerrors.Errorf("failed to insert into piece_cleanup: %w", err)
}

_, err = tx.Exec(`UPDATE pdp_dataset_piece SET removed = TRUE,
remove_deal_id = $1,
remove_message_hash = $2
WHERE data_set_id = ANY($3)`, "Terminated On Chain", "Terminated On Chain", removedDataSetIDs) // TODO: Figure out how can we reliably get txHash for this from events
if err != nil {
return false, xerrors.Errorf("failed to update pdp_dataset_piece: %w", err)
}
}

// Mark the pieces as removed
for did, pp := range removedPieces {
if len(pp) == 0 {
continue
}

_, err = tx.Exec(`UPDATE pdp_dataset_piece SET
removed = TRUE,
remove_deal_id = $1,
remove_message_hash = $2 WHERE data_set_id = $3 AND piece = ANY($4)`, "Terminated On Chain", "Terminated On Chain", did, pp) // TODO: Figure out how can we reliably get txHash for this from events
if err != nil {
return false, xerrors.Errorf("failed to update pdp_dataset_piece: %w", err)
}

_, err = tx.Exec(`INSERT INTO piece_cleanup (id, piece_cid_v2, pdp, sp_id, sector_number, piece_ref)
SELECT p.add_deal_id, p.piece_cid_v2, TRUE, -1, -1, p.piece_ref
FROM pdp_dataset_piece AS p
WHERE p.data_set_id = $1
AND p.piece = ANY($2)
ON CONFLICT (id, pdp) DO NOTHING;`, did, pp)
if err != nil {
return false, xerrors.Errorf("failed to insert into piece_cleanup: %w", err)
}
}

return true, nil
}, harmonydb.OptionRetry())

if err != nil {
return false, xerrors.Errorf("failed to commit transaction: %w", err)
}

if !comm {
return false, xerrors.Errorf("failed to commit transaction")
}

return true, nil
}

func (P *PDPSyncTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &ids[0], nil
}

func (P *PDPSyncTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: taskhelp.Max(1),
Name: "PDPSync",
Cost: resources.Resources{
Cpu: 1,
Ram: 64 << 20,
Gpu: 0,
},
MaxFailures: 3,
IAmBored: harmonytask.SingletonTaskAdder(time.Hour*6, P),
}
}

func (P *PDPSyncTask) Adder(taskFunc harmonytask.AddTaskFunc) {}

var _ harmonytask.TaskInterface = &PDPSyncTask{}
var _ = harmonytask.Reg(&PDPSyncTask{})