diff --git a/pkg/commands/lab_xatu_cbt_generate_transformation.go b/pkg/commands/lab_xatu_cbt_generate_transformation.go
index dccf32b..10f7cef 100644
--- a/pkg/commands/lab_xatu_cbt_generate_transformation.go
+++ b/pkg/commands/lab_xatu_cbt_generate_transformation.go
@@ -6,6 +6,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 
 	"github.com/ethpandaops/xcli/pkg/config"
 	"github.com/ethpandaops/xcli/pkg/constants"
@@ -15,21 +16,45 @@ import (
 	"github.com/spf13/cobra"
 )
 
+// generateTransformationTestOptions holds all options for the generate-transformation-test command.
+type generateTransformationTestOptions struct {
+	model string
+	models []string
+	network string
+	spec string
+	rangeColumn string
+	from string
+	to string
+	limit int
+	upload bool
+	aiAssertions bool
+	skipExisting bool
+	sanitizeIPs bool
+	duration string
+	yes bool
+	skipAIDiscovery bool
+	parallel int
+}
+
 // NewLabXatuCBTGenerateTransformationTestCommand creates the command.
 func NewLabXatuCBTGenerateTransformationTestCommand(log logrus.FieldLogger, configPath string) *cobra.Command {
 	var (
-		model string
-		network string
-		spec string
-		rangeColumn string
-		from string
-		to string
-		limit int
-		upload bool
-		aiAssertions bool
-		skipExisting bool
-		noSanitizeIPs bool
-		duration string
+		model string
+		models []string
+		network string
+		spec string
+		rangeColumn string
+		from string
+		to string
+		limit int
+		upload bool
+		aiAssertions bool
+		skipExisting bool
+		noSanitizeIPs bool
+		duration string
+		yes bool
+		skipAIDiscovery bool
+		parallel int
 	)
 
 	cmd := &cobra.Command{
@@ -68,12 +93,31 @@ S3 Upload Configuration (defaults to Cloudflare R2):
   S3_ENDPOINT  Override endpoint (default: ethpandaops R2)
   S3_BUCKET    Override bucket (default: ethpandaops-platform-production-public)`,
 		RunE: func(cmd *cobra.Command, args []string) error {
-			return runGenerateTransformationTest(cmd.Context(), log, configPath,
-				model, network, spec, rangeColumn, from, to, limit, upload, aiAssertions, skipExisting, !noSanitizeIPs, duration)
+			opts := generateTransformationTestOptions{
+				model: model,
+				models: models,
+				network: network,
+				spec: spec,
+				rangeColumn: rangeColumn,
+				from: from,
+				to: to,
+				limit: limit,
+				upload: upload,
+				aiAssertions: aiAssertions,
+				skipExisting: skipExisting,
+				sanitizeIPs: !noSanitizeIPs,
+				duration: duration,
+				yes: yes,
+				skipAIDiscovery: skipAIDiscovery,
+				parallel: parallel,
+			}
+
+			return runGenerateTransformationTest(cmd.Context(), log, configPath, opts)
 		},
 	}
 
-	cmd.Flags().StringVar(&model, "model", "", "Transformation model name")
+	cmd.Flags().StringVar(&model, "model", "", "Transformation model name (single model)")
+	cmd.Flags().StringSliceVar(&models, "models", nil, "Comma-separated list of models to process (batch mode)")
 	cmd.Flags().StringVar(&network, "network", "", "Network name (mainnet, sepolia, etc.)")
 	cmd.Flags().StringVar(&spec, "spec", "", "Fork spec (pectra, fusaka, etc.)")
 	cmd.Flags().StringVar(&rangeColumn, "range-column", "", "Override detected range column")
@@ -85,6 +129,9 @@ S3 Upload Configuration (defaults to Cloudflare R2):
 	cmd.Flags().BoolVar(&skipExisting, "skip-existing", false, "Skip generating seed data for existing S3 files")
 	cmd.Flags().BoolVar(&noSanitizeIPs, "no-sanitize-ips", false, "Disable IP address sanitization (IPs are sanitized by default)")
 	cmd.Flags().StringVar(&duration, "duration", "", "Time range duration (e.g., 1m, 5m, 10m, 30m)")
+	cmd.Flags().BoolVarP(&yes, "yes", "y", false, "Non-interactive mode (auto-accept all prompts)")
+	cmd.Flags().BoolVar(&skipAIDiscovery, "skip-ai-discovery", false, "Skip Claude range analysis, use heuristics only")
+	cmd.Flags().IntVar(&parallel, "parallel", 1, "Number of models to process in parallel (batch mode)")
 
 	return cmd
 }
@@ -94,11 +141,23 @@ func runGenerateTransformationTest(
 	ctx context.Context,
 	log logrus.FieldLogger,
 	configPath string,
-	model, network, spec, rangeColumn, from, to string,
-	limit int,
-	upload, aiAssertions, skipExisting, sanitizeIPs bool,
-	duration string,
+	opts generateTransformationTestOptions,
 ) error {
+	// Extract options for easier access
+	model := opts.model
+	network := opts.network
+	spec := opts.spec
+	duration := opts.duration
+	upload := opts.upload
+	aiAssertions := opts.aiAssertions
+	skipExisting := opts.skipExisting
+	sanitizeIPs := opts.sanitizeIPs
+	limit := opts.limit
+	yes := opts.yes
+	skipAIDiscovery := opts.skipAIDiscovery
+	models := opts.models
+	parallel := opts.parallel
+
 	// Load configuration
 	labCfg, _, err := config.LoadLabConfig(configPath)
 	if err != nil {
@@ -111,6 +170,29 @@ func runGenerateTransformationTest(
 			"Run 'xcli lab mode hybrid' to switch to hybrid mode", labCfg.Mode)
 	}
 
+	// Handle batch mode
+	if len(models) > 0 {
+		return runBatchGenerateTransformationTest(ctx, log, labCfg, opts)
+	}
+
+	// Validate non-interactive mode requirements
+	if yes {
+		if network == "" {
+			return fmt.Errorf("--network is required in non-interactive mode (--yes)")
+		}
+
+		if spec == "" {
+			return fmt.Errorf("--spec is required in non-interactive mode (--yes)")
+		}
+
+		if duration == "" {
+			duration = "5m" // Default duration in non-interactive mode
+		}
+	}
+
+	// Silence unused variable warnings for parallel (only used in batch mode)
+	_ = parallel
+
 	// Create generator
 	gen := seeddata.NewGenerator(log, labCfg)
 
@@ -118,6 +200,10 @@ func runGenerateTransformationTest(
 	var promptErr error
 
 	if model == "" {
+		if yes {
+			return fmt.Errorf("--model is required in non-interactive mode (--yes)")
+		}
+
 		model, promptErr = promptForTransformationModel(labCfg.Repos.XatuCBT)
 		if promptErr != nil {
 			return promptErr
 		}
@@ -196,13 +282,29 @@ func runGenerateTransformationTest(
 	}
 
 	ui.Info(fmt.Sprintf("Using %s time range", duration))
-	ui.Info("This may take a few minutes for models with many dependencies - grab a coffee ☕")
+
+	if !skipAIDiscovery {
+		ui.Info("This may take a few minutes for models with many dependencies - grab a coffee ☕")
+	}
 
 	var discoveryResult *seeddata.DiscoveryResult
 
-	// Try AI discovery first
-	discoveryClient, discoveryErr := seeddata.NewClaudeDiscoveryClient(log, gen)
-	if discoveryErr != nil {
+	// Skip AI discovery if requested or use heuristics
+	if skipAIDiscovery {
+		ui.Info("Skipping AI discovery, using heuristic range detection")
+
+		var rangeInfos map[string]*seeddata.RangeColumnInfo
+
+		rangeInfos, err = seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT)
+		if err != nil {
+			return fmt.Errorf("failed to detect range columns: %w", err)
+		}
+
+		discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT)
+		if err != nil {
+			return fmt.Errorf("fallback range discovery failed: %w", err)
+		}
+	} else if discoveryClient, discoveryErr := seeddata.NewClaudeDiscoveryClient(log, gen); discoveryErr != nil {
 		ui.Warning(fmt.Sprintf("Claude CLI not available: %v", discoveryErr))
 		ui.Info("Falling back to heuristic range detection")
@@ -334,15 +436,19 @@ func runGenerateTransformationTest(
 
 		ui.Blank()
 
-		proceedMissing, missErr := ui.Confirm("Proceed anyway?")
-		if missErr != nil {
-			return missErr
-		}
+		if !yes {
+			proceedMissing, missErr := ui.Confirm("Proceed anyway?")
+			if missErr != nil {
+				return missErr
+			}
 
-		if !proceedMissing {
-			ui.Info("Aborted. Try regenerating with clearer model names.")
+			if !proceedMissing {
+				ui.Info("Aborted. Try regenerating with clearer model names.")
 
-			return nil
+				return nil
+			}
+		} else {
+			ui.Info("Non-interactive mode: proceeding with missing models")
 		}
 	}
@@ -473,15 +579,19 @@ func runGenerateTransformationTest(
 		ui.Warning("These queries timed out - the tables may be too large or the range too wide.")
 		ui.Warning("Consider narrowing the block/time range, or proceed if you believe the data exists.")
 
-		proceedWithErrors, errErr := ui.Confirm("Proceed anyway (assuming data exists)?")
-		if errErr != nil {
-			return errErr
-		}
+		if !yes {
+			proceedWithErrors, errErr := ui.Confirm("Proceed anyway (assuming data exists)?")
+			if errErr != nil {
+				return errErr
+			}
 
-		if !proceedWithErrors {
-			ui.Info("Aborted by user. Try a narrower range.")
+			if !proceedWithErrors {
+				ui.Info("Aborted by user. Try a narrower range.")
 
-			return nil
+				return nil
+			}
+		} else {
+			ui.Info("Non-interactive mode: proceeding despite query errors")
 		}
 	}
@@ -497,43 +607,49 @@ func runGenerateTransformationTest(
 		ui.Blank()
 		ui.Warning("Empty parquets will be generated for these models, which may cause test failures.")
 
-		expandWindow, expandErr := ui.Confirm("Would you like to expand the time window and retry?")
-		if expandErr != nil {
-			return expandErr
-		}
+		if !yes {
+			expandWindow, expandErr := ui.Confirm("Would you like to expand the time window and retry?")
+			if expandErr != nil {
+				return expandErr
+			}
 
-		if expandWindow {
-			ui.Info("Please re-run the command with a larger time window or different range.")
-			ui.Info("Tip: Some tables (like canonical_execution_contracts) may have sparse data.")
+			if expandWindow {
+				ui.Info("Please re-run the command with a larger time window or different range.")
+				ui.Info("Tip: Some tables (like canonical_execution_contracts) may have sparse data.")
 
-			return nil
-		}
+				return nil
+			}
 
-		// Let user proceed anyway if they want
-		proceedAnyway, proceedErr := ui.Confirm("Proceed anyway with potentially empty data?")
-		if proceedErr != nil {
-			return proceedErr
-		}
+			// Let user proceed anyway if they want
+			proceedAnyway, proceedErr := ui.Confirm("Proceed anyway with potentially empty data?")
+			if proceedErr != nil {
+				return proceedErr
+			}
 
-		if !proceedAnyway {
-			ui.Info("Aborted by user")
+			if !proceedAnyway {
+				ui.Info("Aborted by user")
 
-			return nil
+				return nil
+			}
+		} else {
+			ui.Info("Non-interactive mode: proceeding with potentially empty data")
 		}
 	}
 
 	// User confirmation
-	ui.Blank()
+	if !yes {
+		ui.Blank()
 
-	proceed, confirmErr := ui.Confirm("Proceed with this strategy?")
-	if confirmErr != nil {
-		return confirmErr
-	}
+		proceed, confirmErr := ui.Confirm("Proceed with this strategy?")
+		if confirmErr != nil {
+			return confirmErr
+		}
 
-	if !proceed {
-		ui.Info("Aborted by user")
+		if !proceed {
+			ui.Info("Aborted by user")
 
-		return nil
+			return nil
+		}
 	}
 
 	// Row limit handling:
@@ -546,20 +662,19 @@ func runGenerateTransformationTest(
 		ui.Info("Using unlimited rows (AI discovery already optimized the range)")
 	} else if limit == defaultRowLimit {
-		// Fallback/manual mode: prompt for safety
-		limit, promptErr = promptForLimit()
-		if promptErr != nil {
-			return promptErr
+		if yes {
+			// Non-interactive mode: use unlimited
+			limit = 0
+		} else {
+			// Fallback/manual mode: prompt for safety
+			limit, promptErr = promptForLimit()
+			if promptErr != nil {
+				return promptErr
+			}
 		}
 	}
 
-	// Prompt for upload
-	if !upload {
-		upload, promptErr = ui.Confirm("Upload to S3?")
-		if promptErr != nil {
-			return promptErr
-		}
-	}
+	// In non-interactive mode, upload flag already set via CLI
 
 	// S3 preflight check if uploading
 	var uploader *seeddata.S3Uploader
@@ -764,6 +879,9 @@ func runGenerateTransformationTest(
 			assertions = seeddata.GetDefaultAssertions(model)
 		}
+	} else if yes {
+		// Non-interactive mode: use default assertions
+		assertions = seeddata.GetDefaultAssertions(model)
 	} else {
 		// Prompt for AI assertions
 		useAI, confirmErr := ui.Confirm("Generate assertions with Claude?")
@@ -799,10 +917,16 @@ func runGenerateTransformationTest(
 		return fmt.Errorf("failed to generate YAML: %w", err)
 	}
 
-	// Prompt to write YAML to xatu-cbt
-	writeYAML, writeErr := ui.Confirm("Write test YAML to xatu-cbt?")
-	if writeErr != nil {
-		return writeErr
+	// Write YAML to xatu-cbt
+	writeYAML := yes // In non-interactive mode, always write
+
+	if !yes {
+		var writeErr error
+
+		writeYAML, writeErr = ui.Confirm("Write test YAML to xatu-cbt?")
+		if writeErr != nil {
+			return writeErr
+		}
 	}
 
 	if writeYAML {
@@ -884,3 +1008,410 @@ func generateAIAssertions(ctx context.Context, log logrus.FieldLogger, model str
 
 	return assertions, nil
 }
+
+// batchModelResult holds the result of processing a single model in batch mode.
+type batchModelResult struct {
+	Model string
+	Success bool
+	Error error
+}
+
+// runBatchGenerateTransformationTest processes multiple models with parallel support.
+//
+//nolint:funlen // Batch processing handler
+func runBatchGenerateTransformationTest(
+	ctx context.Context,
+	log logrus.FieldLogger,
+	labCfg *config.LabConfig,
+	opts generateTransformationTestOptions,
+) error {
+	models := opts.models
+	parallel := opts.parallel
+
+	if parallel < 1 {
+		parallel = 1
+	}
+
+	// Validate required options for batch mode
+	if opts.network == "" {
+		return fmt.Errorf("--network is required in batch mode")
+	}
+
+	if opts.spec == "" {
+		return fmt.Errorf("--spec is required in batch mode")
+	}
+
+	// Set defaults for batch mode
+	if opts.duration == "" {
+		opts.duration = "5m"
+	}
+
+	// Force non-interactive mode for batch
+	opts.yes = true
+
+	ui.Header(fmt.Sprintf("Batch mode: processing %d models (parallelism: %d)", len(models), parallel))
+	ui.Blank()
+
+	// Create result channel and semaphore
+	results := make([]batchModelResult, len(models))
+	sem := make(chan struct{}, parallel)
+
+	// Process models
+	var wg sync.WaitGroup
+
+	for i, model := range models {
+		wg.Add(1)
+
+		go func(idx int, modelName string) {
+			defer wg.Done()
+
+			// Acquire semaphore
+			sem <- struct{}{}
+
+			defer func() { <-sem }()
+
+			ui.Info(fmt.Sprintf("Starting: %s", modelName))
+
+			// Create a copy of opts for this model
+			modelOpts := opts
+			modelOpts.model = modelName
+			modelOpts.models = nil // Clear batch list
+
+			// Run the single model generation
+			err := runSingleModelGeneration(ctx, log, labCfg, modelOpts)
+
+			results[idx] = batchModelResult{
+				Model: modelName,
+				Success: err == nil,
+				Error: err,
+			}
+			if err != nil {
+				ui.Error(fmt.Sprintf("Failed: %s - %v", modelName, err))
+			} else {
+				ui.Success(fmt.Sprintf("Completed: %s", modelName))
+			}
+		}(i, model)
+	}
+
+	wg.Wait()
+
+	// Summary
+	ui.Blank()
+	ui.Header("Batch Summary")
+
+	var succeeded, failed int
+
+	for _, result := range results {
+		if result.Success {
+			succeeded++
+
+			ui.Success(fmt.Sprintf("  ✓ %s", result.Model))
+		} else {
+			failed++
+
+			ui.Error(fmt.Sprintf("  ✗ %s: %v", result.Model, result.Error))
+		}
+	}
+
+	ui.Blank()
+	ui.Info(fmt.Sprintf("Total: %d succeeded, %d failed", succeeded, failed))
+
+	if failed > 0 {
+		return fmt.Errorf("%d model(s) failed to process", failed)
+	}
+
+	return nil
+}
+
+// runSingleModelGeneration is the core logic for generating a single model's test.
+// It's extracted to be called both from single model mode and batch mode.
+//
+//nolint:funlen,cyclop,gocyclo,gocognit // Core generation logic
+func runSingleModelGeneration(
+	ctx context.Context,
+	log logrus.FieldLogger,
+	labCfg *config.LabConfig,
+	opts generateTransformationTestOptions,
+) error {
+	model := opts.model
+	network := opts.network
+	spec := opts.spec
+	duration := opts.duration
+	upload := opts.upload
+	aiAssertions := opts.aiAssertions
+	skipExisting := opts.skipExisting
+	sanitizeIPs := opts.sanitizeIPs
+	limit := opts.limit
+	skipAIDiscovery := opts.skipAIDiscovery
+
+	// Helper for prefixed logging (useful in batch mode)
+	prefix := fmt.Sprintf("[%s] ", model)
+	logInfo := func(msg string) { ui.Info(prefix + msg) }
+
+	gen := seeddata.NewGenerator(log, labCfg)
+
+	// Resolve dependency tree
+	logInfo("Resolving dependency tree")
+
+	tree, err := seeddata.ResolveDependencyTree(model, labCfg.Repos.XatuCBT, nil)
+	if err != nil {
+		return fmt.Errorf("failed to resolve dependencies: %w", err)
+	}
+
+	// Get external dependencies
+	externalModels := tree.GetExternalDependencies()
+	if len(externalModels) == 0 {
+		return fmt.Errorf("no external dependencies found for %s", model)
+	}
+
+	logInfo(fmt.Sprintf("Found %d external dependencies: %v", len(externalModels), externalModels))
+
+	// Range discovery
+	var discoveryResult *seeddata.DiscoveryResult
+
+	if skipAIDiscovery {
+		logInfo("Using heuristic range detection (--skip-ai-discovery)")
+
+		rangeInfos, rangeErr := seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT)
+		if rangeErr != nil {
+			return fmt.Errorf("failed to detect range columns: %w", rangeErr)
+		}
+
+		discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT)
+		if err != nil {
+			return fmt.Errorf("fallback range discovery failed: %w", err)
+		}
+	} else if discoveryClient, discoveryErr := seeddata.NewClaudeDiscoveryClient(log, gen); discoveryErr != nil {
+		// Claude not available, use heuristics
+		logInfo("Claude unavailable, using heuristic range detection")
+
+		rangeInfos, rangeErr := seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT)
+		if rangeErr != nil {
+			return fmt.Errorf("failed to detect range columns: %w", rangeErr)
+		}
+
+		discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT)
+		if err != nil {
+			return fmt.Errorf("fallback range discovery failed: %w", err)
+		}
+	} else {
+		// Use Claude for analysis
+		logInfo("Analyzing with Claude AI")
+
+		schemaInfo, schemaErr := discoveryClient.GatherSchemaInfo(ctx, externalModels, network, labCfg.Repos.XatuCBT)
+		if schemaErr != nil {
+			return fmt.Errorf("failed to gather schema info: %w", schemaErr)
+		}
+
+		transformationSQL, sqlErr := seeddata.ReadTransformationSQL(model, labCfg.Repos.XatuCBT)
+		if sqlErr != nil {
+			return fmt.Errorf("failed to read transformation SQL: %w", sqlErr)
+		}
+
+		intermediateSQL, _ := seeddata.ReadIntermediateSQL(tree, labCfg.Repos.XatuCBT)
+
+		var intermediateModels []seeddata.IntermediateSQL
+		for modelName, sql := range intermediateSQL {
+			intermediateModels = append(intermediateModels, seeddata.IntermediateSQL{
+				Model: modelName,
+				SQL: sql,
+			})
+		}
+
+		discoveryResult, err = discoveryClient.AnalyzeRanges(ctx, seeddata.DiscoveryInput{
+			TransformationModel: model,
+			TransformationSQL: transformationSQL,
+			IntermediateModels: intermediateModels,
+			Network: network,
+			Duration: duration,
+			ExternalModels: schemaInfo,
+		})
+		if err != nil {
+			// Fallback to heuristics
+			logInfo("Claude analysis failed, falling back to heuristics")
+
+			rangeInfos, rangeErr := seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT)
+			if rangeErr != nil {
+				return fmt.Errorf("failed to detect range columns: %w", rangeErr)
+			}
+
+			discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT)
+			if err != nil {
+				return fmt.Errorf("fallback range discovery failed: %w", err)
+			}
+		} else {
+			logInfo(fmt.Sprintf("Claude strategy generated (confidence: %.0f%%)", discoveryResult.OverallConfidence*100))
+		}
+	}
+
+	// Use unlimited rows if using AI discovery
+	if discoveryResult != nil && limit == defaultRowLimit {
+		limit = 0
+	}
+
+	// Log the discovery result
+	if discoveryResult != nil {
+		logInfo(fmt.Sprintf("Range: %s [%s → %s]", discoveryResult.PrimaryRangeColumn, discoveryResult.FromValue, discoveryResult.ToValue))
+
+		for _, strategy := range discoveryResult.Strategies {
+			if strategy.RangeColumn == "" || strategy.ColumnType == seeddata.RangeColumnTypeNone {
+				logInfo(fmt.Sprintf("  • %s: (dimension table - all data)", strategy.Model))
+			} else {
+				logInfo(fmt.Sprintf("  • %s: %s [%s → %s]", strategy.Model, strategy.RangeColumn, strategy.FromValue, strategy.ToValue))
+			}
+		}
+	}
+
+	// S3 uploader setup
+	var uploader *seeddata.S3Uploader
+
+	if upload {
+		uploader, err = seeddata.NewS3Uploader(ctx, log)
+		if err != nil {
+			return fmt.Errorf("failed to create S3 uploader: %w", err)
+		}
+
+		if accessErr := uploader.CheckAccess(ctx); accessErr != nil {
+			return fmt.Errorf("S3 preflight check failed: %w", accessErr)
+		}
+
+		logInfo("S3 uploader ready")
+	}
+
+	// Generate salt for IP sanitization
+	var salt string
+
+	if sanitizeIPs {
+		salt, err = seeddata.GenerateSalt()
+		if err != nil {
+			return fmt.Errorf("failed to generate salt for IP sanitization: %w", err)
+		}
+	}
+
+	// Generate seed data for all external models
+	urls := make(map[string]string, len(externalModels))
+
+	for _, extModel := range externalModels {
+		strategy := discoveryResult.GetStrategy(extModel)
+		if strategy == nil {
+			detectedCol, detectErr := seeddata.DetectRangeColumnForModel(extModel, labCfg.Repos.XatuCBT)
+			if detectErr != nil {
+				detectedCol = seeddata.DefaultRangeColumn
+			}
+
+			colLower := strings.ToLower(detectedCol)
+			isTimeColumn := strings.Contains(colLower, "date") || strings.Contains(colLower, "time")
+			primaryIsTime := discoveryResult.PrimaryRangeType == seeddata.RangeColumnTypeTime
+
+			if isTimeColumn != primaryIsTime {
+				return fmt.Errorf("cannot generate %s: range column type mismatch", extModel)
+			}
+
+			strategy = &seeddata.TableRangeStrategy{
+				Model: extModel,
+				RangeColumn: detectedCol,
+				FromValue: discoveryResult.FromValue,
+				ToValue: discoveryResult.ToValue,
+			}
+		}
+
+		filename := seeddata.GetParquetFilename(model, extModel)
+		outputPath := fmt.Sprintf("./%s", filename)
+
+		// Check if we should skip existing
+		if upload && skipExisting && uploader != nil {
+			exists, existsErr := uploader.ObjectExists(ctx, network, spec, filename[:len(filename)-8])
+			if existsErr == nil && exists {
+				logInfo(fmt.Sprintf("Skipping %s (already exists)", extModel))
+
+				urls[extModel] = uploader.GetPublicURL(network, spec, filename[:len(filename)-8])
+
+				continue
+			}
+		}
+
+		logInfo(fmt.Sprintf("Generating %s (%s: %s → %s)", extModel, strategy.RangeColumn, strategy.FromValue, strategy.ToValue))
+
+		result, genErr := gen.Generate(ctx, seeddata.GenerateOptions{
+			Model: extModel,
+			Network: network,
+			Spec: spec,
+			RangeColumn: strategy.RangeColumn,
+			From: strategy.FromValue,
+			To: strategy.ToValue,
+			FilterSQL: strategy.FilterSQL,
+			CorrelationFilter: strategy.CorrelationFilter,
+			Limit: limit,
+			OutputPath: outputPath,
+			SanitizeIPs: sanitizeIPs,
+			Salt: salt,
+		})
+		if genErr != nil {
+			return fmt.Errorf("failed to generate seed data for %s: %w", extModel, genErr)
+		}
+
+		// Upload if requested
+		if upload && uploader != nil {
+			logInfo(fmt.Sprintf("Uploading %s to S3", extModel))
+
+			uploadResult, uploadErr := uploader.Upload(ctx, seeddata.UploadOptions{
+				LocalPath: outputPath,
+				Network: network,
+				Spec: spec,
+				Model: extModel,
+				Filename: filename[:len(filename)-8],
+			})
+			if uploadErr != nil {
+				return fmt.Errorf("failed to upload %s: %w", extModel, uploadErr)
+			}
+
+			urls[extModel] = uploadResult.PublicURL
+
+			// Clean up local file
+			_ = os.Remove(outputPath)
+		} else {
+			urls[extModel] = fmt.Sprintf("https://%s/%s/%s/%s/%s",
+				seeddata.DefaultS3PublicDomain, seeddata.DefaultS3Prefix, network, spec, filename)
+		}
+
+		_ = result // Silence unused warning
+	}
+
+	// Generate assertions (default in batch mode)
+	var assertions []seeddata.Assertion
+
+	if aiAssertions {
+		assertions, err = generateAIAssertions(ctx, log, model, externalModels, labCfg.Repos.XatuCBT)
+		if err != nil {
+			assertions = seeddata.GetDefaultAssertions(model)
+		}
+	} else {
+		assertions = seeddata.GetDefaultAssertions(model)
+	}
+
+	// Generate test YAML
+	yamlContent, err := seeddata.GenerateTransformationTestYAML(seeddata.TransformationTemplateData{
+		Model: model,
+		Network: network,
+		Spec: spec,
+		ExternalModels: externalModels,
+		URLs: urls,
+		Assertions: assertions,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to generate YAML: %w", err)
+	}
+
+	// Write YAML
+	yamlPath := filepath.Join(labCfg.Repos.XatuCBT, "tests", network, spec, "models", model+".yaml")
+
+	logInfo(fmt.Sprintf("Writing test YAML to %s", yamlPath))
+
+	if yamlWriteErr := writeTestYAML(yamlPath, yamlContent); yamlWriteErr != nil {
+		return yamlWriteErr
+	}
+
+	logInfo("Done")
+
+	return nil
+}
diff --git a/pkg/seeddata/discovery.go b/pkg/seeddata/discovery.go
index 3519964..a54a1a7 100644
--- a/pkg/seeddata/discovery.go
+++ b/pkg/seeddata/discovery.go
@@ -34,6 +34,9 @@ const (
 	RangeColumnTypeNone RangeColumnType = "none"
 	// RangeColumnTypeUnknown represents an unclassified column type.
 	RangeColumnTypeUnknown RangeColumnType = "unknown"
+
+	// BlockNumberColumn is the standard column name for block-based tables.
+	BlockNumberColumn = "block_number"
 )
 
 // TableRangeStrategy describes how to filter a single table for seed data extraction.
@@ -669,7 +672,7 @@ func ClassifyRangeColumn(column string, schema []ColumnInfo) RangeColumnType {
 		return RangeColumnTypeTime
 	case strings.Contains(colLower, "timestamp"):
 		return RangeColumnTypeTime
-	case colLower == "block_number" || strings.HasSuffix(colLower, "_block_number"):
+	case colLower == BlockNumberColumn || strings.HasSuffix(colLower, "_block_number"):
 		return RangeColumnTypeBlock
 	case colLower == "slot" || strings.HasSuffix(colLower, "_slot"):
 		return RangeColumnTypeSlot
@@ -816,6 +819,8 @@ func categorizeModelsByType(
 }
 
 // FallbackRangeDiscovery provides heuristic-based range discovery when Claude is unavailable.
+//
+//nolint:funlen,gocognit,cyclop,gocyclo // Complex heuristic logic requires length
 func FallbackRangeDiscovery(
 	ctx context.Context,
 	gen *Generator,
@@ -835,11 +840,25 @@ func FallbackRangeDiscovery(
 	}
 
 	// Group models by interval type
-	timeModels, blockModels, entityModels, unknownModels := categorizeModelsByType(models, intervalTypes, rangeInfos)
+	_, blockModels, entityModels, unknownModels := categorizeModelsByType(models, intervalTypes, rangeInfos)
+
+	// Track time ranges and block ranges separately
+	var latestTimeMin, earliestTimeMax time.Time
+
+	var latestBlockMin, earliestBlockMax int64
 
-	// Query ranges for models
-	var latestMin, earliestMax time.Time
+	hasTimeRanges := false
+	hasBlockRanges := false
+
+	// Store per-model range info for later assignment
+	type modelRangeInfo struct {
+		rangeCol string
+		colType RangeColumnType
+		reasoning string
+		confidence float64
+	}
+	modelRanges := make(map[string]*modelRangeInfo, len(models))
 
 	strategies := make([]TableRangeStrategy, 0, len(models))
 
 	for _, model := range models {
@@ -847,51 +866,40 @@ func FallbackRangeDiscovery(
 		// Check model category
 		isEntity := contains(entityModels, model)
+		isBlock := contains(blockModels, model)
 		isUnknown := contains(unknownModels, model)
 
 		var rangeCol string
 
 		var colType RangeColumnType
 
-		// For entity models, find a time column in the schema
+		// For entity models, they typically don't have time-based ranges
+		// Mark them as "none" type - they'll get all data or use correlation
 		if isEntity {
-			// Query schema to find a time column
-			columns, schemaErr := gen.DescribeTable(ctx, model)
-			if schemaErr != nil {
-				gen.log.WithError(schemaErr).WithField("model", model).Warn("failed to describe entity table")
-
-				strategies = append(strategies, TableRangeStrategy{
-					Model: model,
-					ColumnType: RangeColumnTypeTime,
-					Confidence: 0.3,
-					Reasoning: fmt.Sprintf("Entity model (schema query failed: %v)", schemaErr),
-				})
+			gen.log.WithField("model", model).Info("entity/dimension table - will query without range filter")
 
-				continue
-			}
-
-			timeCol := findTimeColumnInSchema(columns)
-			if timeCol == "" {
-				gen.log.WithField("model", model).Warn("no time column found for entity model")
-
-				strategies = append(strategies, TableRangeStrategy{
-					Model: model,
-					ColumnType: RangeColumnTypeTime,
-					Confidence: 0.3,
-					Reasoning: "Entity model - no time column found in schema",
-				})
+			strategies = append(strategies, TableRangeStrategy{
+				Model: model,
+				ColumnType: RangeColumnTypeNone,
+				Confidence: 0.7,
+				Reasoning: "Entity/dimension table - no range filtering (all data)",
+			})
 
-				continue
+			continue
+		} else if isBlock {
+			// Block-based models use block_number
+			rangeCol = BlockNumberColumn
+			if info != nil && info.RangeColumn != "" {
+				rangeCol = info.RangeColumn
 			}
 
-			rangeCol = timeCol
-			colType = RangeColumnTypeTime
+			colType = RangeColumnTypeBlock
 		} else if isUnknown {
 			// Unknown models - try default range column
 			rangeCol = DefaultRangeColumn
 			colType = RangeColumnTypeTime
 		} else {
-			// Time or block models - use detected range column
+			// Time models - use detected range column
 			rangeCol = DefaultRangeColumn
 			if info != nil {
 				rangeCol = info.RangeColumn
@@ -915,86 +923,190 @@ func FallbackRangeDiscovery(
 			continue
 		}
 
-		if latestMin.IsZero() || modelRange.Min.After(latestMin) {
-			latestMin = modelRange.Min
-		}
+		// Track ranges based on column type
+		if colType == RangeColumnTypeBlock {
+			// Parse raw values as block numbers
+			var minBlock, maxBlock int64
 
-		if earliestMax.IsZero() || modelRange.Max.Before(earliestMax) {
-			earliestMax = modelRange.Max
-		}
+			if _, scanErr := fmt.Sscanf(modelRange.MinRaw, "%d", &minBlock); scanErr != nil {
+				gen.log.WithError(scanErr).WithField("model", model).Warn("failed to parse min block number")
 
-		reasoning := "Heuristic-based detection (Claude unavailable)"
-		if isEntity {
-			reasoning = fmt.Sprintf("Entity model - using %s for time filtering", rangeCol)
-		}
+				continue
+			}
 
-		strategies = append(strategies, TableRangeStrategy{
-			Model: model,
-			RangeColumn: rangeCol,
-			ColumnType: colType,
-			Confidence: 0.7,
-			Reasoning: reasoning,
-		})
-	}
+			if _, scanErr := fmt.Sscanf(modelRange.MaxRaw, "%d", &maxBlock); scanErr != nil {
+				gen.log.WithError(scanErr).WithField("model", model).Warn("failed to parse max block number")
+
+				continue
+			}
+
+			if !hasBlockRanges || minBlock > latestBlockMin {
+				latestBlockMin = minBlock
+			}
+
+			if !hasBlockRanges || maxBlock < earliestBlockMax {
+				earliestBlockMax = maxBlock
+			}
+
+			hasBlockRanges = true
+
+			modelRanges[model] = &modelRangeInfo{
+				rangeCol: rangeCol,
+				colType: colType,
+				reasoning: "Block-based model",
+				confidence: 0.7,
+			}
+		} else {
+			// Time-based range
+			if latestTimeMin.IsZero() || modelRange.Min.After(latestTimeMin) {
+				latestTimeMin = modelRange.Min
+			}
+
+			if earliestTimeMax.IsZero() || modelRange.Max.Before(earliestTimeMax) {
+				earliestTimeMax = modelRange.Max
+			}
 
-	// Handle case where no models have valid ranges
-	hasRanges := !latestMin.IsZero() && !earliestMax.IsZero()
+			hasTimeRanges = true
+			modelRanges[model] = &modelRangeInfo{
+				rangeCol: rangeCol,
+				colType: colType,
+				reasoning: "Time-based model",
+				confidence: 0.7,
+			}
+		}
+	}
+
+	// Determine primary type and compute range values
 	var fromValue, toValue string
 
 	var primaryType RangeColumnType
 
 	var primaryColumn string
 
-	if hasRanges {
-		// Check for intersection
-		if latestMin.After(earliestMax) {
-			return nil, fmt.Errorf("no intersecting range found across all models")
+	// Calculate time-based range values if we have time models
+	var timeFromValue, timeToValue string
+
+	if hasTimeRanges {
+		if latestTimeMin.After(earliestTimeMax) {
+			gen.log.Warn("no intersecting time range found, using latest available data")
+
+			earliestTimeMax = latestTimeMin.Add(5 * time.Minute)
 		}
 
-		// Parse duration string (e.g., "5m", "10m", "1h")
 		rangeDuration, parseErr := time.ParseDuration(duration)
 		if parseErr != nil {
-			rangeDuration = 5 * time.Minute // Default to 5 minutes if parsing fails
+			rangeDuration = 5 * time.Minute
 		}
 
-		// Use the last N minutes/hours of available data
-		effectiveMax := earliestMax.Add(-1 * time.Minute) // Account for ingestion lag
+		effectiveMax := earliestTimeMax.Add(-1 * time.Minute)
 		effectiveMin := effectiveMax.Add(-rangeDuration)
 
-		if effectiveMin.Before(latestMin) {
-			effectiveMin = latestMin
+		if effectiveMin.Before(latestTimeMin) {
+			effectiveMin = latestTimeMin
 		}
 
-		fromValue = effectiveMin.Format("2006-01-02 15:04:05")
-		toValue = effectiveMax.Format("2006-01-02 15:04:05")
+		timeFromValue = effectiveMin.Format("2006-01-02 15:04:05")
+		timeToValue = effectiveMax.Format("2006-01-02 15:04:05")
+	}
+
+	// Calculate block-based range values if we have block models
+	var blockFromValue, blockToValue string
 
-		// Determine primary range type based on majority
-		if len(timeModels)+len(entityModels) >= len(blockModels) {
-			primaryType = RangeColumnTypeTime
-			primaryColumn = DefaultRangeColumn
-		} else {
-			primaryType = RangeColumnTypeBlock
-			primaryColumn = "block_number"
+	if hasBlockRanges {
+		if latestBlockMin > earliestBlockMax {
+			gen.log.Warn("no intersecting block range found, using latest available data")
+
+			earliestBlockMax = latestBlockMin + 1000
 		}
 
-		// Update strategies with range values (skip strategies without valid range column)
-		for i := range strategies {
-			if strategies[i].RangeColumn != "" {
-				strategies[i].FromValue = fromValue
-				strategies[i].ToValue = toValue
-			}
+		// For blocks, use a reasonable range (e.g., last 1000 blocks or based on duration)
+		// Approximate: 1 block every 12 seconds, so 5 minutes = ~25 blocks
+		rangeDuration, parseErr := time.ParseDuration(duration)
+		if parseErr != nil {
+			rangeDuration = 5 * time.Minute
+		}
+
+		blocksPerDuration := int64(rangeDuration.Seconds() / 12) // ~12 second block time
+		if blocksPerDuration < 100 {
+			blocksPerDuration = 100 // Minimum 100 blocks
+		}
+
+		effectiveMax := earliestBlockMax - 10 // Account for reorgs
+		effectiveMin := effectiveMax - blocksPerDuration
+
+		if effectiveMin < latestBlockMin {
+			effectiveMin = latestBlockMin
 		}
+
+		blockFromValue = fmt.Sprintf("%d", effectiveMin)
+		blockToValue = fmt.Sprintf("%d", effectiveMax)
+	}
+
+	// Set primary type based on what we have
+	if hasTimeRanges && !hasBlockRanges {
+		primaryType = RangeColumnTypeTime
+		primaryColumn = DefaultRangeColumn
+		fromValue = timeFromValue
+		toValue = timeToValue
+	} else if hasBlockRanges && !hasTimeRanges {
+		primaryType = RangeColumnTypeBlock
+		primaryColumn = BlockNumberColumn
+		fromValue = blockFromValue
+		toValue = blockToValue
+	} else if hasTimeRanges && hasBlockRanges {
+		// Mixed - prefer time as primary
+		primaryType = RangeColumnTypeTime
+		primaryColumn = DefaultRangeColumn
+		fromValue = timeFromValue
+		toValue = timeToValue
 	} else {
-		// No valid ranges found - this is an error condition
-		return nil, fmt.Errorf("no valid range columns found for any model")
+		// No valid ranges found - check if we have entity-only models
+		if len(strategies) > 0 {
+			// All models are entity tables - proceed without primary range
+			primaryType = RangeColumnTypeNone
+			primaryColumn = ""
+			fromValue = ""
+			toValue = ""
+		} else {
+			return nil, fmt.Errorf("no valid range columns found for any model")
+		}
+	}
+
+	// Create strategies for models with ranges
+	for model, rangeInfo := range modelRanges {
+		strategy := TableRangeStrategy{
+			Model: model,
+			RangeColumn: rangeInfo.rangeCol,
+			ColumnType: rangeInfo.colType,
+			Confidence: rangeInfo.confidence,
+			Reasoning: rangeInfo.reasoning,
+		}
+
+		// Assign appropriate range values based on column type
+		if rangeInfo.colType == RangeColumnTypeBlock {
+			strategy.FromValue = blockFromValue
+			strategy.ToValue = blockToValue
+		} else {
+			strategy.FromValue = timeFromValue
+			strategy.ToValue = timeToValue
+		}
+
+		strategies = append(strategies, strategy)
 	}
 
 	warnings := make([]string, 0)
-	if len(blockModels) > 0 && len(timeModels) > 0 {
+
+	if hasBlockRanges && hasTimeRanges {
 		warnings = append(warnings, "Mixed range column types detected (time and block). "+
-			"Block-based tables may not correlate correctly with time-based filtering.")
+			"Each table type will use its appropriate range values.")
+	}
+
+	if len(entityModels) > 0 {
+		warnings = append(warnings,
+			fmt.Sprintf("Entity/dimension tables detected (%v). These will query all data without range filtering.",
+				entityModels))
 	}
 
 	return &DiscoveryResult{
@@ -1064,7 +1176,6 @@ func (g *Generator) ValidateStrategyHasData(
 			Strategy: &strategy,
 			Error: err,
 		}
-
 		if err != nil {
 			modelCount.HasData = false
 
diff --git a/pkg/seeddata/rangedetect.go b/pkg/seeddata/rangedetect.go
index dc9efd4..6b4421c 100644
--- a/pkg/seeddata/rangedetect.go
+++ b/pkg/seeddata/rangedetect.go
@@ -16,11 +16,12 @@ const (
 // rangeColumnPatterns are regex patterns to detect range columns from external model SQL.
 // Order matters - more specific patterns should come first.
+// Note: patterns are lowercase since SQL content is lowercased before matching.
 var rangeColumnPatterns = []*regexp.Regexp{
-	// Pattern: toUnixTimestamp(min(column_name)) as min
-	regexp.MustCompile(`toUnixTimestamp\s*\(\s*min\s*\(\s*(\w+)\s*\)\s*\)`),
-	// Pattern: toUnixTimestamp(max(column_name)) as max
-	regexp.MustCompile(`toUnixTimestamp\s*\(\s*max\s*\(\s*(\w+)\s*\)\s*\)`),
+	// Pattern: tounixtimestamp(min(column_name)) as min
+	regexp.MustCompile(`tounixtimestamp\s*\(\s*min\s*\(\s*(\w+)\s*\)\s*\)`),
+	// Pattern: tounixtimestamp(max(column_name)) as max
+	regexp.MustCompile(`tounixtimestamp\s*\(\s*max\s*\(\s*(\w+)\s*\)\s*\)`),
 	// Pattern: min(column_name) as min
 	regexp.MustCompile(`(?:^|[^(])\bmin\s*\(\s*(\w+)\s*\)\s+as\s+min`),
 	// Pattern: max(column_name) as max