From 0a8800ce31782f72bd911f538cd9ee1fa151e628 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Fri, 26 Sep 2025 12:59:08 +0400 Subject: [PATCH 1/2] Add PDP sync task --- cmd/curio/tasks/tasks.go | 4 +- tasks/pdp/dataset_delete_root_watch.go | 8 +- tasks/pdp/pdp_sync.go | 182 +++++++++++++++++++++++++ 3 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 tasks/pdp/pdp_sync.go diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 530e61f04..80e96af2d 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -320,7 +320,9 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan pdpCache := pdp.NewTaskPDPSaveCache(db, dependencies.CachedPieceReader, iStore) commPTask := pdp.NewPDPCommpTask(db, sc, cfg.Subsystems.CommPMaxTasks) - activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, commPTask, pdpAddRoot, addProofSetTask, pdpAggregateTask, pdpCache, pdpDelRoot, pdpDelProofSetTask) + pdpSync := pdp.NewPDPSyncTask(db, ethClient) + + activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, commPTask, pdpAddRoot, addProofSetTask, pdpAggregateTask, pdpCache, pdpDelRoot, pdpDelProofSetTask, pdpSync) } idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks) diff --git a/tasks/pdp/dataset_delete_root_watch.go b/tasks/pdp/dataset_delete_root_watch.go index 94a917154..0f5eba073 100644 --- a/tasks/pdp/dataset_delete_root_watch.go +++ b/tasks/pdp/dataset_delete_root_watch.go @@ -107,17 +107,15 @@ func processDataSetPieceDelete(ctx context.Context, db *harmonydb.DB, psd DataSe } comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - n, err := tx.Exec(`UPDATE pdp_dataset_piece SET removed = TRUE, + _, err = tx.Exec(`UPDATE pdp_dataset_piece SET removed = TRUE, remove_deal_id = $1, remove_message_hash = $2 WHERE data_set_id = $3 AND piece = ANY($4)`, psd.ID, psd.Hash, psd.DataSet, psd.Pieces) if err != nil { return false, xerrors.Errorf("failed to update pdp_dataset_piece: %w", err) } - if n != 1 { - return false, xerrors.Errorf("expected 1 row to be updated, got %d", n) - } - n, err = tx.Exec(`UPDATE market_mk20_deal + + n, err := tx.Exec(`UPDATE market_mk20_deal SET pdp_v1 = jsonb_set(pdp_v1, '{complete}', 'true'::jsonb, true) WHERE id = $1;`, psd.ID) if err != nil { diff --git a/tasks/pdp/pdp_sync.go b/tasks/pdp/pdp_sync.go new file mode 100644 index 000000000..c7aa987cb --- /dev/null +++ b/tasks/pdp/pdp_sync.go @@ -0,0 +1,182 @@ +package pdp + +import ( + "context" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp" + "github.com/filecoin-project/curio/pdp/contract" + "golang.org/x/xerrors" +) + +type PDPSyncTask struct { + db *harmonydb.DB + ethClient *ethclient.Client +} + +func NewPDPSyncTask(db *harmonydb.DB, ethClient *ethclient.Client) *PDPSyncTask { + return &PDPSyncTask{ + db: db, + ethClient: ethClient, + } +} + +func (P *PDPSyncTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.Background() + + // Fetch all proving pieces from DB + var provingPieces []struct { + ID int64 `db:"id"` + Pieces []int64 `db:"pieces"` + } + + err = P.db.Select(ctx, &provingPieces, `SELECT + d.id AS data_set_id, + array_agg(p.piece ORDER BY p.piece) AS pieces + FROM pdp_data_set d + JOIN pdp_dataset_piece p + ON p.data_set_id = d.id + WHERE COALESCE(d.removed, FALSE) = FALSE + AND COALESCE(p.removed, FALSE) = FALSE + GROUP BY d.id;`) + if err != nil { + return false, xerrors.Errorf("failed to get proving pieces: %w", err) + } + + verifier, err := contract.NewPDPVerifier(contract.ContractAddresses().PDPVerifier, P.ethClient) + if err != nil { + return false, xerrors.Errorf("failed to instantiate PDPVerifier contract: %w", err) + } + + var removedDataSetIDs []int64 + var provingDataSetIDs []int + + // Check if the data set is live + for i, pp := range provingPieces { + did := big.NewInt(pp.ID) + live, err := verifier.DataSetLive(&bind.CallOpts{Context: ctx}, did) + if err != nil { + return false, xerrors.Errorf("failed to check if data set %d is live: %w", pp.ID, err) + } + if !live { + removedDataSetIDs = append(removedDataSetIDs, pp.ID) + } else { + provingDataSetIDs = append(provingDataSetIDs, i) + } + } + + // Check if the pieces are live in the data set which are still live + removedPieces := make(map[int64][]int64) + + for _, pp := range provingDataSetIDs { + for _, piece := range provingPieces[pp].Pieces { + did := big.NewInt(piece) + live, err := verifier.PieceLive(&bind.CallOpts{Context: ctx}, did, big.NewInt(piece)) + if err != nil { + return false, xerrors.Errorf("failed to check if piece %d is live in data set %d: %w", piece, provingPieces[pp].ID, err) + } + if !live { + removedPieces[provingPieces[pp].ID] = append(removedPieces[provingPieces[pp].ID], piece) + } + } + } + + // Update in DB + comm, err := P.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // Mark the data set as removed + if len(removedPieces) > 0 { + _, err = tx.Exec(`UPDATE pdp_data_set SET removed = TRUE, + remove_deal_id = $1, + remove_message_hash = $2 + WHERE id = ANY($3)`, "Terminated On Chain", "Terminated On Chain", removedDataSetIDs) // TODO: Figure out how can we reliably get txHash for this from events + if err != nil { + return false, xerrors.Errorf("failed to update pdp_data_set: %w", err) + } + + // Start piece cleanup tasks + _, err = tx.Exec(`INSERT INTO piece_cleanup (id, piece_cid_v2, pdp, sp_id, sector_number, piece_ref) + SELECT p.add_deal_id, p.piece_cid_v2, TRUE, -1, -1, p.piece_ref + FROM pdp_dataset_piece AS p + WHERE p.data_set_id = ANY($1) + AND p.removed = FALSE + ON CONFLICT (id, pdp) DO NOTHING;`, removedDataSetIDs) + if err != nil { + return false, xerrors.Errorf("failed to insert into piece_cleanup: %w", err) + } + + _, err = tx.Exec(`UPDATE pdp_dataset_piece SET removed = TRUE, + remove_deal_id = $1, + remove_message_hash = $2 + WHERE data_set_id = ANY($3)`, "Terminated On Chain", "Terminated On Chain", removedDataSetIDs) // TODO: Figure out how can we reliably get txHash for this from events + if err != nil { + return false, xerrors.Errorf("failed to update pdp_dataset_piece: %w", err) + } + } + + // Mark the pieces as removed + for did, pp := range removedPieces { + if len(pp) == 0 { + continue + } + + _, err = tx.Exec(`UPDATE pdp_dataset_piece SET + removed = TRUE, + remove_deal_id = $1, + remove_message_hash = $2 WHERE data_set_id = $3 AND piece = ANY($4)`, "Terminated On Chain", "Terminated On Chain", did, pp) // TODO: Figure out how can we reliably get txHash for this from events + if err != nil { + return false, xerrors.Errorf("failed to update pdp_dataset_piece: %w", err) + } + + _, err = tx.Exec(`INSERT INTO piece_cleanup (id, piece_cid_v2, pdp, sp_id, sector_number, piece_ref) + SELECT p.add_deal_id, p.piece_cid_v2, TRUE, -1, -1, p.piece_ref + FROM pdp_dataset_piece AS p + WHERE p.data_set_id = $1 + AND p.piece = ANY($2) + ON CONFLICT (id, pdp) DO NOTHING;`, did, pp) + if err != nil { + return false, xerrors.Errorf("failed to insert into piece_cleanup: %w", err) + } + } + + return true, nil + }, harmonydb.OptionRetry()) + + if err != nil { + return false, xerrors.Errorf("failed to commit transaction: %w", err) + } + + if !comm { + return false, xerrors.Errorf("failed to commit transaction") + } + + return true, nil +} + +func (P *PDPSyncTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + return &ids[0], nil +} + +func (P *PDPSyncTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: taskhelp.Max(1), + Name: "PDPSync", + Cost: resources.Resources{ + Cpu: 1, + Ram: 64 << 20, + Gpu: 0, + }, + MaxFailures: 3, + IAmBored: harmonytask.SingletonTaskAdder(time.Hour*6, P), + } +} + +func (P *PDPSyncTask) Adder(taskFunc harmonytask.AddTaskFunc) {} + +var _ harmonytask.TaskInterface = &PDPSyncTask{} +var _ = harmonytask.Reg(&PDPSyncTask{}) From a7b49370c1218074577eea69dc323103ff79e33a Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Fri, 26 Sep 2025 13:10:34 +0400 Subject: [PATCH 2/2] fix gen --- extern/filecoin-ffi | 2 +- tasks/pdp/pdp_sync.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 586063e9c..44f5dc459 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 586063e9cfa45147d554f176759520398715ba41 +Subproject commit 44f5dc459be3b74aec77138c4d3e976324b0d17b diff --git a/tasks/pdp/pdp_sync.go b/tasks/pdp/pdp_sync.go index c7aa987cb..c1ad67c8d 100644 --- a/tasks/pdp/pdp_sync.go +++ b/tasks/pdp/pdp_sync.go @@ -7,12 +7,13 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/ethclient" + "golang.org/x/xerrors" + "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" "github.com/filecoin-project/curio/harmony/taskhelp" "github.com/filecoin-project/curio/pdp/contract" - "golang.org/x/xerrors" ) type PDPSyncTask struct {