diff --git a/cmd/AUTO_PREP_DEALS_INTEGRATION_TEST.md b/cmd/AUTO_PREP_DEALS_INTEGRATION_TEST.md deleted file mode 100644 index 3b6ec820..00000000 --- a/cmd/AUTO_PREP_DEALS_INTEGRATION_TEST.md +++ /dev/null @@ -1,167 +0,0 @@ -# Auto-Prep-Deals Integration Test - -This document describes the comprehensive integration test for the auto-prep-deals functionality in Singularity. - -## Overview - -The integration test validates the complete auto-prep-deals workflow from preparation creation to deal schedule generation. It ensures that all components work together correctly and provides confidence in the implementation. - -## Test Files - -- `auto_prep_deals_integration_test.go` - Main integration test implementation - -## What the Test Validates - -### Core Workflow (`TestAutoPrepDealsIntegration`) - -1. **Preparation Creation with Auto-Deals** - - Creates preparation with `--auto-create-deals` flag - - Verifies deal configuration is correctly stored - - Tests various deal parameters (provider, price, verification) - -2. **Auto-Storage Creation** - - Validates that source and output storages are automatically created - - Checks storage configuration and paths - -3. **Deal Configuration Validation** - - Verifies `AutoCreateDeals` is enabled - - Checks deal provider, pricing, and verification settings - - Validates configuration persistence in database - -4. **Job Progression** - - Runs dataset worker to process scan/pack/daggen jobs - - Validates job completion and error handling - - Tests worker orchestration - -5. **Deal Schedule Auto-Creation** - - Checks if deal schedules are automatically created - - Validates scheduling conditions and triggers - - Tests async deal creation workflow - -6. **Manual Triggering** - - Tests manual deal schedule creation - - Validates manual override capabilities - - Checks wallet validation requirements - -### Error Scenarios (`TestAutoPrepDealsErrorScenarios`) - -1. 
**Invalid Provider Validation** - - Tests behavior with invalid storage provider IDs - - Validates error handling and user feedback - -2. **Insufficient Balance** - - Tests high pricing scenarios - - Validates balance checking (when enabled) - -3. **Invalid Storage Provider** - - Tests malformed provider IDs - - Validates input validation - -4. **Auto-Create-Deals Disabled** - - Verifies that deal schedules are NOT created when disabled - - Tests configuration isolation - -## Test Architecture - -### Test Data Setup -- Creates realistic test files of various sizes (1KB to 10MB) -- Uses temporary directories for source and output -- Generates deterministic test data using `testutil.GenerateFixedBytes` - -### Database Testing -- Uses existing `testutil.All()` infrastructure -- Tests against SQLite (and MySQL/PostgreSQL if available) -- Each test gets isolated database instance -- Automatic cleanup after tests - -### Error Handling -- Tests gracefully handle missing dependencies (Lotus API, databases) -- Validation failures are logged but don't fail tests when external services are unavailable -- Clear error messages distinguish between expected and unexpected failures - -## Running the Tests - -```bash -# Run the main integration test -go test -v ./cmd -run TestAutoPrepDealsIntegration -timeout 10m - -# Run error scenario tests -go test -v ./cmd -run TestAutoPrepDealsErrorScenarios -timeout 5m - -# Run both tests -go test -v ./cmd -run "TestAutoPrepDeals.*" -timeout 10m -``` - -## Expected Results - -### Successful Test Run - -When all components are working correctly: -- ✅ Preparation created with auto-deals enabled -- ✅ 2+ local storages auto-created (source and output) -- ✅ Deal configuration correctly stored and validated -- ✅ Dataset worker completes without errors -- ✅ Deal schedules may or may not be created (depends on conditions) -- ✅ Manual schedule creation works (when wallets are configured) - -### Test Logs - -The test provides detailed logging: -- 
Storage creation details (IDs, names, paths) -- Deal configuration validation -- Worker execution output -- Schedule creation status -- Error conditions and handling - -## Understanding Test Results - -### "No deal schedules created yet" -This is **expected** when: -- Preparation hasn't reached the required threshold for auto-deals -- Wallet validation is enabled but no wallets are attached -- External dependencies (Lotus API) are not available in test environment - -### "Manual trigger failed" -This is **expected** when: -- No wallets are attached to the preparation (`no wallet attached to preparation: not found`) -- Lotus API is not available for validation -- Deal provider validation fails - -### "Validation may be disabled in test environment" -This indicates that: -- External API calls (Lotus) are not available during testing -- Validation is bypassed for test environment -- Configuration is still properly stored and processed - -## Building Confidence - -This integration test builds confidence by: - -1. **End-to-End Validation**: Tests the complete workflow from CLI command to database state -2. **Real Data Processing**: Uses realistic file sizes and structures -3. **Error Handling**: Validates graceful degradation when external services unavailable -4. **Configuration Testing**: Ensures all deal parameters are correctly processed and stored -5. **Database Integration**: Validates data persistence and retrieval -6. **Worker Integration**: Tests job orchestration and processing -7. **Multiple Scenarios**: Covers both happy path and error conditions - -## Extending the Test - -To add new test scenarios: - -1. **Add new test functions** following the pattern `test` -2. **Use the existing setup utilities** (`createTestFiles`, `Runner`, etc.) -3. **Follow the assertion patterns** using `require` for critical checks -4. **Add logging** with `t.Logf()` for debugging -5. 
**Handle expected failures** gracefully to avoid false negatives - -## Integration with CI/CD - -The test is designed to work in CI environments: -- Uses temporary directories for isolation -- Handles missing external dependencies gracefully -- Provides clear pass/fail criteria -- Includes timeout protection -- Generates detailed logs for debugging - -This integration test provides comprehensive validation of the auto-prep-deals functionality and gives developers confidence that the complete workflow operates correctly. \ No newline at end of file diff --git a/cmd/app.go b/cmd/app.go index 0196c5c7..6253b359 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -15,6 +15,7 @@ import ( "github.com/data-preservation-programs/singularity/cmd/dataprep" "github.com/data-preservation-programs/singularity/cmd/deal" "github.com/data-preservation-programs/singularity/cmd/deal/schedule" + "github.com/data-preservation-programs/singularity/cmd/deal/state" "github.com/data-preservation-programs/singularity/cmd/dealtemplate" "github.com/data-preservation-programs/singularity/cmd/errorlog" "github.com/data-preservation-programs/singularity/cmd/ez" @@ -147,6 +148,21 @@ Upgrading: schedule.RemoveCmd, }, }, + { + Name: "state", + Usage: "Deal state management and monitoring", + Description: `Comprehensive deal state management tools including: +- View and filter deal state changes +- Export state history to CSV/JSON +- Manual recovery and repair operations +- State change statistics and analytics`, + Subcommands: []*cli.Command{ + state.ListCmd, + state.GetCmd, + state.StatsCmd, + state.RepairCmd, + }, + }, deal.SendManualCmd, deal.ListCmd, }, diff --git a/cmd/deal/state/export.go b/cmd/deal/state/export.go new file mode 100644 index 00000000..4b1c7193 --- /dev/null +++ b/cmd/deal/state/export.go @@ -0,0 +1,145 @@ +package state + +import ( + "encoding/csv" + "encoding/json" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/errors" + 
"github.com/data-preservation-programs/singularity/model" +) + +// exportStateChanges exports state changes to the specified format and file path +func exportStateChanges(stateChanges []model.DealStateChange, format, outputPath string) error { + // Validate and clean the output path to prevent directory traversal + cleanPath := filepath.Clean(outputPath) + if filepath.IsAbs(cleanPath) { + return errors.New("absolute paths are not allowed for security reasons") + } + // Check for directory traversal attempts + if strings.Contains(cleanPath, "..") { + return errors.New("directory traversal using '..' is not allowed") + } + if len(cleanPath) > 255 { + return errors.New("output path is too long") + } + + switch format { + case "csv": + return exportToCSV(stateChanges, cleanPath) + case "json": + return exportToJSON(stateChanges, cleanPath) + default: + return errors.Errorf("unsupported export format: %s", format) + } +} + +// exportToCSV exports state changes to a CSV file +func exportToCSV(stateChanges []model.DealStateChange, outputPath string) (err error) { + file, err := os.Create(outputPath) // #nosec G304 -- path is validated in exportStateChanges + if err != nil { + return errors.Wrap(err, "failed to create CSV file") + } + defer func() { + if closeErr := file.Close(); closeErr != nil && err == nil { + err = errors.Wrap(closeErr, "failed to close CSV file") + } + }() + + writer := csv.NewWriter(file) + defer writer.Flush() + + // Write CSV header + // Note: Both ID and DealID are database IDs, not CIDs + // ID = state change record database ID, DealID = internal singularity deal database ID + header := []string{ + "StateChangeID", // Database ID of the state change record + "DealID", // Internal singularity deal database ID + "PreviousState", + "NewState", + "Timestamp", + "EpochHeight", + "SectorID", + "ProviderID", + "ClientAddress", + "Metadata", + } + if err := writer.Write(header); err != nil { + return errors.Wrap(err, "failed to write CSV header") + } + + 
// Write state change records + for _, change := range stateChanges { + record := []string{ + strconv.FormatUint(change.ID, 10), + strconv.FormatUint(uint64(change.DealID), 10), + string(change.PreviousState), + string(change.NewState), + change.Timestamp.Format("2006-01-02 15:04:05"), + formatOptionalInt32(change.EpochHeight), + formatOptionalString(change.SectorID), + change.ProviderID, + change.ClientAddress, + change.Metadata, + } + if err := writer.Write(record); err != nil { + return errors.Wrap(err, "failed to write CSV record") + } + } + + return nil +} + +// exportToJSON exports state changes to a JSON file +func exportToJSON(stateChanges []model.DealStateChange, outputPath string) (err error) { + file, err := os.Create(outputPath) // #nosec G304 -- path is validated in exportStateChanges + if err != nil { + return errors.Wrap(err, "failed to create JSON file") + } + defer func() { + if closeErr := file.Close(); closeErr != nil && err == nil { + err = errors.Wrap(closeErr, "failed to close JSON file") + } + }() + + // Create export structure with metadata + exportData := struct { + Metadata struct { + ExportTime string `json:"exportTime"` + TotalCount int `json:"totalCount"` + } `json:"metadata"` + StateChanges []model.DealStateChange `json:"stateChanges"` + }{ + StateChanges: stateChanges, + } + + exportData.Metadata.ExportTime = time.Now().Format(time.RFC3339) + exportData.Metadata.TotalCount = len(stateChanges) + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + if err := encoder.Encode(exportData); err != nil { + return errors.Wrap(err, "failed to encode JSON") + } + + return nil +} + +// Helper functions for formatting optional fields +func formatOptionalInt32(value *int32) string { + if value == nil { + return "" + } + return strconv.FormatInt(int64(*value), 10) +} + +func formatOptionalString(value *string) string { + if value == nil { + return "" + } + return *value +} diff --git a/cmd/deal/state/get.go b/cmd/deal/state/get.go new 
file mode 100644 index 00000000..12c4d9aa --- /dev/null +++ b/cmd/deal/state/get.go @@ -0,0 +1,105 @@ +package state + +import ( + "strconv" + "time" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/cmd/cliutil" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/handler/statechange" + "github.com/data-preservation-programs/singularity/model" + "github.com/urfave/cli/v2" +) + +// getDealStateChangesAction handles the get command action +func getDealStateChangesAction(c *cli.Context) error { + if c.NArg() != 1 { + return errors.New("deal ID is required") + } + + dealIDStr := c.Args().Get(0) + dealID, err := strconv.ParseUint(dealIDStr, 10, 64) + if err != nil { + return errors.Wrap(err, "invalid deal ID format") + } + + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + + // Get state changes for the specific deal + stateChanges, err := statechange.Default.GetDealStateChangesHandler(c.Context, db, model.DealID(dealID)) + if err != nil { + return errors.WithStack(err) + } + + // Handle export if requested + exportFormat := c.String("export") + if exportFormat != "" { + return handleDealStateExport(c, stateChanges, dealIDStr, exportFormat) + } + + // Print results to console + if len(stateChanges) == 0 { + cliutil.Print(c, map[string]interface{}{ + "message": "No state changes found for deal " + dealIDStr, + }) + return nil + } + + cliutil.Print(c, stateChanges) + return nil +} + +// handleDealStateExport handles exporting deal state changes to file +func handleDealStateExport(c *cli.Context, stateChanges []model.DealStateChange, dealIDStr string, exportFormat string) error { + outputPath := c.String("output") + if outputPath == "" { + timestamp := time.Now().Format("20060102-150405") + switch exportFormat { + case "csv": + outputPath = "deal-" + dealIDStr + "-states-" + timestamp 
+ ".csv" + case "json": + outputPath = "deal-" + dealIDStr + "-states-" + timestamp + ".json" + default: + return errors.Errorf("unsupported export format: %s (supported: csv, json)", exportFormat) + } + } + + err := exportStateChanges(stateChanges, exportFormat, outputPath) + if err != nil { + return errors.WithStack(err) + } + + cliutil.Print(c, map[string]interface{}{ + "message": "Deal state changes exported successfully", + "dealId": dealIDStr, + "format": exportFormat, + "outputPath": outputPath, + "count": len(stateChanges), + }) + return nil +} + +var GetCmd = &cli.Command{ + Name: "get", + Usage: "Get state changes for a specific deal", + ArgsUsage: "", + Description: `Get all state changes for a specific deal ordered by timestamp. +This command shows the complete state transition history for a given deal. +Note: deal-id refers to the internal database ID, not a content CID.`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "export", + Usage: "Export format (csv, json). If specified, results will be exported to a file instead of displayed", + }, + &cli.StringFlag{ + Name: "output", + Usage: "Output file path for export (optional, defaults to deal--states-.csv/json)", + }, + }, + Action: getDealStateChangesAction, +} diff --git a/cmd/deal/state/list.go b/cmd/deal/state/list.go new file mode 100644 index 00000000..e31785bf --- /dev/null +++ b/cmd/deal/state/list.go @@ -0,0 +1,210 @@ +package state + +import ( + "strconv" + "time" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/cmd/cliutil" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/handler/statechange" + "github.com/data-preservation-programs/singularity/model" + "github.com/urfave/cli/v2" +) + +// listStateChangesAction handles the list command action +func listStateChangesAction(c *cli.Context) error { + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) 
+ } + defer func() { _ = closer.Close() }() + + query, err := buildStateChangeQuery(c) + if err != nil { + return err + } + + // Get state changes + response, err := statechange.Default.ListStateChangesHandler(c.Context, db, *query) + if err != nil { + return errors.WithStack(err) + } + + // Handle export if requested + exportFormat := c.String("export") + if exportFormat != "" { + return handleStateChangeExport(c, response.StateChanges, exportFormat, response.Total) + } + + // Print results to console + cliutil.Print(c, response) + return nil +} + +// buildStateChangeQuery builds the query from CLI flags +func buildStateChangeQuery(c *cli.Context) (*model.DealStateChangeQuery, error) { + query := &model.DealStateChangeQuery{} + + // Parse deal ID if provided + if dealIDStr := c.String("deal-id"); dealIDStr != "" { + dealID, err := strconv.ParseUint(dealIDStr, 10, 64) + if err != nil { + return nil, errors.Wrap(err, "invalid deal ID format") + } + dealIDValue := model.DealID(dealID) + query.DealID = &dealIDValue + } + + // Parse state if provided + if stateStr := c.String("state"); stateStr != "" { + state := model.DealState(stateStr) + query.State = &state + } + + // Parse provider ID if provided + if providerStr := c.String("provider"); providerStr != "" { + query.ProviderID = &providerStr + } + + // Parse client address if provided + if clientStr := c.String("client"); clientStr != "" { + query.ClientAddress = &clientStr + } + + // Parse start time if provided + if startTimeStr := c.String("start-time"); startTimeStr != "" { + startTime, err := time.Parse(time.RFC3339, startTimeStr) + if err != nil { + return nil, errors.Wrap(err, "invalid start-time format, expected RFC3339 (e.g., 2023-01-01T00:00:00Z)") + } + query.StartTime = &startTime + } + + // Parse end time if provided + if endTimeStr := c.String("end-time"); endTimeStr != "" { + endTime, err := time.Parse(time.RFC3339, endTimeStr) + if err != nil { + return nil, errors.Wrap(err, "invalid end-time 
format, expected RFC3339 (e.g., 2023-12-31T23:59:59Z)") + } + query.EndTime = &endTime + } + + // Set pagination + if c.IsSet("offset") { + offset := c.Int("offset") + query.Offset = &offset + } + + if c.IsSet("limit") { + limit := c.Int("limit") + query.Limit = &limit + } + + // Set sorting + if orderBy := c.String("order-by"); orderBy != "" { + query.OrderBy = &orderBy + } + + if order := c.String("order"); order != "" { + query.Order = &order + } + + return query, nil +} + +// handleStateChangeExport handles exporting state changes to file +func handleStateChangeExport(c *cli.Context, stateChanges []model.DealStateChange, exportFormat string, totalInDB int64) error { + outputPath := c.String("output") + if outputPath == "" { + timestamp := time.Now().Format("20060102-150405") + switch exportFormat { + case "csv": + outputPath = "statechanges-" + timestamp + ".csv" + case "json": + outputPath = "statechanges-" + timestamp + ".json" + default: + return errors.Errorf("unsupported export format: %s (supported: csv, json)", exportFormat) + } + } + + err := exportStateChanges(stateChanges, exportFormat, outputPath) + if err != nil { + return errors.WithStack(err) + } + + cliutil.Print(c, map[string]interface{}{ + "message": "State changes exported successfully", + "format": exportFormat, + "outputPath": outputPath, + "totalCount": len(stateChanges), + "totalInDB": totalInDB, + }) + return nil +} + +var ListCmd = &cli.Command{ + Name: "list", + Aliases: []string{"ls"}, + Usage: "List deal state changes with optional filtering and pagination", + Description: `List deal state changes with comprehensive filtering options: +- Filter by deal ID (internal database ID), state, provider, client address, and time range +- Support for pagination and sorting +- Export results to CSV or JSON formats +- Note: Deal IDs and state change IDs are database IDs, not content CIDs`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "deal-id", + Usage: "Filter by specific deal ID (internal 
database ID, not CID)", + }, + &cli.StringFlag{ + Name: "state", + Usage: "Filter by deal state (proposed, published, active, expired, proposal_expired, rejected, slashed, error)", + }, + &cli.StringFlag{ + Name: "provider", + Usage: "Filter by storage provider ID (e.g., f01234)", + }, + &cli.StringFlag{ + Name: "client", + Usage: "Filter by client wallet address", + }, + &cli.StringFlag{ + Name: "start-time", + Usage: "Filter changes after this time (RFC3339 format, e.g., 2023-01-01T00:00:00Z)", + }, + &cli.StringFlag{ + Name: "end-time", + Usage: "Filter changes before this time (RFC3339 format, e.g., 2023-12-31T23:59:59Z)", + }, + &cli.IntFlag{ + Name: "offset", + Usage: "Number of records to skip for pagination", + DefaultText: "0", + }, + &cli.IntFlag{ + Name: "limit", + Usage: "Maximum number of records to return", + DefaultText: "100", + }, + &cli.StringFlag{ + Name: "order-by", + Usage: "Field to sort by (timestamp, dealId, newState, providerId, clientAddress)", + DefaultText: "timestamp", + }, + &cli.StringFlag{ + Name: "order", + Usage: "Sort order (asc, desc)", + DefaultText: "desc", + }, + &cli.StringFlag{ + Name: "export", + Usage: "Export format (csv, json). 
If specified, results will be exported to a file instead of displayed", + }, + &cli.StringFlag{ + Name: "output", + Usage: "Output file path for export (optional, defaults to statechanges-.csv/json)", + }, + }, + Action: listStateChangesAction, +} diff --git a/cmd/deal/state/repair.go b/cmd/deal/state/repair.go new file mode 100644 index 00000000..6d43f412 --- /dev/null +++ b/cmd/deal/state/repair.go @@ -0,0 +1,438 @@ +package state + +import ( + "fmt" + "strconv" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/cmd/cliutil" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/service/statetracker" + "github.com/urfave/cli/v2" + "gorm.io/gorm" +) + +// forceTransitionAction handles the force-transition subcommand +func forceTransitionAction(c *cli.Context) error { + if c.NArg() != 2 { + return errors.New("deal ID and new state are required") + } + + dealIDStr := c.Args().Get(0) + newStateStr := c.Args().Get(1) + + dealID, err := strconv.ParseUint(dealIDStr, 10, 64) + if err != nil { + return errors.Wrap(err, "invalid deal ID format") + } + + newState, err := validateDealState(newStateStr) + if err != nil { + return err + } + + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + + // Get current deal information + var deal model.Deal + err = db.First(&deal, dealID).Error + if err != nil { + return errors.Wrap(err, "failed to find deal") + } + + if c.Bool("dry-run") { + cliutil.Print(c, map[string]interface{}{ + "message": "DRY RUN: Would force state transition", + "dealId": dealIDStr, + "currentState": deal.State, + "newState": newState, + "provider": deal.Provider, + "clientAddress": deal.ClientActorID, + "reason": c.String("reason"), + }) + return nil + } + + // Parse optional parameters + epochHeight, sectorID, err := 
parseOptionalTransitionParams(c) + if err != nil { + return err + } + + // Create state tracker and record the forced transition + tracker := statetracker.NewStateChangeTracker(db) + metadata := &statetracker.StateChangeMetadata{ + Reason: c.String("reason"), + AdditionalFields: map[string]interface{}{ + "operationType": "manual_force_transition", + "operator": "cli", + }, + } + + previousState := &deal.State + err = tracker.TrackStateChangeWithDetails( + c.Context, + model.DealID(dealID), + previousState, + newState, + epochHeight, + sectorID, + deal.Provider, + deal.ClientActorID, + metadata, + ) + if err != nil { + return errors.Wrap(err, "failed to record state change") + } + + // Update the deal state in the database + err = db.Model(&deal).Update("state", newState).Error + if err != nil { + return errors.Wrap(err, "failed to update deal state") + } + + cliutil.Print(c, map[string]interface{}{ + "message": "Deal state transition forced successfully", + "dealId": dealIDStr, + "previousState": *previousState, + "newState": newState, + "reason": c.String("reason"), + }) + return nil +} + +// resetErrorDealsAction handles the reset-error-deals subcommand +func resetErrorDealsAction(c *cli.Context) error { + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + + resetToState := model.DealState(c.String("reset-to-state")) + + // Build query for error deals + query, err := buildErrorDealsQuery(c, db) + if err != nil { + return err + } + + // Get deals to reset + var deals []model.Deal + err = query.Find(&deals).Error + if err != nil { + return errors.Wrap(err, "failed to find error deals") + } + + if len(deals) == 0 { + cliutil.Print(c, map[string]interface{}{ + "message": "No error deals found matching the criteria", + }) + return nil + } + + if c.Bool("dry-run") { + cliutil.Print(c, map[string]interface{}{ + "message": "DRY RUN: Would reset the following deals", + "dealCount": 
len(deals), + "resetToState": resetToState, + "deals": deals, + }) + return nil + } + + // Reset deals + resetCount := performDealResets(c, db, deals, resetToState) + + cliutil.Print(c, map[string]interface{}{ + "message": "Error deals reset successfully", + "totalFound": len(deals), + "successfulReset": resetCount, + "resetToState": resetToState, + }) + return nil +} + +// cleanupOrphanedChangesAction handles the cleanup-orphaned-changes subcommand +func cleanupOrphanedChangesAction(c *cli.Context) error { + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + + // Find orphaned state changes + orphanedChanges, err := findOrphanedStateChanges(db) + if err != nil { + return err + } + + if len(orphanedChanges) == 0 { + cliutil.Print(c, map[string]interface{}{ + "message": "No orphaned state changes found", + }) + return nil + } + + if c.Bool("dry-run") { + cliutil.Print(c, map[string]interface{}{ + "message": "DRY RUN: Would delete orphaned state changes", + "orphanCount": len(orphanedChanges), + "orphanedIds": extractStateChangeIds(orphanedChanges), + }) + return nil + } + + // Delete orphaned state changes + err = deleteOrphanedChanges(db, orphanedChanges) + if err != nil { + return err + } + + cliutil.Print(c, map[string]interface{}{ + "message": "Orphaned state changes cleaned up successfully", + "deletedCount": len(orphanedChanges), + }) + return nil +} + +// Helper functions + +// validateDealState validates that the provided state is valid +func validateDealState(stateStr string) (model.DealState, error) { + newState := model.DealState(stateStr) + validStates := []model.DealState{ + "proposed", "published", "active", "expired", + "proposal_expired", "rejected", "slashed", "error", + } + + for _, validState := range validStates { + if newState == validState { + return newState, nil + } + } + return "", errors.Errorf("invalid state: %s. 
Valid states: %v", stateStr, validStates) +} + +// parseOptionalTransitionParams parses optional epoch and sector ID parameters +func parseOptionalTransitionParams(c *cli.Context) (*int32, *string, error) { + var epochHeight *int32 + if epochStr := c.String("epoch"); epochStr != "" { + epoch, err := strconv.ParseInt(epochStr, 10, 32) + if err != nil { + return nil, nil, errors.Wrap(err, "invalid epoch format") + } + epochInt32 := int32(epoch) + epochHeight = &epochInt32 + } + + var sectorID *string + if sector := c.String("sector-id"); sector != "" { + sectorID = §or + } + + return epochHeight, sectorID, nil +} + +// buildErrorDealsQuery builds the query for finding error deals +func buildErrorDealsQuery(c *cli.Context, db *gorm.DB) (*gorm.DB, error) { + query := db.Where("state = ?", "error") + + // Filter by specific deal IDs if provided + dealIDs := c.StringSlice("deal-id") + if len(dealIDs) > 0 { + var dealIDValues []uint64 + for _, idStr := range dealIDs { + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + return nil, errors.Wrapf(err, "invalid deal ID: %s", idStr) + } + dealIDValues = append(dealIDValues, id) + } + query = query.Where("id IN ?", dealIDValues) + } + + // Filter by provider if specified + if provider := c.String("provider"); provider != "" { + query = query.Where("provider = ?", provider) + } + + // Apply limit + query = query.Limit(c.Int("limit")) + + return query, nil +} + +// performDealResets resets the provided deals to the specified state +func performDealResets(c *cli.Context, db *gorm.DB, deals []model.Deal, resetToState model.DealState) int { + tracker := statetracker.NewStateChangeTracker(db) + resetCount := 0 + + for _, deal := range deals { + metadata := &statetracker.StateChangeMetadata{ + Reason: "Manual error state reset", + AdditionalFields: map[string]interface{}{ + "operationType": "error_state_reset", + "operator": "cli", + }, + } + + previousState := &deal.State + err := tracker.TrackStateChangeWithDetails( + 
c.Context, + deal.ID, + previousState, + resetToState, + nil, + nil, + deal.Provider, + deal.ClientActorID, + metadata, + ) + if err != nil { + cliutil.Print(c, map[string]interface{}{ + "warning": fmt.Sprintf("Failed to track state change for deal %d: %v", deal.ID, err), + }) + continue + } + + // Update the deal state + err = db.Model(&deal).Update("state", resetToState).Error + if err != nil { + cliutil.Print(c, map[string]interface{}{ + "warning": fmt.Sprintf("Failed to update deal %d state: %v", deal.ID, err), + }) + continue + } + + resetCount++ + } + + return resetCount +} + +// findOrphanedStateChanges finds state changes without corresponding deals +func findOrphanedStateChanges(db *gorm.DB) ([]model.DealStateChange, error) { + var orphanedChanges []model.DealStateChange + err := db.Table("deal_state_changes"). + Select("deal_state_changes.*"). + Joins("LEFT JOIN deals ON deals.id = deal_state_changes.deal_id"). + Where("deals.id IS NULL"). + Find(&orphanedChanges).Error + if err != nil { + return nil, errors.Wrap(err, "failed to find orphaned state changes") + } + return orphanedChanges, nil +} + +// deleteOrphanedChanges deletes the provided orphaned state changes +func deleteOrphanedChanges(db *gorm.DB, orphanedChanges []model.DealStateChange) error { + var orphanedIds []uint64 + for _, change := range orphanedChanges { + orphanedIds = append(orphanedIds, change.ID) + } + + err := db.Where("id IN ?", orphanedIds).Delete(&model.DealStateChange{}).Error + if err != nil { + return errors.Wrap(err, "failed to delete orphaned state changes") + } + return nil +} + +// extractStateChangeIds extracts state change IDs from a slice of state changes +func extractStateChangeIds(changes []model.DealStateChange) []uint64 { + ids := make([]uint64, len(changes)) + for i, change := range changes { + ids[i] = change.ID + } + return ids +} + +var RepairCmd = &cli.Command{ + Name: "repair", + Usage: "Manual recovery and repair commands for deal state management", + 
Description: `Provides manual recovery and repair capabilities for deal state management: +- Force state transitions for stuck deals +- Reset deal states to allow retry +- Repair corrupted state transitions +- Bulk operations for multiple deals`, + Subcommands: []*cli.Command{ + { + Name: "force-transition", + Usage: "Force a state transition for a specific deal", + ArgsUsage: " ", + Description: `Force a deal to transition to a new state. Use with caution! +Valid states: proposed, published, active, expired, proposal_expired, rejected, slashed, error +Note: deal-id refers to the internal database ID, not a content CID.`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "reason", + Usage: "Reason for the forced state transition", + Value: "Manual repair operation", + }, + &cli.StringFlag{ + Name: "epoch", + Usage: "Filecoin epoch height for the state change", + }, + &cli.StringFlag{ + Name: "sector-id", + Usage: "Storage provider sector ID", + }, + &cli.BoolFlag{ + Name: "dry-run", + Usage: "Show what would be done without making changes", + }, + }, + Action: forceTransitionAction, + }, + { + Name: "reset-error-deals", + Usage: "Reset deals in error state to allow retry", + Description: `Reset deals that are in error state back to their previous valid state. 
+This allows the system to retry operations that may have failed temporarily.`, + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "deal-id", + Usage: "Specific deal IDs to reset (internal database IDs, not CIDs, can be specified multiple times)", + }, + &cli.StringFlag{ + Name: "provider", + Usage: "Reset error deals for a specific provider", + }, + &cli.StringFlag{ + Name: "reset-to-state", + Usage: "State to reset deals to (default: proposed)", + Value: "proposed", + }, + &cli.IntFlag{ + Name: "limit", + Usage: "Maximum number of deals to reset", + Value: 100, + }, + &cli.BoolFlag{ + Name: "dry-run", + Usage: "Show what would be done without making changes", + }, + }, + Action: resetErrorDealsAction, + }, + { + Name: "cleanup-orphaned-changes", + Usage: "Clean up orphaned state changes without corresponding deals", + Description: `Remove state change records that reference deals that no longer exist. +This helps maintain database consistency and reduce storage usage.`, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "dry-run", + Usage: "Show what would be deleted without making changes", + }, + }, + Action: cleanupOrphanedChangesAction, + }, + }, +} diff --git a/cmd/deal/state/state_test.go b/cmd/deal/state/state_test.go new file mode 100644 index 00000000..80dd55d8 --- /dev/null +++ b/cmd/deal/state/state_test.go @@ -0,0 +1,519 @@ +package state + +import ( + "context" + "os" + "testing" + "time" + + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/service/statetracker" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +func setupTestDBClean(t *testing.T) (*gorm.DB, func()) { + connStr := "sqlite:" + t.TempDir() + "/test.db" + db, closer, err := database.OpenWithLogger(connStr) + require.NoError(t, err) + + err = model.GetMigrator(db).Migrate() + require.NoError(t, err) + + return db, func() { _ = closer.Close() } +} + +func 
createTestDealClean(t *testing.T, db *gorm.DB, dealID uint64, provider string) *model.Deal { + deal := &model.Deal{ + ID: model.DealID(dealID), + State: model.DealProposed, + Provider: provider, + ClientActorID: "f01000", + PieceCID: model.CID{}, + PieceSize: 1024, + Price: "1000000000000000000", + Verified: false, + } + err := db.Create(deal).Error + require.NoError(t, err) + return deal +} + +func TestStateChangeTracking(t *testing.T) { + db, cleanup := setupTestDBClean(t) + defer cleanup() + + ctx := context.Background() + tracker := statetracker.NewStateChangeTracker(db) + + deal := createTestDealClean(t, db, 123, "f01234") + + metadata := &statetracker.StateChangeMetadata{ + Reason: "Test state change", + StoragePrice: deal.Price, + } + + prevState := deal.State + err := tracker.TrackStateChange(ctx, deal, &prevState, model.DealPublished, metadata) + require.NoError(t, err) + + var stateChanges []model.DealStateChange + err = db.Where("deal_id = ?", deal.ID).Find(&stateChanges).Error + require.NoError(t, err) + require.Len(t, stateChanges, 1) + require.Equal(t, model.DealPublished, stateChanges[0].NewState) +} + +func TestStateChangeFiltering(t *testing.T) { + db, cleanup := setupTestDBClean(t) + defer cleanup() + + ctx := context.Background() + tracker := statetracker.NewStateChangeTracker(db) + + deal1 := createTestDealClean(t, db, 100, "f01234") + deal2 := createTestDealClean(t, db, 101, "f01235") + + metadata := &statetracker.StateChangeMetadata{ + Reason: "Test filtering", + StoragePrice: "1000000000000000000", + } + + prevState1 := deal1.State + err := tracker.TrackStateChange(ctx, deal1, &prevState1, model.DealActive, metadata) + require.NoError(t, err) + + prevState2 := deal2.State + err = tracker.TrackStateChange(ctx, deal2, &prevState2, model.DealPublished, metadata) + require.NoError(t, err) + + // Test provider filtering + var f01234Changes []model.DealStateChange + err = db.Where("provider_id = ?", "f01234").Find(&f01234Changes).Error + 
require.NoError(t, err) + require.Len(t, f01234Changes, 1) + + // Test state filtering + var activeChanges []model.DealStateChange + err = db.Where("new_state = ?", model.DealActive).Find(&activeChanges).Error + require.NoError(t, err) + require.Len(t, activeChanges, 1) + require.Equal(t, model.DealID(100), activeChanges[0].DealID) +} + +func TestStateChangeStatistics(t *testing.T) { + db, cleanup := setupTestDBClean(t) + defer cleanup() + + ctx := context.Background() + tracker := statetracker.NewStateChangeTracker(db) + + deal1 := createTestDealClean(t, db, 200, "f01234") + deal2 := createTestDealClean(t, db, 201, "f01234") + deal3 := createTestDealClean(t, db, 202, "f01235") + + metadata := &statetracker.StateChangeMetadata{ + Reason: "Stats test", + StoragePrice: "1000000000000000000", + } + + prevState := model.DealProposed + err := tracker.TrackStateChange(ctx, deal1, &prevState, model.DealActive, metadata) + require.NoError(t, err) + + err = tracker.TrackStateChange(ctx, deal2, &prevState, model.DealActive, metadata) + require.NoError(t, err) + + err = tracker.TrackStateChange(ctx, deal3, &prevState, model.DealPublished, metadata) + require.NoError(t, err) + + // Test overall statistics + var totalCount int64 + err = db.Model(&model.DealStateChange{}).Count(&totalCount).Error + require.NoError(t, err) + require.Equal(t, int64(3), totalCount) + + var activeCount int64 + err = db.Model(&model.DealStateChange{}).Where("new_state = ?", model.DealActive).Count(&activeCount).Error + require.NoError(t, err) + require.Equal(t, int64(2), activeCount) + + // Test provider-specific stats + providerStats := make(map[string]int64) + rows, err := db.Model(&model.DealStateChange{}). + Select("provider_id, COUNT(*) as count"). + Group("provider_id"). 
+ Rows() + require.NoError(t, err) + defer rows.Close() + + for rows.Next() { + var provider string + var count int64 + err = rows.Scan(&provider, &count) + require.NoError(t, err) + providerStats[provider] = count + } + + require.Equal(t, int64(2), providerStats["f01234"]) // deal1, deal2 + require.Equal(t, int64(1), providerStats["f01235"]) // deal3 +} + +func TestExportFunctions(t *testing.T) { + stateChanges := []model.DealStateChange{ + { + ID: 1, + DealID: model.DealID(123), + PreviousState: model.DealProposed, + NewState: model.DealActive, + Timestamp: time.Now(), + ProviderID: "f01234", + ClientAddress: "f01000", + }, + } + + t.Run("CSV Export", func(t *testing.T) { + tempFile := "test.csv" + defer os.Remove(tempFile) + + err := exportStateChanges(stateChanges, "csv", tempFile) + require.NoError(t, err) + + _, err = os.Stat(tempFile) + require.NoError(t, err) + }) + + t.Run("JSON Export", func(t *testing.T) { + tempFile := "test.json" + defer os.Remove(tempFile) + + err := exportStateChanges(stateChanges, "json", tempFile) + require.NoError(t, err) + + _, err = os.Stat(tempFile) + require.NoError(t, err) + }) + + t.Run("Unsupported Format", func(t *testing.T) { + err := exportStateChanges(stateChanges, "unsupported", "test.txt") + require.Error(t, err) + require.Contains(t, err.Error(), "unsupported export format") + }) +} + +func TestRepairOperations(t *testing.T) { + db, cleanup := setupTestDBClean(t) + defer cleanup() + + ctx := context.Background() + tracker := statetracker.NewStateChangeTracker(db) + + t.Run("Force Transition Logic", func(t *testing.T) { + deal := createTestDealClean(t, db, 456, "f01234") + + metadata := &statetracker.StateChangeMetadata{ + Reason: "Test force transition", + AdditionalFields: map[string]interface{}{ + "operationType": "manual_force_transition", + "operator": "test", + }, + } + + previousState := &deal.State + err := tracker.TrackStateChangeWithDetails( + ctx, + deal.ID, + previousState, + model.DealActive, + nil, + 
nil, + deal.Provider, + deal.ClientActorID, + metadata, + ) + require.NoError(t, err) + + // Update the deal state + err = db.Model(&deal).Update("state", model.DealActive).Error + require.NoError(t, err) + + // Verify deal state has changed + var updatedDeal model.Deal + err = db.First(&updatedDeal, deal.ID).Error + require.NoError(t, err) + require.Equal(t, model.DealActive, updatedDeal.State) + + // Verify state change was tracked + var stateChange model.DealStateChange + err = db.Where("deal_id = ? AND new_state = ?", deal.ID, model.DealActive).First(&stateChange).Error + require.NoError(t, err) + require.Equal(t, model.DealProposed, stateChange.PreviousState) + }) + + t.Run("Reset Error Deal Logic", func(t *testing.T) { + deal := createTestDealClean(t, db, 300, "f01234") + deal.State = model.DealErrored + err := db.Save(&deal).Error + require.NoError(t, err) + + metadata := &statetracker.StateChangeMetadata{ + Reason: "Manual error state reset", + AdditionalFields: map[string]interface{}{ + "operationType": "error_state_reset", + "operator": "test", + }, + } + + previousState := &deal.State + err = tracker.TrackStateChangeWithDetails( + ctx, + deal.ID, + previousState, + model.DealProposed, + nil, + nil, + deal.Provider, + deal.ClientActorID, + metadata, + ) + require.NoError(t, err) + + // Update the deal state + err = db.Model(&deal).Update("state", model.DealProposed).Error + require.NoError(t, err) + + // Verify deal was reset + var updatedDeal model.Deal + err = db.First(&updatedDeal, deal.ID).Error + require.NoError(t, err) + require.Equal(t, model.DealProposed, updatedDeal.State) + + // Verify state change was tracked + var stateChange model.DealStateChange + err = db.Where("deal_id = ? AND previous_state = ? 
AND new_state = ?", + deal.ID, model.DealErrored, model.DealProposed).First(&stateChange).Error + require.NoError(t, err) + }) +} + +func TestFormatHelpers(t *testing.T) { + require.Equal(t, "", formatOptionalInt32(nil)) + value := int32(123) + require.Equal(t, "123", formatOptionalInt32(&value)) + + require.Equal(t, "", formatOptionalString(nil)) + str := "test" + require.Equal(t, "test", formatOptionalString(&str)) +} + +func TestIntegrationWorkflow(t *testing.T) { + db, cleanup := setupTestDBClean(t) + defer cleanup() + + ctx := context.Background() + tracker := statetracker.NewStateChangeTracker(db) + + // Step 1: Create multiple deals with various states + deals := []struct { + id uint64 + provider string + }{ + {500, "f01234"}, + {501, "f01234"}, + {502, "f01235"}, + } + + for _, d := range deals { + createTestDealClean(t, db, d.id, d.provider) + } + + // Step 2: Simulate comprehensive state transitions + metadata := &statetracker.StateChangeMetadata{ + Reason: "Integration test state changes", + StoragePrice: "1000000000000000000", + } + + // Deal 500: proposed -> published -> active (2 transitions) + deal500 := &model.Deal{} + err := db.First(deal500, 500).Error + require.NoError(t, err) + + prevState := deal500.State + err = tracker.TrackStateChange(ctx, deal500, &prevState, model.DealPublished, metadata) + require.NoError(t, err) + deal500.State = model.DealPublished + err = db.Save(deal500).Error + require.NoError(t, err) + + prevState = deal500.State + err = tracker.TrackStateChange(ctx, deal500, &prevState, model.DealActive, metadata) + require.NoError(t, err) + deal500.State = model.DealActive + err = db.Save(deal500).Error + require.NoError(t, err) + + // Deal 501: proposed -> error (1 transition) + deal501 := &model.Deal{} + err = db.First(deal501, 501).Error + require.NoError(t, err) + + prevState = deal501.State + err = tracker.TrackStateChange(ctx, deal501, &prevState, model.DealErrored, metadata) + require.NoError(t, err) + deal501.State = 
model.DealErrored + err = db.Save(deal501).Error + require.NoError(t, err) + + // Deal 502: proposed -> published (1 transition) + deal502 := &model.Deal{} + err = db.First(deal502, 502).Error + require.NoError(t, err) + + prevState = deal502.State + err = tracker.TrackStateChange(ctx, deal502, &prevState, model.DealPublished, metadata) + require.NoError(t, err) + deal502.State = model.DealPublished + err = db.Save(deal502).Error + require.NoError(t, err) + + // Step 3: Verify comprehensive functionality + t.Run("Verify State Change Tracking", func(t *testing.T) { + var totalChanges int64 + err = db.Model(&model.DealStateChange{}).Count(&totalChanges).Error + require.NoError(t, err) + require.Equal(t, int64(4), totalChanges) // Exactly 4 state changes + + // Verify provider-specific changes + var f01234Changes int64 + err = db.Model(&model.DealStateChange{}).Where("provider_id = ?", "f01234").Count(&f01234Changes).Error + require.NoError(t, err) + require.Equal(t, int64(3), f01234Changes) // 2 for deal 500, 1 for deal 501 + + // Verify state-specific changes + var activeChanges int64 + err = db.Model(&model.DealStateChange{}).Where("new_state = ?", model.DealActive).Count(&activeChanges).Error + require.NoError(t, err) + require.Equal(t, int64(1), activeChanges) // Only deal 500 went active + }) + + t.Run("Export State Changes to Files", func(t *testing.T) { + var stateChanges []model.DealStateChange + err = db.Find(&stateChanges).Error + require.NoError(t, err) + require.Len(t, stateChanges, 4) + + // Test CSV export + csvFile := "integration-test.csv" + defer os.Remove(csvFile) + err = exportStateChanges(stateChanges, "csv", csvFile) + require.NoError(t, err) + + fileInfo, err := os.Stat(csvFile) + require.NoError(t, err) + require.Greater(t, fileInfo.Size(), int64(0)) + + // Test JSON export + jsonFile := "integration-test.json" + defer os.Remove(jsonFile) + err = exportStateChanges(stateChanges, "json", jsonFile) + require.NoError(t, err) + + fileInfo, err = 
os.Stat(jsonFile) + require.NoError(t, err) + require.Greater(t, fileInfo.Size(), int64(0)) + }) + + t.Run("Error Deal Reset Simulation", func(t *testing.T) { + // Find error deals for a specific provider + var errorDeals []model.Deal + err = db.Where("state = ? AND provider = ?", model.DealErrored, "f01234").Find(&errorDeals).Error + require.NoError(t, err) + require.Len(t, errorDeals, 1) // Deal 501 + + // Simulate reset operation + for _, deal := range errorDeals { + metadata := &statetracker.StateChangeMetadata{ + Reason: "Manual error state reset", + AdditionalFields: map[string]interface{}{ + "operationType": "error_state_reset", + "operator": "test", + }, + } + + previousState := &deal.State + err = tracker.TrackStateChangeWithDetails( + ctx, + deal.ID, + previousState, + model.DealProposed, + nil, + nil, + deal.Provider, + deal.ClientActorID, + metadata, + ) + require.NoError(t, err) + + // Update the deal state + err = db.Model(&deal).Update("state", model.DealProposed).Error + require.NoError(t, err) + } + + // Verify reset worked + var updatedDeal model.Deal + err = db.First(&updatedDeal, 501).Error + require.NoError(t, err) + require.Equal(t, model.DealProposed, updatedDeal.State) + + // Verify state change was tracked (now 5 total changes) + var totalChanges int64 + err = db.Model(&model.DealStateChange{}).Count(&totalChanges).Error + require.NoError(t, err) + require.Equal(t, int64(5), totalChanges) + }) + + t.Run("State Change Statistics", func(t *testing.T) { + // Test overall stats + var stats struct { + TotalChanges int64 + UniqueDeals int64 + UniqueProviders int64 + } + + err = db.Model(&model.DealStateChange{}).Count(&stats.TotalChanges).Error + require.NoError(t, err) + + err = db.Model(&model.DealStateChange{}).Distinct("deal_id").Count(&stats.UniqueDeals).Error + require.NoError(t, err) + + err = db.Model(&model.DealStateChange{}).Distinct("provider_id").Count(&stats.UniqueProviders).Error + require.NoError(t, err) + + require.Equal(t, 
int64(5), stats.TotalChanges) + require.Equal(t, int64(3), stats.UniqueDeals) // deals 500, 501, 502 + require.Equal(t, int64(2), stats.UniqueProviders) // f01234, f01235 + + // Test state distribution + stateDistribution := make(map[string]int64) + rows, err := db.Model(&model.DealStateChange{}). + Select("new_state, COUNT(*) as count"). + Group("new_state"). + Rows() + require.NoError(t, err) + defer rows.Close() + + for rows.Next() { + var state string + var count int64 + err = rows.Scan(&state, &count) + require.NoError(t, err) + stateDistribution[state] = count + } + + require.Equal(t, int64(2), stateDistribution["published"]) // deals 500, 502 + require.Equal(t, int64(1), stateDistribution["active"]) // deal 500 + require.Equal(t, int64(1), stateDistribution["error"]) // deal 501 (original) + require.Equal(t, int64(1), stateDistribution["proposed"]) // deal 501 (reset) + }) +} diff --git a/cmd/deal/state/stats.go b/cmd/deal/state/stats.go new file mode 100644 index 00000000..57865881 --- /dev/null +++ b/cmd/deal/state/stats.go @@ -0,0 +1,40 @@ +package state + +import ( + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/cmd/cliutil" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/handler/statechange" + "github.com/urfave/cli/v2" +) + +// getStateChangeStatsAction handles the stats command action +func getStateChangeStatsAction(c *cli.Context) error { + db, closer, err := database.OpenFromCLI(c) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = closer.Close() }() + + // Get state change statistics + stats, err := statechange.Default.GetStateChangeStatsHandler(c.Context, db) + if err != nil { + return errors.WithStack(err) + } + + // Print statistics + cliutil.Print(c, stats) + return nil +} + +var StatsCmd = &cli.Command{ + Name: "stats", + Usage: "Get statistics about deal state changes", + Description: `Get comprehensive statistics 
about deal state changes including: +- Total number of state changes +- Distribution by state +- Recent activity +- Provider statistics +- Client statistics`, + Action: getStateChangeStatsAction, +} diff --git a/docs/en/cli-reference/README.md b/docs/en/cli-reference/README.md index cbd579af..7241586a 100644 --- a/docs/en/cli-reference/README.md +++ b/docs/en/cli-reference/README.md @@ -54,6 +54,7 @@ COMMANDS: storage Create and manage storage system connections prep Create and manage dataset preparations error Error log management + state Deal state management and monitoring Utility: ez-prep Prepare a dataset from a local path download Download a CAR file from the metadata API diff --git a/model/basetypes.go b/model/basetypes.go index db47fe5b..e948ed14 100644 --- a/model/basetypes.go +++ b/model/basetypes.go @@ -62,26 +62,26 @@ func (d TimeDuration) Duration() time.Duration { } type ClientConfig struct { - ConnectTimeout *int64 `cbor:"1,keyasint,omitempty" json:"connectTimeout,omitempty"` // HTTP Client Connect timeout in nanoseconds - Timeout *int64 `cbor:"2,keyasint,omitempty" json:"timeout,omitempty"` // IO idle timeout in nanoseconds - ExpectContinueTimeout *int64 `cbor:"3,keyasint,omitempty" json:"expectContinueTimeout,omitempty"` // Timeout when using expect / 100-continue in HTTP in nanoseconds - InsecureSkipVerify *bool `cbor:"4,keyasint,omitempty" json:"insecureSkipVerify,omitempty"` // Do not verify the server SSL certificate (insecure) - NoGzip *bool `cbor:"5,keyasint,omitempty" json:"noGzip,omitempty"` // Don't set Accept-Encoding: gzip - UserAgent *string `cbor:"6,keyasint,omitempty" json:"userAgent,omitempty"` // Set the user-agent to a specified string - CaCert []string `cbor:"7,keyasint,omitempty" json:"caCert,omitempty"` // Paths to CA certificate used to verify servers - ClientCert *string `cbor:"8,keyasint,omitempty" json:"clientCert,omitempty"` // Path to Client SSL certificate (PEM) for mutual TLS auth - ClientKey *string 
`cbor:"9,keyasint,omitempty" json:"clientKey,omitempty"` // Path to Client SSL private key (PEM) for mutual TLS auth - Headers map[string]string `cbor:"10,keyasint,omitempty" json:"headers,omitempty"` // Set HTTP header for all transactions - DisableHTTP2 *bool `cbor:"11,keyasint,omitempty" json:"disableHttp2,omitempty"` // Disable HTTP/2 in the transport - DisableHTTPKeepAlives *bool `cbor:"12,keyasint,omitempty" json:"disableHttpKeepAlives,omitempty"` // Disable HTTP keep-alives and use each connection once. - RetryMaxCount *int `cbor:"13,keyasint,omitempty" json:"retryMaxCount,omitempty"` // Maximum number of retries. Default is 10 retries. - RetryDelay *int64 `cbor:"14,keyasint,omitempty" json:"retryDelay,omitempty"` // Delay between retries in nanoseconds. Default is 1s. - RetryBackoff *int64 `cbor:"15,keyasint,omitempty" json:"retryBackoff,omitempty"` // Constant backoff between retries in nanoseconds. Default is 1s. - RetryBackoffExponential *float64 `cbor:"16,keyasint,omitempty" json:"retryBackoffExponential,omitempty"` // Exponential backoff between retries. Default is 1.0. - SkipInaccessibleFile *bool `cbor:"17,keyasint,omitempty" json:"skipInaccessibleFile,omitempty"` // Skip inaccessible files. Default is false. - UseServerModTime *bool `cbor:"18,keyasint,omitempty" json:"useServerModTime,omitempty"` // Use server modified time instead of object metadata - LowLevelRetries *int `cbor:"19,keyasint,omitempty" json:"lowlevelRetries,omitempty"` // Maximum number of retries for low-level client errors. Default is 10 retries. - ScanConcurrency *int `cbor:"20,keyasint,omitempty" json:"scanConcurrency,omitempty"` // Maximum number of concurrent scan requests. Default is 1. 
+ ConnectTimeout *int64 `cbor:"1,keyasint,omitempty" json:"connectTimeout,omitempty"` // HTTP Client Connect timeout in nanoseconds + Timeout *int64 `cbor:"2,keyasint,omitempty" json:"timeout,omitempty"` // IO idle timeout in nanoseconds + ExpectContinueTimeout *int64 `cbor:"3,keyasint,omitempty" json:"expectContinueTimeout,omitempty"` // Timeout when using expect / 100-continue in HTTP in nanoseconds + InsecureSkipVerify *bool `cbor:"4,keyasint,omitempty" json:"insecureSkipVerify,omitempty"` // Do not verify the server SSL certificate (insecure) + NoGzip *bool `cbor:"5,keyasint,omitempty" json:"noGzip,omitempty"` // Don't set Accept-Encoding: gzip + UserAgent *string `cbor:"6,keyasint,omitempty" json:"userAgent,omitempty"` // Set the user-agent to a specified string + CaCert []string `cbor:"7,keyasint,omitempty" json:"caCert,omitempty"` // Paths to CA certificate used to verify servers + ClientCert *string `cbor:"8,keyasint,omitempty" json:"clientCert,omitempty"` // Path to Client SSL certificate (PEM) for mutual TLS auth + ClientKey *string `cbor:"9,keyasint,omitempty" json:"clientKey,omitempty"` // Path to Client SSL private key (PEM) for mutual TLS auth + Headers map[string]string `cbor:"10,keyasint,omitempty" json:"headers,omitempty"` // Set HTTP header for all transactions + DisableHTTP2 *bool `cbor:"11,keyasint,omitempty" json:"disableHttp2,omitempty"` // Disable HTTP/2 in the transport + DisableHTTPKeepAlives *bool `cbor:"12,keyasint,omitempty" json:"disableHttpKeepAlives,omitempty"` // Disable HTTP keep-alives and use each connection once. + RetryMaxCount *int `cbor:"13,keyasint,omitempty" json:"retryMaxCount,omitempty"` // Maximum number of retries. Default is 10 retries. + RetryDelay *int64 `cbor:"14,keyasint,omitempty" json:"retryDelay,omitempty"` // Delay between retries in nanoseconds. Default is 1s. + RetryBackoff *int64 `cbor:"15,keyasint,omitempty" json:"retryBackoff,omitempty"` // Constant backoff between retries in nanoseconds. Default is 1s. 
+ RetryBackoffExponential *float64 `cbor:"16,keyasint,omitempty" json:"retryBackoffExponential,omitempty"` // Exponential backoff between retries. Default is 1.0. + SkipInaccessibleFile *bool `cbor:"17,keyasint,omitempty" json:"skipInaccessibleFile,omitempty"` // Skip inaccessible files. Default is false. + UseServerModTime *bool `cbor:"18,keyasint,omitempty" json:"useServerModTime,omitempty"` // Use server modified time instead of object metadata + LowLevelRetries *int `cbor:"19,keyasint,omitempty" json:"lowlevelRetries,omitempty"` // Maximum number of retries for low-level client errors. Default is 10 retries. + ScanConcurrency *int `cbor:"20,keyasint,omitempty" json:"scanConcurrency,omitempty"` // Maximum number of concurrent scan requests. Default is 1. } func (c CID) MarshalBinary() ([]byte, error) { diff --git a/service/statetracker/benchmark_test.go b/service/statetracker/benchmark_test.go index 895bfb7d..050340f2 100644 --- a/service/statetracker/benchmark_test.go +++ b/service/statetracker/benchmark_test.go @@ -16,7 +16,7 @@ func BenchmarkStateChangeInsert(b *testing.B) { connStr := "sqlite:" + b.TempDir() + "/benchmark.db" db, closer, err := database.OpenWithLogger(connStr) require.NoError(b, err) - defer closer.Close() + defer func() { _ = closer.Close() }() err = model.GetMigrator(db).Migrate() require.NoError(b, err) @@ -63,7 +63,7 @@ func BenchmarkDealLifecycle(b *testing.B) { connStr := "sqlite:" + b.TempDir() + "/benchmark.db" db, closer, err := database.OpenWithLogger(connStr) require.NoError(b, err) - defer closer.Close() + defer func() { _ = closer.Close() }() err = model.GetMigrator(db).Migrate() require.NoError(b, err) @@ -122,7 +122,7 @@ func TestStateTrackingPerformanceImpact(t *testing.T) { connStr := "sqlite:" + t.TempDir() + "/performance.db" db, closer, err := database.OpenWithLogger(connStr) require.NoError(t, err) - defer closer.Close() + defer func() { _ = closer.Close() }() err = model.GetMigrator(db).Migrate() require.NoError(t, err) 
diff --git a/service/statetracker/statetracker.go b/service/statetracker/statetracker.go index c1112483..4ae4b081 100644 --- a/service/statetracker/statetracker.go +++ b/service/statetracker/statetracker.go @@ -23,12 +23,19 @@ type StateChangeTracker struct { // StateChangeMetadata represents additional metadata that can be stored with a state change type StateChangeMetadata struct { - // Basic state change information + Reason string `json:"reason,omitempty"` // Reason for the state change + Error string `json:"error,omitempty"` // Error message if applicable + TransactionID string `json:"transactionId,omitempty"` // On-chain transaction ID + PublishCID string `json:"publishCid,omitempty"` // Message CID for deal publication // Deal lifecycle epochs + ActivationEpoch *int32 `json:"activationEpoch,omitempty"` // Epoch when deal was activated + ExpirationEpoch *int32 `json:"expirationEpoch,omitempty"` // Epoch when deal expires + SlashingEpoch *int32 `json:"slashingEpoch,omitempty"` // Epoch when deal was slashed // Deal pricing and terms + StoragePrice string `json:"storagePrice,omitempty"` // Storage price per epoch // Enhanced error categorization fields ErrorCategory string `json:"errorCategory,omitempty"` // Categorized error type (e.g., "network_timeout", "deal_rejected") @@ -59,17 +66,7 @@ type StateChangeMetadata struct { DatabaseHealth string `json:"databaseHealth,omitempty"` // Database health status // Flexible additional fields for future extensibility - - Reason string `json:"reason,omitempty"` // Reason for the state change - Error string `json:"error,omitempty"` // Error message if applicable - TransactionID string `json:"transactionId,omitempty"` // On-chain transaction ID - PublishCID string `json:"publishCid,omitempty"` // Message CID for deal publication - ActivationEpoch *int32 `json:"activationEpoch,omitempty"` // Epoch when deal was activated - ExpirationEpoch *int32 `json:"expirationEpoch,omitempty"` // Epoch when deal expires - SlashingEpoch 
*int32 `json:"slashingEpoch,omitempty"` // Epoch when deal was slashed - StoragePrice string `json:"storagePrice,omitempty"` // Storage price per epoch AdditionalFields map[string]interface{} `json:"additionalFields,omitempty"` // Any additional custom fields - } // NewStateChangeTracker creates a new instance of StateChangeTracker diff --git a/util/testutil/testutils.go b/util/testutil/testutils.go index 340c039d..753edcbe 100644 --- a/util/testutil/testutils.go +++ b/util/testutil/testutils.go @@ -183,7 +183,7 @@ func doOne(t *testing.T, backend string, testFunc func(ctx context.Context, t *t // Clear any existing data from tables with unique constraints tables := []string{ "output_attachments", - "source_attachments", + "source_attachments", "storages", "wallets", "deal_schedules", @@ -200,7 +200,7 @@ func doOne(t *testing.T, backend string, testFunc func(ctx context.Context, t *t err = db.Exec("DELETE FROM " + table).Error } if err != nil { - t.Logf("Warning: Failed to clear table %s: %v", table, err) + t.Logf("Warning: Failed to clear table %s: %v", table, err) // Don't fail the test, as table may not exist yet } }