diff --git a/harmony/harmonydb/sql/20250603-pdp-public-service.sql b/harmony/harmonydb/sql/20250603-pdp-public-service.sql new file mode 100644 index 000000000..e5cf33328 --- /dev/null +++ b/harmony/harmonydb/sql/20250603-pdp-public-service.sql @@ -0,0 +1,18 @@ +-- Create a default "public" PDP service for NullAuth operations +-- This ensures all nodes can add data/proofsets without authentication +DO $$ +BEGIN + -- Check if "public" service already exists + IF NOT EXISTS (SELECT 1 FROM pdp_services WHERE service_label = 'public') THEN + -- Create a dummy public key for the public service + -- This is a placeholder key that allows NullAuth operations + INSERT INTO pdp_services (service_label, pubkey) + VALUES ('public', decode('1C0DE4C0FFEE', 'hex')); + + -- Log the creation + RAISE NOTICE 'Created default "public" PDP service for NullAuth operations'; + ELSE + RAISE NOTICE 'Public PDP service already exists, skipping creation'; + END IF; +END +$$; \ No newline at end of file diff --git a/pdp/auth.go b/pdp/auth.go index e47dbc7d6..73365ff32 100644 --- a/pdp/auth.go +++ b/pdp/auth.go @@ -9,10 +9,30 @@ import ( "strings" "github.com/golang-jwt/jwt/v4" + + "github.com/filecoin-project/curio/harmony/harmonydb" ) -// verifyJWTToken extracts and verifies the JWT token from the request and returns the serviceID. -func (p *PDPService) verifyJWTToken(r *http.Request) (string, error) { +type Auth interface { + AuthService(r *http.Request) (string, error) +} + +type NullAuth struct{} + +var _ Auth = (*NullAuth)(nil) + +func (a *NullAuth) AuthService(r *http.Request) (string, error) { + return "public", nil +} + +type JWTAuth struct { + db *harmonydb.DB +} + +var _ Auth = (*JWTAuth)(nil) + +// JWTAuth extracts and verifies the JWT token from the request and returns the serviceID. +func (a *JWTAuth) AuthService(r *http.Request) (string, error) { // Get the Authorization header authHeader := r.Header.Get("Authorization") if authHeader == "" { @@ -63,7 +83,7 @@ func (p *PDPService) verifyJWTToken(r *http.Request) (string, error) { // Query the database for the public key using serviceID var pubKeyBytes []byte ctx := r.Context() - err := p.db.QueryRow(ctx, ` + err := a.db.QueryRow(ctx, ` SELECT pubkey FROM pdp_services WHERE service_label=$1 `, service).Scan(&pubKeyBytes) if err != nil { diff --git a/pdp/handlers.go b/pdp/handlers.go index 70fe22e68..5576fe39c 100644 --- a/pdp/handlers.go +++ b/pdp/handlers.go @@ -38,6 +38,7 @@ const PDPRoutePath = "/pdp" // PDPService represents the service for managing proof sets and pieces type PDPService struct { + Auth db *harmonydb.DB storage paths.StashStore @@ -53,6 +54,7 @@ type PDPServiceNodeApi interface { // NewPDPService creates a new instance of PDPService with the provided stores func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService { return &PDPService{ + Auth: &NullAuth{}, db: db, storage: stor, @@ -115,7 +117,7 @@ func Routes(r *chi.Mux, p *PDPService) { func (p *PDPService) handlePing(w http.ResponseWriter, r *http.Request) { // Verify that the request is authorized using ECDSA JWT - _, err := p.verifyJWTToken(r) + _, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return @@ -130,7 +132,7 @@ func (p *PDPService) handleCreateProofSet(w http.ResponseWriter, r *http.Request ctx := r.Context() // Step 1: Verify that the request is authorized using ECDSA JWT - serviceLabel, err := p.verifyJWTToken(r) + serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return @@ -283,7 +285,7 @@ func (p *PDPService) handleGetProofSetCreationStatus(w http.ResponseWriter, r *h ctx := r.Context() // Step 1: Verify that the request is authorized using ECDSA JWT - serviceLabel, err := p.verifyJWTToken(r) + serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return @@ -408,7 +410,7 @@ func (p *PDPService) handleGetProofSet(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // Step 1: Verify that the request is authorized using ECDSA JWT - serviceLabel, err := p.verifyJWTToken(r) + serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return @@ -533,7 +535,7 @@ func (p *PDPService) handleAddRootToProofSet(w http.ResponseWriter, r *http.Requ ctx := r.Context() // Step 1: Verify that the request is authorized using ECDSA JWT - serviceLabel, err := p.verifyJWTToken(r) + serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return @@ -919,7 +921,7 @@ func (p *PDPService) handleAddRootToProofSet(w http.ResponseWriter, r *http.Requ func (p *PDPService) handleDeleteProofSetRoot(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // Step 1: Verify that the request is authorized using ECDSA JWT - serviceLabel, err := p.verifyJWTToken(r) + serviceLabel, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return diff --git a/pdp/handlers_upload.go b/pdp/handlers_upload.go index fe4cb0f59..483019282 100644 --- a/pdp/handlers_upload.go +++ b/pdp/handlers_upload.go @@ -113,7 +113,7 @@ func (ph *PieceHash) maybeStaticCommp() (cid.Cid, bool) { func (p *PDPService) handlePiecePost(w http.ResponseWriter, r *http.Request) { // Verify that the request is authorized using ECDSA JWT - serviceID, err := p.verifyJWTToken(r) + serviceID, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return @@ -448,7 +448,7 @@ func (p *PDPService) handlePieceUpload(w http.ResponseWriter, r *http.Request) { // query parameters func (p *PDPService) handleFindPiece(w http.ResponseWriter, r *http.Request) { // Verify that the request is authorized using ECDSA JWT - _, err := p.verifyJWTToken(r) + _, err := p.AuthService(r) if err != nil { http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) return diff --git a/tasks/pdp/cleanup_roots.go b/tasks/pdp/cleanup_roots.go new file mode 100644 index 000000000..38e6aa49c --- /dev/null +++ b/tasks/pdp/cleanup_roots.go @@ -0,0 +1,85 @@ +package pdp + +import ( + "context" + "database/sql" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/ethclient" + "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/curio/pdp/contract" +) + +// CleanupScheduledRemovals processes any scheduled root removals for a proofset +// This should be called before nextProvingPeriod to ensure deletions aren't lost +func CleanupScheduledRemovals(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client, proofSetID int64) error { + pdpVerifier, err := contract.NewPDPVerifier(contract.ContractAddresses().PDPVerifier, ethClient) + if err != nil { + return xerrors.Errorf("failed to instantiate PDPVerifier contract: %w", err) + } + + removals, err := pdpVerifier.GetScheduledRemovals(nil, big.NewInt(proofSetID)) + if err != nil { + return xerrors.Errorf("failed to get scheduled removals: %w", err) + } + + if len(removals) == 0 { + // No removals to process + return nil + } + + // Execute cleanup in a transaction + ok, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + for _, removeID := range removals { + log.Debugw("cleanupScheduledRemovals", "proofSetID", proofSetID, "removeID", removeID) + + // Get the pdp_pieceref ID for the root before deleting + var pdpPieceRefID int64 + err := tx.QueryRow(` + SELECT pdp_pieceref + FROM pdp_proofset_roots + WHERE proofset = $1 AND root_id = $2 + `, proofSetID, removeID.Int64()).Scan(&pdpPieceRefID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + // Root already deleted, skip + continue + } + return false, xerrors.Errorf("failed to get piece ref for root %d: %w", removeID, err) + } + + // Delete the parked piece ref, this will cascade to the pdp piece ref too + _, err = tx.Exec(` + DELETE FROM parked_piece_refs + WHERE ref_id = $1 + `, pdpPieceRefID) + if err != nil { + return false, xerrors.Errorf("failed to delete parked piece ref %d: %w", pdpPieceRefID, err) + } + + // Delete the root entry + _, err = tx.Exec(` + DELETE FROM pdp_proofset_roots + WHERE proofset = $1 AND root_id = $2 + `, proofSetID, removeID) + if err != nil { + return false, xerrors.Errorf("failed to delete root %d: %w", removeID, err) + } + } + + return true, nil + }, harmonydb.OptionRetry()) + + if err != nil { + return xerrors.Errorf("failed to cleanup deleted roots: %w", err) + } + if !ok { + return xerrors.Errorf("database delete not committed") + } + + log.Infow("cleaned up scheduled removals", "proofSetID", proofSetID, "count", len(removals)) + return nil +} \ No newline at end of file diff --git a/tasks/pdp/task_init_pp.go b/tasks/pdp/task_init_pp.go index b5bce2010..d61f5f309 100644 --- a/tasks/pdp/task_init_pp.go +++ b/tasks/pdp/task_init_pp.go @@ -136,6 +136,14 @@ func (ipp *InitProvingPeriodTask) Do(taskID harmonytask.TaskID, stillOwned func( return false, xerrors.Errorf("failed to get next challenge window start: %w", err) } init_prove_at = init_prove_at.Add(init_prove_at, challengeWindow.Div(challengeWindow, big.NewInt(2))) // Give a buffer of 1/2 challenge window epochs so that we are still within challenge window + + // Clean up any scheduled removals before calling nextProvingPeriod + // This prevents losing deletions if we missed a proving window + err = CleanupScheduledRemovals(ctx, ipp.db, ipp.ethClient, proofSetID) + if err != nil { + return false, xerrors.Errorf("failed to cleanup scheduled removals: %w", err) + } + // Instantiate the PDPVerifier contract pdpContracts := contract.ContractAddresses() pdpVeriferAddress := pdpContracts.PDPVerifier diff --git a/tasks/pdp/task_next_pp.go b/tasks/pdp/task_next_pp.go index 14790e844..ffb08cd08 100644 --- a/tasks/pdp/task_next_pp.go +++ b/tasks/pdp/task_next_pp.go @@ -127,6 +127,13 @@ func (n *NextProvingPeriodTask) Do(taskID harmonytask.TaskID, stillOwned func() return false, xerrors.Errorf("failed to get next challenge window start: %w", err) } + // Clean up any scheduled removals before calling nextProvingPeriod + // This prevents losing deletions if we missed a proving window + err = CleanupScheduledRemovals(ctx, n.db, n.ethClient, proofSetID) + if err != nil { + return false, xerrors.Errorf("failed to cleanup scheduled removals: %w", err) + } + // Instantiate the PDPVerifier contract pdpContracts := contract.ContractAddresses() pdpVerifierAddress := pdpContracts.PDPVerifier diff --git a/tasks/pdp/task_prove.go b/tasks/pdp/task_prove.go index a81e8615b..b08d9559d 100644 --- a/tasks/pdp/task_prove.go +++ b/tasks/pdp/task_prove.go @@ -169,6 +169,23 @@ func (p *ProveTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("failed to get task details: %w", err) } + // Check if the proofset has any roots + var rootCount int + err = p.db.QueryRow(ctx, ` + SELECT COUNT(*) + FROM pdp_proofset_roots + WHERE proofset = $1 + `, proofSetID).Scan(&rootCount) + if err != nil { + return false, xerrors.Errorf("failed to check root count: %w", err) + } + + if rootCount == 0 { + // No roots in proofset, nothing to prove + log.Infow("PDP Prove Task: proofset has no roots, skipping", "proofSetID", proofSetID, "taskID", taskID) + return true, nil + } + pdpContracts := contract.ContractAddresses() pdpVerifierAddress := pdpContracts.PDPVerifier @@ -282,7 +299,7 @@ func (p *ProveTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Remove the roots previously scheduled for deletion - err = p.cleanupDeletedRoots(ctx, proofSetID, pdpVerifier) + err = CleanupScheduledRemovals(ctx, p.db, p.ethClient, proofSetID) if err != nil { return false, xerrors.Errorf("failed to cleanup deleted roots: %w", err) } @@ -654,64 +671,6 @@ func (p *ProveTask) getSenderAddress(ctx context.Context, match common.Address) return address, nil } -func (p *ProveTask) cleanupDeletedRoots(ctx context.Context, proofSetID int64, pdpVerifier *contract.PDPVerifier) error { - - removals, err := pdpVerifier.GetScheduledRemovals(nil, big.NewInt(proofSetID)) - if err != nil { - return xerrors.Errorf("failed to get scheduled removals: %w", err) - } - - // Execute cleanup in a transaction - ok, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { - - for _, removeID := range removals { - log.Debugw("cleanupDeletedRoots", "removeID", removeID) - // Get the pdp_pieceref ID for the root before deleting - var pdpPieceRefID int64 - err := tx.QueryRow(` - SELECT pdp_pieceref - FROM pdp_proofset_roots - WHERE proofset = $1 AND root_id = $2 - `, proofSetID, removeID.Int64()).Scan(&pdpPieceRefID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - // Root already deleted, skip - continue - } - return false, xerrors.Errorf("failed to get piece ref for root %d: %w", removeID, err) - } - - // Delete the parked piece ref, this will cascade to the pdp piece ref too - _, err = tx.Exec(` - DELETE FROM parked_piece_refs - WHERE ref_id = $1 - `, pdpPieceRefID) - if err != nil { - return false, xerrors.Errorf("failed to delete parked piece ref %d: %w", pdpPieceRefID, err) - } - - // Delete the root entry - _, err = tx.Exec(` - DELETE FROM pdp_proofset_roots - WHERE proofset = $1 AND root_id = $2 - `, proofSetID, removeID) - if err != nil { - return false, xerrors.Errorf("failed to delete root %d: %w", removeID, err) - } - } - - return true, nil - }, harmonydb.OptionRetry()) - - if err != nil { - return xerrors.Errorf("failed to cleanup deleted roots: %w", err) - } - if !ok { - return xerrors.Errorf("database delete not committed") - } - - return nil -} func (p *ProveTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { if len(ids) == 0 {