diff --git a/cmd/app.go b/cmd/app.go index 1b21bc4b..2c99381f 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -1,236 +1,240 @@ package cmd import ( - "bytes" - "encoding/json" - "fmt" - "io" - "os" - "os/exec" - "strings" + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "strings" - "github.com/cockroachdb/errors" - "github.com/data-preservation-programs/singularity/cmd/admin" - "github.com/data-preservation-programs/singularity/cmd/cliutil" - "github.com/data-preservation-programs/singularity/cmd/dataprep" - "github.com/data-preservation-programs/singularity/cmd/deal" - "github.com/data-preservation-programs/singularity/cmd/deal/schedule" - "github.com/data-preservation-programs/singularity/cmd/dealtemplate" - "github.com/data-preservation-programs/singularity/cmd/ez" - "github.com/data-preservation-programs/singularity/cmd/run" - "github.com/data-preservation-programs/singularity/cmd/storage" - "github.com/data-preservation-programs/singularity/cmd/tool" - "github.com/data-preservation-programs/singularity/cmd/wallet" - "github.com/filecoin-project/go-address" - "github.com/ipfs/go-log/v2" - "github.com/rclone/rclone/lib/terminal" - "github.com/urfave/cli/v2" + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/cmd/admin" + "github.com/data-preservation-programs/singularity/cmd/cliutil" + "github.com/data-preservation-programs/singularity/cmd/dataprep" + "github.com/data-preservation-programs/singularity/cmd/deal" + "github.com/data-preservation-programs/singularity/cmd/deal/schedule" + "github.com/data-preservation-programs/singularity/cmd/dealtemplate" + "github.com/data-preservation-programs/singularity/cmd/ez" + "github.com/data-preservation-programs/singularity/cmd/run" + "github.com/data-preservation-programs/singularity/cmd/storage" + "github.com/data-preservation-programs/singularity/cmd/tool" + "github.com/data-preservation-programs/singularity/cmd/wallet" + "github.com/data-preservation-programs/singularity/service/dealprooftracker" + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-log/v2" + "github.com/rclone/rclone/lib/terminal" + "github.com/urfave/cli/v2" ) var logger = log.Logger("singularity/cmd") +// Static CLI registration with dealproof appended var App = &cli.App{ - Name: "singularity", - Usage: "A tool for large-scale clients with PB-scale data onboarding to Filecoin network", - Description: `Database Backend Support: + Name: "singularity", + Usage: "A tool for large-scale clients with PB-scale data onboarding to Filecoin network", + Description: `Database Backend Support: Singularity supports multiple database backend: sqlite3, postgres, mysql5.7+ Use '--database-connection-string' or $DATABASE_CONNECTION_STRING to specify the database connection string. - Example for postgres - postgres://user:pass@example.com:5432/dbname - Example for mysql - mysql://user:pass@tcp(localhost:3306)/dbname?parseTime=true - Example for sqlite3 - sqlite:/absolute/path/to/database.db - or - sqlite:relative/path/to/database.db + Example for postgres - postgres://user:pass@example.com:5432/dbname + Example for mysql - mysql://user:pass@tcp(localhost:3306)/dbname?parseTime=true + Example for sqlite3 - sqlite:/absolute/path/to/database.db + or - sqlite:relative/path/to/database.db Network Support: Default settings in Singularity are for Mainnet. You may set below environment values for other network: - For Calibration network: - * Set LOTUS_API to https://api.calibration.node.glif.io/rpc/v1 - * Set MARKET_DEAL_URL to https://marketdeals-calibration.s3.amazonaws.com/StateMarketDeals.json.zst - * Set LOTUS_TEST to 1 - For all other networks: - * Set LOTUS_API to your network's Lotus API endpoint - * Set MARKET_DEAL_URL to empty string - * Set LOTUS_TEST to 0 or 1 based on whether the network address starts with 'f' or 't' - Switching between different networks with the same database instance is not recommended. + For Calibration network: + * Set LOTUS_API to https://api.calibration.node.glif.io/rpc/v1 + * Set MARKET_DEAL_URL to https://marketdeals-calibration.s3.amazonaws.com/StateMarketDeals.json.zst + * Set LOTUS_TEST to 1 + For all other networks: + * Set LOTUS_API to your network's Lotus API endpoint + * Set MARKET_DEAL_URL to empty string + * Set LOTUS_TEST to 0 or 1 based on whether the network address starts with 'f' or 't' + Switching between different networks with the same database instance is not recommended. Logging: Singularity uses go-log for logging and can be controlled by below environment variables: - * GOLOG_LOG_LEVEL - example values: debug, info, warn, error, dpanic, panic, fatal - * GOLOG_LOG_FMT - example values: color, nocolor, json - * More details can be found at https://github.com/ipfs/go-log + * GOLOG_LOG_LEVEL - example values: debug, info, warn, error, dpanic, panic, fatal + * GOLOG_LOG_FMT - example values: color, nocolor, json + * More details can be found at https://github.com/ipfs/go-log Upgrading: Within each minor version upgrade, use "singularity admin init" to upgrade the database schema. For major version upgrade, please refer to the release notes for upgrade instructions. `, - EnableBashCompletion: true, - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "database-connection-string", - Usage: "Connection string to the database", - DefaultText: "sqlite:" + "./singularity.db", - Value: "sqlite:" + "./singularity.db", - EnvVars: []string{"DATABASE_CONNECTION_STRING"}, - }, - &cli.BoolFlag{ - Name: "json", - Usage: "Enable JSON output", - Value: false, - }, - &cli.BoolFlag{ - Name: "verbose", - Usage: "Enable verbose output. This will print more columns for the result as well as full error trace", - Value: false, - }, - &cli.StringFlag{ - Name: "lotus-api", - Category: "Lotus", - Usage: "Lotus RPC API endpoint", - Value: "https://api.node.glif.io/rpc/v1", - EnvVars: []string{"LOTUS_API"}, - }, - &cli.StringFlag{ - Name: "lotus-token", - Category: "Lotus", - Usage: "Lotus RPC API token", - Value: "", - EnvVars: []string{"LOTUS_TOKEN"}, - }, - &cli.BoolFlag{ - Name: "lotus-test", - Category: "Lotus", - Usage: "Whether the runtime environment is using Testnet.", - EnvVars: []string{"LOTUS_TEST"}, - }, - }, - Before: func(c *cli.Context) error { - if c.Bool("lotus-test") { - address.CurrentNetwork = address.Testnet - logger.Infow("Current network is set to Testnet") - } else { - address.CurrentNetwork = address.Mainnet - } - return nil - }, - Commands: []*cli.Command{ - OnboardCmd, - ez.PrepCmd, - VersionCmd, - { - Name: "admin", - Usage: "Admin commands", - Category: "Operations", - Subcommands: []*cli.Command{ - admin.InitCmd, - admin.ResetCmd, - admin.MigrateCmd, - admin.MigrateDatasetCmd, - admin.MigrateScheduleCmd, - }, - }, - DownloadCmd, - tool.ExtractCarCmd, - { - Name: "deal", - Usage: "Replication / Deal making management", - Category: "Operations", - Subcommands: []*cli.Command{ - { - Name: "schedule", - Usage: "Schedule deals", - Subcommands: []*cli.Command{ - schedule.CreateCmd, - schedule.ListCmd, - schedule.UpdateCmd, - schedule.PauseCmd, - schedule.ResumeCmd, - schedule.RemoveCmd, - }, - }, - deal.SendManualCmd, - deal.ListCmd, - }, - }, - { - Name: "deal-schedule-template", - Aliases: []string{"dst"}, - Usage: "Deal schedule template management", - Category: "Operations", - Subcommands: []*cli.Command{ - dealtemplate.CreateCmd, - dealtemplate.ListCmd, - dealtemplate.GetCmd, - dealtemplate.DeleteCmd, - }, - }, - { - Name: "run", - Category: "Daemons", - Usage: "run different singularity components", - Subcommands: []*cli.Command{ - run.APICmd, - run.DatasetWorkerCmd, - run.ContentProviderCmd, - run.DealTrackerCmd, - run.DealPusherCmd, - run.DownloadServerCmd, - run.UnifiedServiceCmd, - }, - }, - { - Name: "wallet", - Category: "Operations", - Usage: "Wallet management", - Subcommands: []*cli.Command{ - wallet.BalanceCmd, - wallet.CreateCmd, - wallet.ImportCmd, - wallet.InitCmd, - wallet.ListCmd, - wallet.RemoveCmd, - wallet.UpdateCmd, - }, - }, - { - Name: "storage", - Category: "Operations", - Usage: "Create and manage storage system connections", - Subcommands: []*cli.Command{ - storage.CreateCmd, - storage.ExploreCmd, - storage.ListCmd, - storage.RemoveCmd, - storage.UpdateCmd, - storage.RenameCmd, - }, - }, - { - Name: "prep", - Category: "Operations", - Usage: "Create and manage dataset preparations", - Subcommands: []*cli.Command{ - dataprep.CreateCmd, - dataprep.ListCmd, - dataprep.StatusCmd, - dataprep.RenameCmd, - dataprep.AttachSourceCmd, - dataprep.AttachOutputCmd, - dataprep.DetachOutputCmd, - dataprep.StartScanCmd, - dataprep.PauseScanCmd, - dataprep.StartPackCmd, - dataprep.PausePackCmd, - dataprep.StartDagGenCmd, - dataprep.PauseDagGenCmd, - dataprep.ListPiecesCmd, - dataprep.AddPieceCmd, - dataprep.ExploreCmd, - dataprep.AttachWalletCmd, - dataprep.ListWalletsCmd, - dataprep.DetachWalletCmd, - dataprep.RemoveCmd, - }, - }, - }, + EnableBashCompletion: true, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "database-connection-string", + Usage: "Connection string to the database", + DefaultText: "sqlite:" + "./singularity.db", + Value: "sqlite:" + "./singularity.db", + EnvVars: []string{"DATABASE_CONNECTION_STRING"}, + }, + &cli.BoolFlag{ + Name: "json", + Usage: "Enable JSON output", + Value: false, + }, + &cli.BoolFlag{ + Name: "verbose", + Usage: "Enable verbose output. This will print more columns for the result as well as full error trace", + Value: false, + }, + &cli.StringFlag{ + Name: "lotus-api", + Category: "Lotus", + Usage: "Lotus RPC API endpoint", + Value: "https://api.node.glif.io/rpc/v1", + EnvVars: []string{"LOTUS_API"}, + }, + &cli.StringFlag{ + Name: "lotus-token", + Category: "Lotus", + Usage: "Lotus RPC API token", + Value: "", + EnvVars: []string{"LOTUS_TOKEN"}, + }, + &cli.BoolFlag{ + Name: "lotus-test", + Category: "Lotus", + Usage: "Whether the runtime environment is using Testnet.", + EnvVars: []string{"LOTUS_TEST"}, + }, + }, + Before: func(c *cli.Context) error { + if c.Bool("lotus-test") { + address.CurrentNetwork = address.Testnet + logger.Infow("Current network is set to Testnet") + } else { + address.CurrentNetwork = address.Mainnet + } + return nil + }, + Commands: []*cli.Command{ + OnboardCmd, + ez.PrepCmd, + VersionCmd, + { + Name: "admin", + Usage: "Admin commands", + Category: "Operations", + Subcommands: []*cli.Command{ + admin.InitCmd, + admin.ResetCmd, + admin.MigrateCmd, + admin.MigrateDatasetCmd, + admin.MigrateScheduleCmd, + }, + }, + DownloadCmd, + tool.ExtractCarCmd, + { + Name: "deal", + Usage: "Replication / Deal making management", + Category: "Operations", + Subcommands: []*cli.Command{ + { + Name: "schedule", + Usage: "Schedule deals", + Subcommands: []*cli.Command{ + schedule.CreateCmd, + schedule.ListCmd, + schedule.UpdateCmd, + schedule.PauseCmd, + schedule.ResumeCmd, + schedule.RemoveCmd, + }, + }, + deal.SendManualCmd, + deal.ListCmd, + }, + }, + { + Name: "deal-schedule-template", + Aliases: []string{"dst"}, + Usage: "Deal schedule template management", + Category: "Operations", + Subcommands: []*cli.Command{ + dealtemplate.CreateCmd, + dealtemplate.ListCmd, + dealtemplate.GetCmd, + dealtemplate.DeleteCmd, + }, + }, + { + Name: "run", + Category: "Daemons", + Usage: "run different singularity components", + Subcommands: []*cli.Command{ + run.APICmd, + run.DatasetWorkerCmd, + run.ContentProviderCmd, + run.DealTrackerCmd, + run.DealPusherCmd, + run.DownloadServerCmd, + run.UnifiedServiceCmd, + }, + }, + { + Name: "wallet", + Category: "Operations", + Usage: "Wallet management", + Subcommands: []*cli.Command{ + wallet.BalanceCmd, + wallet.CreateCmd, + wallet.ImportCmd, + wallet.InitCmd, + wallet.ListCmd, + wallet.RemoveCmd, + wallet.UpdateCmd, + }, + }, + { + Name: "storage", + Category: "Operations", + Usage: "Create and manage storage system connections", + Subcommands: []*cli.Command{ + storage.CreateCmd, + storage.ExploreCmd, + storage.ListCmd, + storage.RemoveCmd, + storage.UpdateCmd, + storage.RenameCmd, + }, + }, + { + Name: "prep", + Category: "Operations", + Usage: "Create and manage dataset preparations", + Subcommands: []*cli.Command{ + dataprep.CreateCmd, + dataprep.ListCmd, + dataprep.StatusCmd, + dataprep.RenameCmd, + dataprep.AttachSourceCmd, + dataprep.AttachOutputCmd, + dataprep.DetachOutputCmd, + dataprep.StartScanCmd, + dataprep.PauseScanCmd, + dataprep.StartPackCmd, + dataprep.PausePackCmd, + dataprep.StartDagGenCmd, + dataprep.PauseDagGenCmd, + dataprep.ListPiecesCmd, + dataprep.AddPieceCmd, + dataprep.ExploreCmd, + dataprep.AttachWalletCmd, + dataprep.ListWalletsCmd, + dataprep.DetachWalletCmd, + dataprep.RemoveCmd, + }, + }, + // Deal proof tracker command + dealprooftracker.CLICommands(), + }, } var originalHelpPrinter = cli.HelpPrinter diff --git a/lotusclient/lotusclient.go b/lotusclient/lotusclient.go new file mode 100644 index 00000000..1e067e17 --- /dev/null +++ b/lotusclient/lotusclient.go @@ -0,0 +1,17 @@ +package lotusclient + +import ( + "github.com/ybbus/jsonrpc/v3" +) + +// NewLotusClient creates a new JSON-RPC client for interacting with a Lotus node. +func NewLotusClient(lotusAPI string, lotusToken string) jsonrpc.RPCClient { + if lotusToken == "" { + return jsonrpc.NewClient(lotusAPI) + } + return jsonrpc.NewClientWithOpts(lotusAPI, &jsonrpc.RPCClientOpts{ + CustomHeaders: map[string]string{ + "Authorization": "Bearer " + lotusToken, + }, + }) +} diff --git a/migrate/migrations/202507210010_create_deal_proof_trackings.go b/migrate/migrations/202507210010_create_deal_proof_trackings.go new file mode 100644 index 00000000..e5239d08 --- /dev/null +++ b/migrate/migrations/202507210010_create_deal_proof_trackings.go @@ -0,0 +1,31 @@ +package migrations + +import ( + "github.com/go-gormigrate/gormigrate/v2" + "gorm.io/gorm" +) + +func CreateDealProofTrackings202507210010() *gormigrate.Migration { + return &gormigrate.Migration{ + ID: "202507210010", + Migrate: func(tx *gorm.DB) error { + return tx.Exec(` + CREATE TABLE IF NOT EXISTS deal_proof_trackings ( + deal_id INTEGER PRIMARY KEY, + provider TEXT NOT NULL, + sector_id INTEGER NOT NULL, + sector_start_epoch INTEGER NOT NULL, + current_deadline_index INTEGER NOT NULL, + period_start_epoch INTEGER NOT NULL, + estimated_next_proof_time DATETIME NOT NULL, + faults INTEGER NOT NULL, + recoveries INTEGER NOT NULL, + last_updated_at DATETIME NOT NULL + ) + `).Error + }, + Rollback: func(tx *gorm.DB) error { + return tx.Exec(`DROP TABLE IF EXISTS deal_proof_trackings`).Error + }, + } +} diff --git a/migrate/migrations/migrations.go b/migrate/migrations/migrations.go index 934b33e2..2675941d 100644 --- a/migrate/migrations/migrations.go +++ b/migrate/migrations/migrations.go @@ -1,3 +1,4 @@ + package migrations import ( @@ -14,5 +15,6 @@ func GetMigrations() []*gormigrate.Migration { _202507090900_add_missing_deal_template_fields(), _202507090915_add_not_null_defaults(), _202507091000_add_schedule_fields_to_deal_templates(), + CreateDealProofTrackings202507210010(), } } diff --git a/model/migrate.go b/model/migrate.go index 036b948a..2b6b3054 100644 --- a/model/migrate.go +++ b/model/migrate.go @@ -4,6 +4,9 @@ import ( "crypto/rand" "encoding/base64" + // For registering DealProofTracking in migrations + "github.com/data-preservation-programs/singularity/service/dealprooftracker" + "github.com/cockroachdb/errors" "github.com/data-preservation-programs/singularity/migrate/migrations" "github.com/go-gormigrate/gormigrate/v2" @@ -31,6 +34,8 @@ var Tables = []any{ &Deal{}, &Schedule{}, &Wallet{}, + // --- Added for dealprooftracker --- + &dealprooftracker.DealProofTracking{}, } var logger = logging.Logger("model") diff --git a/service/dealprooftracker/api.go b/service/dealprooftracker/api.go new file mode 100644 index 00000000..a3bfebbb --- /dev/null +++ b/service/dealprooftracker/api.go @@ -0,0 +1,39 @@ +package dealprooftracker + +import ( + "net/http" + "strconv" + "github.com/gin-gonic/gin" +) + +// RegisterRoutes registers dealprooftracker API endpoints +func RegisterRoutes(r *gin.Engine, tracker *ProofTracker) { + r.GET("/api/v1/dealproofs/:deal_id", func(c *gin.Context) { + dealID, err := strconv.ParseUint(c.Param("deal_id"), 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid deal_id"}) + return + } + ctx := c.Request.Context() + info, err := tracker.GetDBProofInfo(ctx, dealID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, info) + }) + r.GET("/api/v1/dealproofs/:deal_id/live", func(c *gin.Context) { + dealID, err := strconv.ParseUint(c.Param("deal_id"), 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid deal_id"}) + return + } + ctx := c.Request.Context() + info, err := tracker.GetLiveProofInfo(ctx, dealID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, info) + }) +} diff --git a/service/dealprooftracker/cli.go b/service/dealprooftracker/cli.go new file mode 100644 index 00000000..871fb1ca --- /dev/null +++ b/service/dealprooftracker/cli.go @@ -0,0 +1,197 @@ +package dealprooftracker + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "time" + + "github.com/urfave/cli/v2" + "github.com/data-preservation-programs/singularity/database" +) + +const defaultPollingInterval = 5 * time.Minute + +// CLICommands returns the CLI commands for dealprooftracker +func CLICommands() *cli.Command { + return &cli.Command{ + Name: "dealproof", + Usage: "Deal Proof Tracker commands", + Subcommands: []*cli.Command{ + { + Name: "run", + Usage: "Start the deal proof tracker service loop", + Action: runTracker, + Flags: commonFlags(), + }, + { + Name: "live", + Usage: "Fetch proof info in real time from Lotus", + ArgsUsage: "", + Action: liveProof, + Flags: commonFlags(), + }, + { + Name: "db", + Usage: "Fetch proof info from the local DB", + ArgsUsage: "", + Action: dbProof, + Flags: commonFlags(), + }, + { + Name: "health", + Usage: "Check DB and Lotus connectivity", + Action: healthCheck, + Flags: commonFlags(), + }, + }, + } +} + +func commonFlags() []cli.Flag { + return []cli.Flag{ + &cli.StringFlag{ + Name: "database-connection-string", + EnvVars: []string{"DATABASE_CONNECTION_STRING"}, + Usage: "Database connection string", + }, + &cli.StringFlag{ + Name: "lotus-api", + EnvVars: []string{"LOTUS_API"}, + Usage: "Lotus API endpoint", + }, + &cli.StringFlag{ + Name: "lotus-token", + EnvVars: []string{"LOTUS_TOKEN"}, + Usage: "Lotus API token", + }, + &cli.StringFlag{ + Name: "output", + Usage: "Output format: plain|json", + Value: "plain", + }, + &cli.DurationFlag{ + Name: "interval", + Usage: "Polling interval for the tracker service", + Value: defaultPollingInterval, + }, + } +} + +// Helper to open DB and initialize tracker +func resolveDeps(c *cli.Context) (*ProofTracker, error) { + connStr := c.String("database-connection-string") + if connStr == "" { + return nil, fmt.Errorf("database connection string is required") + } + + db, _, err := database.OpenWithLogger(connStr) + if err != nil { + return nil, fmt.Errorf("failed to open DB: %w", err) + } + + lotusAPI := c.String("lotus-api") + if lotusAPI == "" { + return nil, fmt.Errorf("lotus-api is required") + } + + lotusToken := c.String("lotus-token") + interval := c.Duration("interval") + + // Create and return the tracker + return NewProofTracker(db, lotusAPI, lotusToken, interval), nil +} + +func runTracker(c *cli.Context) error { + tracker, err := resolveDeps(c) + if err != nil { + return err + } + tracker.Start(c.Context) + select {} // Block forever +} + +func liveProof(c *cli.Context) error { + tracker, err := resolveDeps(c) + if err != nil { + return err + } + + if c.NArg() < 1 { + return fmt.Errorf("dealID required") + } + + dealID, err := strconv.ParseUint(c.Args().Get(0), 10, 64) + if err != nil { + return fmt.Errorf("invalid dealID: %w", err) + } + + info, err := tracker.GetLiveProofInfo(c.Context, dealID) + if err != nil { + return fmt.Errorf("failed to get live proof info: %w", err) + } + + return printOutput(c, info) +} + +func dbProof(c *cli.Context) error { + tracker, err := resolveDeps(c) + if err != nil { + return err + } + + if c.NArg() < 1 { + return fmt.Errorf("dealID required") + } + + dealID, err := strconv.ParseUint(c.Args().Get(0), 10, 64) + if err != nil { + return fmt.Errorf("invalid dealID: %w", err) + } + + info, err := tracker.GetDBProofInfo(c.Context, dealID) + if err != nil { + return fmt.Errorf("failed to get DB proof info: %w", err) + } + + return printOutput(c, info) +} + +func healthCheck(c *cli.Context) error { + tracker, err := resolveDeps(c) + if err != nil { + return err + } + + // Check DB connection + if err := tracker.db.Exec("SELECT 1").Error; err != nil { + return fmt.Errorf("DB health check failed: %w", err) + } + + // Check Lotus connection by attempting to list deals (simplified health check) + info, checkErr := tracker.GetLiveProofInfo(c.Context, 1) // Use deal ID 1 as a test + if checkErr != nil { + return fmt.Errorf("Lotus health check failed: %w", checkErr) + } + _ = info // Ignore the result, we just care about connectivity + + fmt.Println("Health check: OK") + return nil +} + +func printOutput(c *cli.Context, v interface{}) error { + switch c.String("output") { + case "json": + return printJSON(v) + default: + fmt.Printf("%+v\n", v) + return nil + } +} + +func printJSON(v interface{}) error { + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(v) +} diff --git a/service/dealprooftracker/dealprooftracker.go b/service/dealprooftracker/dealprooftracker.go new file mode 100644 index 00000000..153c3134 --- /dev/null +++ b/service/dealprooftracker/dealprooftracker.go @@ -0,0 +1,202 @@ +package dealprooftracker + +import ( + "context" + "log" + "time" + "gorm.io/gorm" + "github.com/data-preservation-programs/singularity/lotusclient" +) + +type ProofTracker struct { + db *gorm.DB + lotusURL string + lotusToken string + interval time.Duration + stopCh chan struct{} +} + +func NewProofTracker(db *gorm.DB, lotusURL, lotusToken string, interval time.Duration) *ProofTracker { + return &ProofTracker{ + db: db, + lotusURL: lotusURL, + lotusToken: lotusToken, + interval: interval, + stopCh: make(chan struct{}), + } +} + +// Start runs the tracker loop +func (pt *ProofTracker) Start(ctx context.Context) { + ticker := time.NewTicker(pt.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pt.pollAndUpdate(ctx) + case <-pt.stopCh: + return + case <-ctx.Done(): + return + } + } +} + +// pollAndUpdate queries Lotus and updates DB for tracked deals (MVP: single deal example) +func (pt *ProofTracker) pollAndUpdate(ctx context.Context) { + log.Println("[ProofTracker] Polling Lotus and updating DB...") + lotus := lotusclient.NewLotusClient(pt.lotusURL, pt.lotusToken) + + // Example: fetch a list of deal IDs to track (MVP: hardcoded or from DB) + var trackedDeals []uint64 + // TODO: Replace with DB query for all deals to track + trackedDeals = []uint64{113718925} // Example deal + + for _, dealID := range trackedDeals { + var dealInfo map[string]interface{} + err := lotus.CallFor(ctx, &dealInfo, "Filecoin.StateMarketStorageDeal", dealID, nil) + if err != nil { + log.Printf("[ProofTracker] Failed to get deal info for %d: %v", dealID, err) + continue + } + state := dealInfo["State"].(map[string]interface{}) + sectorNumber := int64(state["SectorNumber"].(float64)) + sectorStartEpoch := int32(state["SectorStartEpoch"].(float64)) + + proposal := dealInfo["Proposal"].(map[string]interface{}) + provider := proposal["Provider"].(string) + + // Get proving deadline info + var deadlineInfo map[string]interface{} + err = lotus.CallFor(ctx, &deadlineInfo, "Filecoin.StateMinerProvingDeadline", provider, nil) + if err != nil { + log.Printf("[ProofTracker] Failed to get proving deadline for %s: %v", provider, err) + continue + } + currentDeadline := int32(deadlineInfo["Index"].(float64)) + periodStartEpoch := int32(deadlineInfo["PeriodStart"].(float64)) + + // Estimate next proof time (MVP: now + 30min) + estimatedNextProofTime := time.Now().Add(30 * time.Minute) + + // Get partition info (faults/recoveries) + var partitions []map[string]interface{} + err = lotus.CallFor(ctx, &partitions, "Filecoin.StateMinerPartitions", provider, currentDeadline, nil) + faults := 0 + recoveries := 0 + if err == nil && len(partitions) > 0 { + for _, p := range partitions { + if f, ok := p["FaultySectors"].(float64); ok { + faults += int(f) + } + if r, ok := p["RecoveringSectors"].(float64); ok { + recoveries += int(r) + } + } + } + + // Upsert into DB + tracking := DealProofTracking{ + DealID: dealID, + Provider: provider, + SectorID: sectorNumber, + SectorStartEpoch: sectorStartEpoch, + CurrentDeadlineIndex: currentDeadline, + PeriodStartEpoch: periodStartEpoch, + EstimatedNextProofTime: estimatedNextProofTime, + Faults: faults, + Recoveries: recoveries, + LastUpdatedAt: time.Now(), + } + err = pt.db.WithContext(ctx).Save(&tracking).Error + if err != nil { + log.Printf("[ProofTracker] Failed to upsert deal proof tracking for %d: %v", dealID, err) + } else { + log.Printf("[ProofTracker] Updated deal proof tracking for %d", dealID) + } + } +} + +// (Removed duplicate Stop() method) + +// GetLiveProofInfo queries Lotus for real-time proof info for a deal +func (pt *ProofTracker) GetLiveProofInfo(ctx context.Context, dealID uint64) (*DealProofTracking, error) { + lotus := NewLotusClient(pt.lotusURL, pt.lotusToken) + var dealInfo map[string]interface{} + err := lotus.CallFor(ctx, &dealInfo, "Filecoin.StateMarketStorageDeal", dealID, nil) + if err != nil { + return nil, err + } + state := dealInfo["State"].(map[string]interface{}) + sectorNumber := int64(state["SectorNumber"].(float64)) + sectorStartEpoch := int32(state["SectorStartEpoch"].(float64)) + proposal := dealInfo["Proposal"].(map[string]interface{}) + provider := proposal["Provider"].(string) + var deadlineInfo map[string]interface{} + err = lotus.CallFor(ctx, &deadlineInfo, "Filecoin.StateMinerProvingDeadline", provider, nil) + if err != nil { + return nil, err + } + currentDeadline := int32(deadlineInfo["Index"].(float64)) + periodStartEpoch := int32(deadlineInfo["PeriodStart"].(float64)) + estimatedNextProofTime := time.Now().Add(30 * time.Minute) + var partitions []map[string]interface{} + err = lotus.CallFor(ctx, &partitions, "Filecoin.StateMinerPartitions", provider, currentDeadline, nil) + faults := 0 + recoveries := 0 + if err == nil && len(partitions) > 0 { + for _, p := range partitions { + if f, ok := p["FaultySectors"].(float64); ok { + faults += int(f) + } + if r, ok := p["RecoveringSectors"].(float64); ok { + recoveries += int(r) + } + } + } + tracking := &DealProofTracking{ + DealID: dealID, + Provider: provider, + SectorID: sectorNumber, + SectorStartEpoch: sectorStartEpoch, + CurrentDeadlineIndex: currentDeadline, + PeriodStartEpoch: periodStartEpoch, + EstimatedNextProofTime: estimatedNextProofTime, + Faults: faults, + Recoveries: recoveries, + LastUpdatedAt: time.Now(), + } + return tracking, nil +} + +// GetDBProofInfo fetches proof info from the DB for a deal +func (pt *ProofTracker) GetDBProofInfo(ctx context.Context, dealID uint64) (*DealProofTracking, error) { + var tracking DealProofTracking + err := pt.db.WithContext(ctx).First(&tracking, "deal_id = ?", dealID).Error + if err != nil { + return nil, err + } + return &tracking, nil +} + +// HealthCheck returns nil if the tracker can reach Lotus and DB +func (pt *ProofTracker) HealthCheck(ctx context.Context) error { + // Check DB + if err := pt.db.WithContext(ctx).Exec("SELECT 1").Error; err != nil { + return err + } + // Check Lotus + lotus := lotusclient.NewLotusClient(pt.lotusURL, pt.lotusToken) + var chainHead map[string]interface{} + err := lotus.CallFor(ctx, &chainHead, "Filecoin.ChainHead") + if err != nil { + return err + } + return nil +} +// (Removed stray code block that was outside any function) + +// Stop signals the tracker to stop +func (pt *ProofTracker) Stop() { + close(pt.stopCh) +} diff --git a/service/dealprooftracker/lotusclient.go b/service/dealprooftracker/lotusclient.go new file mode 100644 index 00000000..f4302d32 --- /dev/null +++ b/service/dealprooftracker/lotusclient.go @@ -0,0 +1,17 @@ +package dealprooftracker + +import ( + "github.com/ybbus/jsonrpc/v3" +) + +// NewLotusClient creates a new JSON-RPC client for interacting with a Lotus node. +func NewLotusClient(lotusAPI string, lotusToken string) jsonrpc.RPCClient { + if lotusToken == "" { + return jsonrpc.NewClient(lotusAPI) + } + return jsonrpc.NewClientWithOpts(lotusAPI, &jsonrpc.RPCClientOpts{ + CustomHeaders: map[string]string{ + "Authorization": "Bearer " + lotusToken, + }, + }) +} diff --git a/service/dealprooftracker/model.go b/service/dealprooftracker/model.go new file mode 100644 index 00000000..5326dee9 --- /dev/null +++ b/service/dealprooftracker/model.go @@ -0,0 +1,30 @@ +package dealprooftracker + +import ( + "time" +) + + +// DealProofTracking stores proof tracking info for a deal/sector +type DealProofTracking struct { + DealID uint64 `gorm:"primaryKey;column:deal_id"` + Provider string `gorm:"column:provider;index"` + SectorID int64 `gorm:"column:sector_id;index"` + SectorStartEpoch int32 `gorm:"column:sector_start_epoch"` + CurrentDeadlineIndex int32 `gorm:"column:current_deadline_index"` + PeriodStartEpoch int32 `gorm:"column:period_start_epoch"` + EstimatedNextProofTime time.Time `gorm:"column:estimated_next_proof_time"` + Faults int `gorm:"column:faults"` + Recoveries int `gorm:"column:recoveries"` + LastUpdatedAt time.Time `gorm:"column:last_updated_at;autoUpdateTime"` +} + +// TableName sets the table name for GORM +func (DealProofTracking) TableName() string { + return "deal_proof_trackings" +} + +// AutoMigrateDealProofTracking migrates the table +func AutoMigrateDealProofTracking(db interface{ AutoMigrate(...interface{}) error }) error { + return db.AutoMigrate(&DealProofTracking{}) +}