diff --git a/pkg/commands/lab_xatu_cbt_generate_transformation.go b/pkg/commands/lab_xatu_cbt_generate_transformation.go index 57b0d2b..dccf32b 100644 --- a/pkg/commands/lab_xatu_cbt_generate_transformation.go +++ b/pkg/commands/lab_xatu_cbt_generate_transformation.go @@ -5,7 +5,7 @@ import ( "fmt" "os" "path/filepath" - "time" + "strings" "github.com/ethpandaops/xcli/pkg/config" "github.com/ethpandaops/xcli/pkg/constants" @@ -15,42 +15,6 @@ import ( "github.com/spf13/cobra" ) -// rangePreset defines a time range preset for seed data generation. -type rangePreset struct { - Label string // Display text (e.g., "Last 5 minutes") - Value string // Internal identifier (e.g., "5m") - Duration time.Duration // Duration to subtract from effective max -} - -// rangePresets defines the available range presets for seed data generation. -// Presets are ordered from shortest to longest duration. -var rangePresets = []rangePreset{ - // Minutes - {Label: "Last 1 minute", Value: "1m", Duration: 1 * time.Minute}, - {Label: "Last 5 minutes", Value: "5m", Duration: 5 * time.Minute}, - {Label: "Last 15 minutes", Value: "15m", Duration: 15 * time.Minute}, - {Label: "Last 30 minutes", Value: "30m", Duration: 30 * time.Minute}, - // Hours - {Label: "Last 1 hour", Value: "1h", Duration: 1 * time.Hour}, - {Label: "Last 6 hours", Value: "6h", Duration: 6 * time.Hour}, - {Label: "Last 12 hours", Value: "12h", Duration: 12 * time.Hour}, - // Days/Weeks/Months - {Label: "Last 1 day", Value: "1d", Duration: 24 * time.Hour}, - {Label: "Last 1 week", Value: "1w", Duration: 7 * 24 * time.Hour}, - {Label: "Last 1 month", Value: "1mo", Duration: 30 * 24 * time.Hour}, - {Label: "Last 3 months", Value: "3mo", Duration: 90 * 24 * time.Hour}, - {Label: "Last 6 months", Value: "6mo", Duration: 180 * 24 * time.Hour}, - // Custom - {Label: "Custom range", Value: "custom", Duration: 0}, -} - -const ( - // defaultRangePreset is the default range preset value. 
- defaultRangePreset = "5m" - // ingestionLagBuffer accounts for data ingestion delay when calculating effective max time. - ingestionLagBuffer = 1 * time.Minute -) - // NewLabXatuCBTGenerateTransformationTestCommand creates the command. func NewLabXatuCBTGenerateTransformationTestCommand(log logrus.FieldLogger, configPath string) *cobra.Command { var ( @@ -65,6 +29,7 @@ func NewLabXatuCBTGenerateTransformationTestCommand(log logrus.FieldLogger, conf aiAssertions bool skipExisting bool noSanitizeIPs bool + duration string ) cmd := &cobra.Command{ @@ -104,7 +69,7 @@ S3 Upload Configuration (defaults to Cloudflare R2): S3_BUCKET Override bucket (default: ethpandaops-platform-production-public)`, RunE: func(cmd *cobra.Command, args []string) error { return runGenerateTransformationTest(cmd.Context(), log, configPath, - model, network, spec, rangeColumn, from, to, limit, upload, aiAssertions, skipExisting, !noSanitizeIPs) + model, network, spec, rangeColumn, from, to, limit, upload, aiAssertions, skipExisting, !noSanitizeIPs, duration) }, } @@ -119,6 +84,7 @@ S3 Upload Configuration (defaults to Cloudflare R2): cmd.Flags().BoolVar(&aiAssertions, "ai-assertions", false, "Use Claude to generate assertions") cmd.Flags().BoolVar(&skipExisting, "skip-existing", false, "Skip generating seed data for existing S3 files") cmd.Flags().BoolVar(&noSanitizeIPs, "no-sanitize-ips", false, "Disable IP address sanitization (IPs are sanitized by default)") + cmd.Flags().StringVar(&duration, "duration", "", "Time range duration (e.g., 1m, 5m, 10m, 30m)") return cmd } @@ -131,6 +97,7 @@ func runGenerateTransformationTest( model, network, spec, rangeColumn, from, to string, limit int, upload, aiAssertions, skipExisting, sanitizeIPs bool, + duration string, ) error { // Load configuration labCfg, _, err := config.LoadLabConfig(configPath) @@ -205,93 +172,381 @@ func runGenerateTransformationTest( } } - // Detect range columns for external models - ui.Header("Detecting range columns") + // 
AI-assisted range discovery + ui.Blank() + ui.Header("Analyzing range strategies") + + // Prompt for duration if not specified + if duration == "" { + durationOpts := []ui.SelectOption{ + {Label: "5m", Description: "recommended", Value: "5m"}, + {Label: "30s", Description: "minimal test", Value: "30s"}, + {Label: "1m", Description: "quick test", Value: "1m"}, + {Label: "10m", Description: "", Value: "10m"}, + {Label: "30m", Description: "", Value: "30m"}, + {Label: "1h", Description: "large dataset", Value: "1h"}, + } - rangeInfos, err := seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT) - if err != nil { - return fmt.Errorf("failed to detect range columns: %w", err) + selectedDuration, durationErr := ui.Select("Time range duration", durationOpts) + if durationErr != nil { + return durationErr + } + + duration = selectedDuration } - // Track detection status for prompting - anyDefault := false - detectedColumns := make(map[string]bool) + ui.Info(fmt.Sprintf("Using %s time range", duration)) + ui.Info("This may take a few minutes for models with many dependencies - grab a coffee ☕") + + var discoveryResult *seeddata.DiscoveryResult + + // Try AI discovery first + discoveryClient, discoveryErr := seeddata.NewClaudeDiscoveryClient(log, gen) + if discoveryErr != nil { + ui.Warning(fmt.Sprintf("Claude CLI not available: %v", discoveryErr)) + ui.Info("Falling back to heuristic range detection") - for _, info := range rangeInfos { - status := "detected" - if !info.Detected { - status = "default" - anyDefault = true + // Fallback to heuristic detection + var rangeInfos map[string]*seeddata.RangeColumnInfo + + rangeInfos, err = seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT) + if err != nil { + return fmt.Errorf("failed to detect range columns: %w", err) } - detectedColumns[info.RangeColumn] = true - ui.Info(fmt.Sprintf(" • %s: %s (%s)", info.Model, info.RangeColumn, status)) - } + discoveryResult, err = 
seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT) + if err != nil { + return fmt.Errorf("fallback range discovery failed: %w", err) + } + } else { + // Gather schema information + schemaSpinner := ui.NewSpinner("Gathering schema information") + + schemaInfo, schemaErr := discoveryClient.GatherSchemaInfo(ctx, externalModels, network, labCfg.Repos.XatuCBT) + if schemaErr != nil { + schemaSpinner.Fail("Failed to gather schema info") + + return fmt.Errorf("failed to gather schema info: %w", schemaErr) + } + + schemaSpinner.Success(fmt.Sprintf("Schema info gathered for %d models", len(schemaInfo))) + + // Display detected range info + for _, schema := range schemaInfo { + if schema.RangeInfo != nil { + status := "detected" + if !schema.RangeInfo.Detected { + status = "default" + } + + rangeStr := "" + if schema.RangeInfo.MinValue != "" && schema.RangeInfo.MaxValue != "" { + rangeStr = fmt.Sprintf(" [%s → %s]", schema.RangeInfo.MinValue, schema.RangeInfo.MaxValue) + } - // Use common range column or override - if rangeColumn == "" { - rangeColumn = seeddata.FindCommonRangeColumn(rangeInfos) + ui.Info(fmt.Sprintf(" • %s: %s (%s)%s", schema.Model, schema.RangeInfo.Column, status, rangeStr)) + } + } + + // Read transformation SQL + transformationSQL, sqlErr := seeddata.ReadTransformationSQL(model, labCfg.Repos.XatuCBT) + if sqlErr != nil { + return fmt.Errorf("failed to read transformation SQL: %w", sqlErr) + } + + // Read intermediate dependency SQL (for WHERE clause analysis) + intermediateSQL, intErr := seeddata.ReadIntermediateSQL(tree, labCfg.Repos.XatuCBT) + if intErr != nil { + ui.Warning(fmt.Sprintf("Could not read intermediate SQL: %v", intErr)) + // Continue without intermediate SQL - not critical + } + + // Convert to IntermediateSQL slice + var intermediateModels []seeddata.IntermediateSQL + for modelName, sql := range intermediateSQL { + intermediateModels = append(intermediateModels, 
seeddata.IntermediateSQL{ + Model: modelName, + SQL: sql, + }) + } + + if len(intermediateModels) > 0 { + ui.Info(fmt.Sprintf("Including %d intermediate model(s) for WHERE clause analysis", len(intermediateModels))) + } + + // Invoke Claude for analysis + ui.Blank() - // Prompt user if detection used defaults or found mismatches - shouldPrompt := anyDefault || len(detectedColumns) > 1 + analysisSpinner := ui.NewSpinner("Analyzing correlation strategy with Claude") - if shouldPrompt { - if len(detectedColumns) > 1 { - ui.Warning("Different range columns detected across models") + discoveryResult, err = discoveryClient.AnalyzeRanges(ctx, seeddata.DiscoveryInput{ + TransformationModel: model, + TransformationSQL: transformationSQL, + IntermediateModels: intermediateModels, + Network: network, + Duration: duration, + ExternalModels: schemaInfo, + }) + if err != nil { + analysisSpinner.Fail("AI analysis failed") + ui.Warning(fmt.Sprintf("Claude analysis failed: %v", err)) + ui.Info("Falling back to heuristic range detection") + + // Fallback to heuristic detection + rangeInfos, rangeErr := seeddata.DetectRangeColumnsForModels(externalModels, labCfg.Repos.XatuCBT) + if rangeErr != nil { + return fmt.Errorf("failed to detect range columns: %w", rangeErr) } - rangeColumn, promptErr = promptForRangeColumn(rangeColumn) - if promptErr != nil { - return promptErr + discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT) + if err != nil { + return fmt.Errorf("fallback range discovery failed: %w", err) } + } else { + analysisSpinner.Success(fmt.Sprintf("Strategy generated (confidence: %.0f%%)", discoveryResult.OverallConfidence*100)) } + } + + // Validate that Claude's strategies cover all expected models + // This catches cases where Claude named a model differently + var missingModels []string - ui.Info(fmt.Sprintf("Using range column: %s", rangeColumn)) + for _, extModel := range externalModels { + 
if discoveryResult.GetStrategy(extModel) == nil { + missingModels = append(missingModels, extModel) + } } - // Query available ranges + if len(missingModels) > 0 { + ui.Blank() + ui.Warning("The following models are NOT covered by Claude's strategy:") + + for _, m := range missingModels { + ui.Warning(fmt.Sprintf(" • %s", m)) + } + + ui.Warning("These will use the primary range column, which may be incorrect.") + ui.Info("Claude's strategies cover these models:") + + for _, s := range discoveryResult.Strategies { + ui.Info(fmt.Sprintf(" • %s", s.Model)) + } + + ui.Blank() + + proceedMissing, missErr := ui.Confirm("Proceed anyway?") + if missErr != nil { + return missErr + } + + if !proceedMissing { + ui.Info("Aborted. Try regenerating with clearer model names.") + + return nil + } + } + + // Display the proposed strategy + ui.Blank() + ui.Header("Proposed Strategy") + ui.Info(fmt.Sprintf("Summary: %s", discoveryResult.Summary)) + ui.Blank() + ui.Info(fmt.Sprintf("Primary Range: %s (%s)", discoveryResult.PrimaryRangeColumn, discoveryResult.PrimaryRangeType)) + ui.Info(fmt.Sprintf(" From: %s", discoveryResult.FromValue)) + ui.Info(fmt.Sprintf(" To: %s", discoveryResult.ToValue)) ui.Blank() - ui.Header("Querying available data ranges") - rangeSpinner := ui.NewSpinner("Querying external ClickHouse") + ui.Info("Per-Table Strategies:") - ranges, err := gen.QueryModelRanges(ctx, externalModels, network, rangeInfos, rangeColumn) - if err != nil { - rangeSpinner.Fail("Failed to query ranges") + for _, strategy := range discoveryResult.Strategies { + confidence := fmt.Sprintf("%.0f%%", strategy.Confidence*100) + bridgeInfo := "" + + if strategy.RequiresBridge { + bridgeInfo = fmt.Sprintf(" (via %s)", strategy.BridgeTable) + } + + // Handle dimension tables (no range) vs regular tables + if strategy.RangeColumn == "" || strategy.ColumnType == seeddata.RangeColumnTypeNone { + ui.Info(fmt.Sprintf(" • %s: (dimension table - all data) %s%s", + strategy.Model, + confidence, + 
bridgeInfo, + )) + } else { + ui.Info(fmt.Sprintf(" • %s: %s [%s → %s] %s%s", + strategy.Model, + strategy.RangeColumn, + strategy.FromValue, + strategy.ToValue, + confidence, + bridgeInfo, + )) + } + + // Display additional filters if present + if strategy.FilterSQL != "" { + ui.Info(fmt.Sprintf(" Filter: %s", strategy.FilterSQL)) + } + + // Display correlation filter if present (for dimension tables) + if strategy.CorrelationFilter != "" { + // Truncate long subqueries for display + corrFilter := strategy.CorrelationFilter + if len(corrFilter) > 80 { + corrFilter = corrFilter[:77] + "..." + } + + ui.Info(fmt.Sprintf(" Correlation: %s", corrFilter)) + } - return fmt.Errorf("failed to query ranges: %w", err) + // Display if optional + if strategy.Optional { + ui.Info(" (optional - LEFT JOIN)") + } } - rangeSpinner.Success("Range data retrieved") + // Display warnings + if len(discoveryResult.Warnings) > 0 { + ui.Blank() - for _, r := range ranges { - ui.Info(fmt.Sprintf(" • %s: %s", r.Model, r.FormatRange())) + for _, warning := range discoveryResult.Warnings { + ui.Warning(warning) + } } - // Find intersection - intersection, err := seeddata.FindIntersection(ranges) - if err != nil { - ui.Error("No intersecting range found across all models") + // Warn if low confidence + if discoveryResult.OverallConfidence < 0.5 { + ui.Blank() + ui.Warning("Low confidence score - manual review recommended") + } - return fmt.Errorf("range intersection failed: %w", err) + // Validate that each model has data in the proposed range + ui.Blank() + ui.Header("Validating data availability") + + validationSpinner := ui.NewSpinner("Checking row counts for each model") + + validation, validationErr := gen.ValidateStrategyHasData(ctx, discoveryResult, network) + if validationErr != nil { + validationSpinner.Fail("Validation failed") + + return fmt.Errorf("failed to validate strategy: %w", validationErr) } + validationSpinner.Success("Validation complete") + + // Display row counts 
ui.Blank() - ui.Success(fmt.Sprintf("Intersecting range: %s", intersection.FormatRange())) + ui.Info("Data availability per model:") - // Prompt for range within intersection - if from == "" || to == "" { - from, to, promptErr = promptForRangeWithinIntersection(intersection) - if promptErr != nil { - return promptErr + for _, count := range validation.Counts { + status := "✓" + if !count.HasData { + status = "✗" + } + + if count.Error != nil { + ui.Warning(fmt.Sprintf(" %s %s: error - %v", status, count.Model, count.Error)) + } else { + ui.Info(fmt.Sprintf(" %s %s: %d rows", status, count.Model, count.RowCount)) + } + } + + ui.Blank() + ui.Info(fmt.Sprintf("Total rows across all models: %d", validation.TotalRows)) + + if validation.MinRowModel != "" { + ui.Info(fmt.Sprintf("Model with fewest rows: %s (%d rows)", validation.MinRowModel, validation.MinRowCount)) + } + + // Handle errored models (timeouts, etc.) + if len(validation.ErroredModels) > 0 { + ui.Blank() + ui.Error("The following models FAILED to query (timeout or error):") + + for _, model := range validation.ErroredModels { + ui.Error(fmt.Sprintf(" • %s", model)) + } + + ui.Blank() + ui.Warning("These queries timed out - the tables may be too large or the range too wide.") + ui.Warning("Consider narrowing the block/time range, or proceed if you believe the data exists.") + + proceedWithErrors, errErr := ui.Confirm("Proceed anyway (assuming data exists)?") + if errErr != nil { + return errErr + } + + if !proceedWithErrors { + ui.Info("Aborted by user. 
Try a narrower range.") + + return nil + } + } + + // Handle empty models (zero rows) + if len(validation.EmptyModels) > 0 { + ui.Blank() + ui.Error("The following models have NO DATA in the proposed range:") + + for _, model := range validation.EmptyModels { + ui.Error(fmt.Sprintf(" • %s", model)) + } + + ui.Blank() + ui.Warning("Empty parquets will be generated for these models, which may cause test failures.") + + expandWindow, expandErr := ui.Confirm("Would you like to expand the time window and retry?") + if expandErr != nil { + return expandErr + } + + if expandWindow { + ui.Info("Please re-run the command with a larger time window or different range.") + ui.Info("Tip: Some tables (like canonical_execution_contracts) may have sparse data.") + + return nil + } + + // Let user proceed anyway if they want + proceedAnyway, proceedErr := ui.Confirm("Proceed anyway with potentially empty data?") + if proceedErr != nil { + return proceedErr + } + + if !proceedAnyway { + ui.Info("Aborted by user") + + return nil } } + // User confirmation ui.Blank() - ui.Info(fmt.Sprintf("Querying range: %s to %s", from, to)) - // Prompt for limit - if limit == defaultRowLimit { + proceed, confirmErr := ui.Confirm("Proceed with this strategy?") + if confirmErr != nil { + return confirmErr + } + + if !proceed { + ui.Info("Aborted by user") + + return nil + } + + // Row limit handling: + // - With AI discovery: use unlimited (0) since Claude already picked sensible ranges + // - Manual/fallback: prompt for limit to avoid accidentally pulling too much data + // - Explicit --limit flag always respected + if discoveryResult != nil && limit == defaultRowLimit { + // AI discovery mode: no limit needed, Claude picked appropriate ranges + limit = 0 + + ui.Info("Using unlimited rows (AI discovery already optimized the range)") + } else if limit == defaultRowLimit { + // Fallback/manual mode: prompt for safety limit, promptErr = promptForLimit() if promptErr != nil { return promptErr @@ -347,6 
+602,41 @@ func runGenerateTransformationTest( urls := make(map[string]string, len(externalModels)) for _, extModel := range externalModels { + // Get the strategy for this model + strategy := discoveryResult.GetStrategy(extModel) + if strategy == nil { + ui.Warning(fmt.Sprintf("No strategy found for %s, using defaults", extModel)) + + // Detect the correct range column for this model instead of blindly using primary + detectedCol, detectErr := seeddata.DetectRangeColumnForModel(extModel, labCfg.Repos.XatuCBT) + if detectErr != nil { + ui.Warning(fmt.Sprintf("Could not detect range column for %s: %v", extModel, detectErr)) + + detectedCol = seeddata.DefaultRangeColumn + } + + // Check if detected column type matches primary range type + colLower := strings.ToLower(detectedCol) + isTimeColumn := strings.Contains(colLower, "date") || strings.Contains(colLower, "time") + primaryIsTime := discoveryResult.PrimaryRangeType == seeddata.RangeColumnTypeTime + + if isTimeColumn != primaryIsTime { + // Column types don't match - we can't use primary range values + ui.Error(fmt.Sprintf(" %s uses %s but primary range is %s - cannot convert automatically", + extModel, detectedCol, discoveryResult.PrimaryRangeColumn)) + ui.Error(" Please re-run with Claude to get proper correlation, or use --range-column to override") + + return fmt.Errorf("cannot generate %s: range column type mismatch", extModel) + } + + strategy = &seeddata.TableRangeStrategy{ + Model: extModel, + RangeColumn: detectedCol, + FromValue: discoveryResult.FromValue, + ToValue: discoveryResult.ToValue, + } + } + filename := seeddata.GetParquetFilename(model, extModel) outputPath := fmt.Sprintf("./%s", filename) @@ -362,19 +652,48 @@ func runGenerateTransformationTest( } } + // Show query parameters (helps debug empty parquets) + filterInfo := "" + if strategy.FilterSQL != "" { + filterInfo = fmt.Sprintf(" + filter: %s", strategy.FilterSQL) + } + + if strategy.CorrelationFilter != "" { + // Truncate long subqueries 
for display + corrFilter := strategy.CorrelationFilter + if len(corrFilter) > 60 { + corrFilter = corrFilter[:57] + "..." + } + + filterInfo += fmt.Sprintf(" + correlation: %s", corrFilter) + } + + // Handle dimension tables (no range) vs regular tables + if strategy.RangeColumn == "" || strategy.ColumnType == seeddata.RangeColumnTypeNone { + if strategy.CorrelationFilter != "" { + ui.Info(fmt.Sprintf(" %s: (correlated dimension table)%s", extModel, filterInfo)) + } else { + ui.Info(fmt.Sprintf(" %s: (dimension table - all data)%s", extModel, filterInfo)) + } + } else { + ui.Info(fmt.Sprintf(" %s: %s [%s → %s]%s", extModel, strategy.RangeColumn, strategy.FromValue, strategy.ToValue, filterInfo)) + } + genSpinner := ui.NewSpinner(fmt.Sprintf("Generating %s", extModel)) result, genErr := gen.Generate(ctx, seeddata.GenerateOptions{ - Model: extModel, - Network: network, - Spec: spec, - RangeColumn: rangeColumn, - From: from, - To: to, - Limit: limit, - OutputPath: outputPath, - SanitizeIPs: sanitizeIPs, - Salt: salt, + Model: extModel, + Network: network, + Spec: spec, + RangeColumn: strategy.RangeColumn, + From: strategy.FromValue, + To: strategy.ToValue, + FilterSQL: strategy.FilterSQL, + CorrelationFilter: strategy.CorrelationFilter, + Limit: limit, + OutputPath: outputPath, + SanitizeIPs: sanitizeIPs, + Salt: salt, }) if genErr != nil { genSpinner.Fail(fmt.Sprintf("Failed to generate %s", extModel)) @@ -384,6 +703,18 @@ func runGenerateTransformationTest( genSpinner.Success(fmt.Sprintf("%s (%s)", extModel, formatFileSize(result.FileSize))) + // Warn if file is too large for comfortable test imports + const largeFileThreshold = 15 * 1024 * 1024 // 15MB + if result.FileSize > largeFileThreshold { + ui.Warning(fmt.Sprintf(" Large file (%s) - may slow down tests on low-powered machines. 
Consider using a shorter duration.", + formatFileSize(result.FileSize))) + } + + // Show query for first model to help debug empty parquets + if extModel == externalModels[0] { + ui.Info(fmt.Sprintf(" Query: %s", result.Query)) + } + // Display sanitized columns if any if len(result.SanitizedColumns) > 0 { ui.Info(fmt.Sprintf(" Sanitized IP columns: %v", result.SanitizedColumns)) @@ -522,116 +853,6 @@ func promptForTransformationModel(xatuCBTPath string) (string, error) { return ui.Select("Select transformation model", options) } -func promptForRangeColumn(defaultColumn string) (string, error) { - // Don't pre-fill input - pterm concatenates instead of replacing - // Show default in prompt, user can press enter to accept - column, err := ui.TextInput( - fmt.Sprintf("Range column [%s]", defaultColumn), - "") - if err != nil { - return "", err - } - - if column == "" { - return defaultColumn, nil - } - - return column, nil -} - -func promptForRangeWithinIntersection(intersection *seeddata.ModelRange) (string, string, error) { - // Account for ingestion lag when calculating effective max time - effectiveMax := intersection.Max.Add(-ingestionLagBuffer) - availableDuration := effectiveMax.Sub(intersection.Min) - - // Check if we have any usable range - if availableDuration <= 0 { - return "", "", fmt.Errorf( - "intersection range too short after accounting for ingestion lag (need > %s)", - ingestionLagBuffer, - ) - } - - ui.Info(fmt.Sprintf("Effective end (accounting for lag): %s", - effectiveMax.Format("2006-01-02 15:04:05"))) - - // Build select options from presets - options := make([]ui.SelectOption, 0, len(rangePresets)) - - for _, preset := range rangePresets { - opt := ui.SelectOption{ - Label: preset.Label, - Value: preset.Value, - } - - // Mark presets that exceed available range (but still allow selection) - if preset.Duration > 0 && preset.Duration > availableDuration { - opt.Description = "(exceeds available range)" - } - - options = append(options, opt) - } 
- - // Show selection with default preset - selected, err := ui.SelectWithDefault("Select time range", options, defaultRangePreset) - if err != nil { - return "", "", err - } - - // Handle custom range selection - if selected == "custom" { - return promptForCustomRange(intersection) - } - - // Find the selected preset and calculate the range - for _, preset := range rangePresets { - if preset.Value == selected { - fromTime := effectiveMax.Add(-preset.Duration) - toTime := effectiveMax - - return fromTime.Format("2006-01-02 15:04:05"), toTime.Format("2006-01-02 15:04:05"), nil - } - } - - return "", "", fmt.Errorf("unknown preset: %s", selected) -} - -// promptForCustomRange handles manual From/To input for custom range selection. -func promptForCustomRange(intersection *seeddata.ModelRange) (string, string, error) { - defaultFrom := intersection.Min.Format("2006-01-02 15:04:05") - defaultTo := intersection.Max.Format("2006-01-02 15:04:05") - - ui.Info(fmt.Sprintf("Enter range within intersection (%s to %s)", - intersection.Min.Format("2006-01-02 15:04:05"), - intersection.Max.Format("2006-01-02 15:04:05"))) - - // Don't pre-fill input - pterm concatenates instead of replacing - // Show default in prompt, user can press enter to accept - from, err := ui.TextInput( - fmt.Sprintf("From [%s]", defaultFrom), - "") - if err != nil { - return "", "", err - } - - if from == "" { - from = defaultFrom - } - - to, err := ui.TextInput( - fmt.Sprintf("To [%s]", defaultTo), - "") - if err != nil { - return "", "", err - } - - if to == "" { - to = defaultTo - } - - return from, to, nil -} - func generateAIAssertions(ctx context.Context, log logrus.FieldLogger, model string, externalModels []string, xatuCBTPath string) ([]seeddata.Assertion, error) { aiSpinner := ui.NewSpinner("Analyzing transformation SQL with Claude") diff --git a/pkg/seeddata/assertions.go b/pkg/seeddata/assertions.go index 5554dd4..7f80471 100644 --- a/pkg/seeddata/assertions.go +++ 
b/pkg/seeddata/assertions.go @@ -220,7 +220,7 @@ func extractYAMLFromResponse(response string) string { return strings.TrimSpace(matches[1]) } - // If no code block, look for content starting with a YAML list + // If no code block, look for content starting with a YAML list or discovery YAML lines := strings.Split(response, "\n") var yamlLines []string @@ -229,13 +229,21 @@ func extractYAMLFromResponse(response string) string { for _, line := range lines { trimmed := strings.TrimSpace(line) - if strings.HasPrefix(trimmed, "- name:") { + + // Start YAML when we see assertion-style or discovery-style start markers + if strings.HasPrefix(trimmed, "- name:") || + strings.HasPrefix(trimmed, "primaryRangeType:") || + strings.HasPrefix(trimmed, "primary_range_type:") { inYAML = true } if inYAML { - // Stop if we hit non-YAML content - if trimmed != "" && !strings.HasPrefix(line, " ") && !strings.HasPrefix(line, "-") && !strings.HasPrefix(line, "\t") { + // Stop if we hit non-YAML content (text that doesn't look like YAML) + if trimmed != "" && + !strings.HasPrefix(line, " ") && + !strings.HasPrefix(line, "-") && + !strings.HasPrefix(line, "\t") && + !strings.Contains(line, ":") { break } diff --git a/pkg/seeddata/dependencies.go b/pkg/seeddata/dependencies.go index 5a41d41..d453539 100644 --- a/pkg/seeddata/dependencies.go +++ b/pkg/seeddata/dependencies.go @@ -40,10 +40,28 @@ type DependencyTree struct { // dependencyPattern matches dependency strings like "{{transformation}}.model_name" or "{{external}}.model_name". var dependencyPattern = regexp.MustCompile(`^\{\{(transformation|external)\}\}\.(.+)$`) +// IntervalType represents the interval type from external model frontmatter. +type IntervalType string + +const ( + // IntervalTypeSlot is slot-based interval (time ranges via slot_start_date_time). + IntervalTypeSlot IntervalType = "slot" + // IntervalTypeBlock is block number based interval. 
+ IntervalTypeBlock IntervalType = "block" + // IntervalTypeEntity is for dimension/reference tables with no time range. + IntervalTypeEntity IntervalType = "entity" +) + +// intervalConfig represents the interval configuration in model frontmatter. +type intervalConfig struct { + Type IntervalType `yaml:"type"` +} + // sqlFrontmatter represents the YAML frontmatter in SQL files. type sqlFrontmatter struct { - Table string `yaml:"table"` - Dependencies []string `yaml:"dependencies"` + Table string `yaml:"table"` + Dependencies []string `yaml:"dependencies"` + Interval intervalConfig `yaml:"interval"` } // ParseDependencies parses the dependencies from a SQL file's YAML frontmatter. @@ -84,16 +102,16 @@ func ResolveDependencyTree(model string, xatuCBTPath string, visited map[string] defer func() { visited[model] = false }() - // First, try to find as transformation model - transformationPath := filepath.Join(xatuCBTPath, "models", "transformations", model+".sql") + // Try to find as transformation model (supports .sql and .yml extensions) + transformationPath := findModelFile(xatuCBTPath, "transformations", model) - if _, err := os.Stat(transformationPath); err == nil { + if transformationPath != "" { return resolveTransformationTree(model, transformationPath, xatuCBTPath, visited) } // If not found as transformation, check if it's an external model - externalPath := filepath.Join(xatuCBTPath, "models", "external", model+".sql") - if _, err := os.Stat(externalPath); err == nil { + externalPath := findModelFile(xatuCBTPath, "external", model) + if externalPath != "" { return &DependencyTree{ Model: model, Type: DependencyTypeExternal, @@ -105,6 +123,20 @@ func ResolveDependencyTree(model string, xatuCBTPath string, visited map[string] return nil, fmt.Errorf("model '%s' not found in transformations or external models", model) } +// findModelFile looks for a model file with supported extensions (.sql, .yml, .yaml). 
+func findModelFile(xatuCBTPath, folder, model string) string { + extensions := []string{".sql", ".yml", ".yaml"} + + for _, ext := range extensions { + path := filepath.Join(xatuCBTPath, "models", folder, model+ext) + if _, err := os.Stat(path); err == nil { + return path + } + } + + return "" +} + // resolveTransformationTree resolves a transformation model's dependency tree. func resolveTransformationTree(model, sqlPath, xatuCBTPath string, visited map[string]bool) (*DependencyTree, error) { deps, err := ParseDependencies(sqlPath) @@ -191,6 +223,67 @@ func (t *DependencyTree) PrintTree(indent string) string { return sb.String() } +// GetIntermediateDependencies returns all intermediate (transformation) model names from the +// dependency tree, excluding the root model. These are non-leaf nodes that transform external data. +// The result is deduplicated. +func (t *DependencyTree) GetIntermediateDependencies() []string { + seen := make(map[string]bool, 8) + intermediates := make([]string, 0, 8) + + t.collectIntermediateDeps(seen, &intermediates, true) + + return intermediates +} + +// collectIntermediateDeps recursively collects intermediate (transformation) dependencies. +func (t *DependencyTree) collectIntermediateDeps(seen map[string]bool, result *[]string, isRoot bool) { + // Skip external models (leaf nodes) + if t.Type == DependencyTypeExternal { + return + } + + // Add this model if it's not the root and not already seen + if !isRoot && !seen[t.Model] { + seen[t.Model] = true + *result = append(*result, t.Model) + } + + // Recurse into children + for _, child := range t.Children { + child.collectIntermediateDeps(seen, result, false) + } +} + +// ReadIntermediateSQL reads the SQL content for all intermediate dependencies. +// Returns a map of model name to SQL content. +// Note: YAML script models (.yml/.yaml) are skipped as they don't contain SQL to analyze. 
+func ReadIntermediateSQL(tree *DependencyTree, xatuCBTPath string) (map[string]string, error) { + intermediates := tree.GetIntermediateDependencies() + result := make(map[string]string, len(intermediates)) + + for _, model := range intermediates { + modelPath := findModelFile(xatuCBTPath, "transformations", model) + if modelPath == "" { + // Model file not found, skip + continue + } + + // Only read SQL files - YAML script models don't have SQL to analyze + if !strings.HasSuffix(modelPath, ".sql") { + continue + } + + content, err := os.ReadFile(modelPath) + if err != nil { + return nil, fmt.Errorf("failed to read SQL for %s: %w", model, err) + } + + result[model] = string(content) + } + + return result, nil +} + // ListTransformationModels returns a list of available transformation models from the xatu-cbt repo. func ListTransformationModels(xatuCBTPath string) ([]string, error) { modelsDir := filepath.Join(xatuCBTPath, "models", "transformations") @@ -208,17 +301,50 @@ func ListTransformationModels(xatuCBTPath string) ([]string, error) { } name := entry.Name() - if strings.HasSuffix(name, ".sql") { - // Remove .sql extension to get model name - models = append(models, strings.TrimSuffix(name, ".sql")) + + // Support .sql, .yml, and .yaml extensions + for _, ext := range []string{".sql", ".yml", ".yaml"} { + if strings.HasSuffix(name, ext) { + models = append(models, strings.TrimSuffix(name, ext)) + + break + } } } return models, nil } -// parseFrontmatter extracts and parses the YAML frontmatter from a SQL file. -func parseFrontmatter(sqlPath string) (*sqlFrontmatter, error) { +// parseFrontmatter extracts and parses the YAML frontmatter from a SQL file, +// or parses a pure YAML file (.yml/.yaml) directly. 
+func parseFrontmatter(modelPath string) (*sqlFrontmatter, error) { + // Check if this is a pure YAML file (not SQL with frontmatter) + if strings.HasSuffix(modelPath, ".yml") || strings.HasSuffix(modelPath, ".yaml") { + return parseYAMLFile(modelPath) + } + + // Parse SQL file with YAML frontmatter + return parseSQLFrontmatter(modelPath) +} + +// parseYAMLFile parses a pure YAML model file (.yml or .yaml). +func parseYAMLFile(yamlPath string) (*sqlFrontmatter, error) { + content, err := os.ReadFile(yamlPath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var fm sqlFrontmatter + + if err := yaml.Unmarshal(content, &fm); err != nil { + return nil, fmt.Errorf("failed to parse YAML file: %w", err) + } + + return &fm, nil +} + +// parseSQLFrontmatter extracts and parses the YAML frontmatter from a SQL file. +func parseSQLFrontmatter(sqlPath string) (*sqlFrontmatter, error) { file, err := os.Open(sqlPath) if err != nil { return nil, fmt.Errorf("failed to open file: %w", err) @@ -269,6 +395,53 @@ func parseFrontmatter(sqlPath string) (*sqlFrontmatter, error) { return &fm, nil } +// GetExternalModelIntervalType returns the interval type for an external model. +// Returns IntervalTypeEntity for dimension tables, or the actual type (slot, block, etc.). +func GetExternalModelIntervalType(model, xatuCBTPath string) (IntervalType, error) { + modelPath := findModelFile(xatuCBTPath, "external", model) + if modelPath == "" { + return "", fmt.Errorf("external model '%s' not found", model) + } + + fm, err := parseFrontmatter(modelPath) + if err != nil { + return "", fmt.Errorf("failed to parse frontmatter: %w", err) + } + + if fm.Interval.Type == "" { + // Default to slot if not specified + return IntervalTypeSlot, nil + } + + return fm.Interval.Type, nil +} + +// IsEntityModel checks if an external model is an entity/dimension table. 
+func IsEntityModel(model, xatuCBTPath string) bool { + intervalType, err := GetExternalModelIntervalType(model, xatuCBTPath) + if err != nil { + return false + } + + return intervalType == IntervalTypeEntity +} + +// GetExternalModelIntervalTypes returns interval types for multiple external models. +func GetExternalModelIntervalTypes(models []string, xatuCBTPath string) (map[string]IntervalType, error) { + result := make(map[string]IntervalType, len(models)) + + for _, model := range models { + intervalType, err := GetExternalModelIntervalType(model, xatuCBTPath) + if err != nil { + return nil, fmt.Errorf("failed to get interval type for %s: %w", model, err) + } + + result[model] = intervalType + } + + return result, nil +} + // parseDependencyString parses a dependency string like "{{transformation}}.model_name". func parseDependencyString(depStr string) (Dependency, error) { matches := dependencyPattern.FindStringSubmatch(depStr) diff --git a/pkg/seeddata/discovery.go b/pkg/seeddata/discovery.go new file mode 100644 index 0000000..cd5e129 --- /dev/null +++ b/pkg/seeddata/discovery.go @@ -0,0 +1,1211 @@ +package seeddata + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" +) + +// RangeColumnType identifies the semantic type of a range column. +type RangeColumnType string + +const ( + // RangeColumnTypeTime represents DateTime columns like slot_start_date_time. + RangeColumnTypeTime RangeColumnType = "time" + // RangeColumnTypeBlock represents block number columns (UInt64/Int64). + RangeColumnTypeBlock RangeColumnType = "block" + // RangeColumnTypeSlot represents slot number columns (UInt64/Int64). + RangeColumnTypeSlot RangeColumnType = "slot" + // RangeColumnTypeEpoch represents epoch number columns. 
+ RangeColumnTypeEpoch RangeColumnType = "epoch" + // RangeColumnTypeNone represents dimension/reference tables with no time-based range. + RangeColumnTypeNone RangeColumnType = "none" + // RangeColumnTypeUnknown represents an unclassified column type. + RangeColumnTypeUnknown RangeColumnType = "unknown" +) + +// TableRangeStrategy describes how to filter a single table for seed data extraction. +type TableRangeStrategy struct { + Model string `yaml:"model"` + RangeColumn string `yaml:"rangeColumn"` + ColumnType RangeColumnType `yaml:"columnType"` + FromValue string `yaml:"fromValue"` + ToValue string `yaml:"toValue"` + FilterSQL string `yaml:"filterSql,omitempty"` + CorrelationFilter string `yaml:"correlationFilter,omitempty"` // Subquery filter for dimension tables + Optional bool `yaml:"optional,omitempty"` // True if table is optional (LEFT JOIN) + RequiresBridge bool `yaml:"requiresBridge"` + BridgeTable string `yaml:"bridgeTable,omitempty"` + BridgeJoinSQL string `yaml:"bridgeJoinSql,omitempty"` + Confidence float64 `yaml:"confidence"` + Reasoning string `yaml:"reasoning"` +} + +// DiscoveryResult contains the complete AI-generated range strategy. +type DiscoveryResult struct { + PrimaryRangeType RangeColumnType `yaml:"primaryRangeType"` + PrimaryRangeColumn string `yaml:"primaryRangeColumn"` + FromValue string `yaml:"fromValue"` + ToValue string `yaml:"toValue"` + Strategies []TableRangeStrategy `yaml:"strategies"` + OverallConfidence float64 `yaml:"overallConfidence"` + Summary string `yaml:"summary"` + Warnings []string `yaml:"warnings,omitempty"` +} + +// GetStrategy returns the strategy for a specific model. +// Uses case-insensitive matching and trims whitespace to handle variations in Claude's output. 
+func (d *DiscoveryResult) GetStrategy(model string) *TableRangeStrategy { + modelLower := strings.ToLower(strings.TrimSpace(model)) + + for i := range d.Strategies { + strategyModel := strings.ToLower(strings.TrimSpace(d.Strategies[i].Model)) + if strategyModel == modelLower { + return &d.Strategies[i] + } + } + + return nil +} + +// TableSchemaInfo contains schema information for a table. +type TableSchemaInfo struct { + Model string `yaml:"model"` + IntervalType IntervalType `yaml:"intervalType,omitempty"` // From model frontmatter (slot, block, entity) + Columns []ColumnInfo `yaml:"columns"` + SampleData []map[string]any `yaml:"sampleData,omitempty"` + RangeInfo *DetectedRange `yaml:"rangeInfo,omitempty"` +} + +// DetectedRange contains detected range information. +type DetectedRange struct { + Column string `yaml:"column"` + ColumnType RangeColumnType `yaml:"type"` + Detected bool `yaml:"detected"` + MinValue string `yaml:"minValue,omitempty"` + MaxValue string `yaml:"maxValue,omitempty"` +} + +// DiscoveryInput contains all information gathered for Claude analysis. +type DiscoveryInput struct { + TransformationModel string `yaml:"transformationModel"` + TransformationSQL string `yaml:"transformationSql"` + IntermediateModels []IntermediateSQL `yaml:"intermediateModels,omitempty"` // SQL for intermediate deps + Network string `yaml:"network"` + Duration string `yaml:"duration"` // e.g., "5m", "10m", "1h" + ExternalModels []TableSchemaInfo `yaml:"externalModels"` +} + +// IntermediateSQL contains SQL for an intermediate transformation model. +type IntermediateSQL struct { + Model string `yaml:"model"` + SQL string `yaml:"sql"` +} + +// ClaudeDiscoveryClient handles AI-assisted range discovery. +type ClaudeDiscoveryClient struct { + log logrus.FieldLogger + claudePath string + timeout time.Duration + gen *Generator +} + +// NewClaudeDiscoveryClient creates a new discovery client. 
+func NewClaudeDiscoveryClient(log logrus.FieldLogger, gen *Generator) (*ClaudeDiscoveryClient, error) { + claudePath, err := findClaudeBinaryPath() + if err != nil { + return nil, fmt.Errorf("claude CLI not found: %w", err) + } + + return &ClaudeDiscoveryClient{ + log: log.WithField("component", "claude-discovery"), + claudePath: claudePath, + timeout: 5 * time.Minute, // Discovery can take longer than assertions + gen: gen, + }, nil +} + +// IsAvailable checks if Claude CLI is accessible. +func (c *ClaudeDiscoveryClient) IsAvailable() bool { + if c.claudePath == "" { + return false + } + + info, err := os.Stat(c.claudePath) + if err != nil { + return false + } + + return !info.IsDir() && info.Mode()&0111 != 0 +} + +// GatherSchemaInfo collects schema information for all external models. +// All tables are treated equally - Claude will analyze the schema to determine +// the best filtering strategy for each table. +func (c *ClaudeDiscoveryClient) GatherSchemaInfo( + ctx context.Context, + models []string, + network string, + xatuCBTPath string, +) ([]TableSchemaInfo, error) { + schemas := make([]TableSchemaInfo, 0, len(models)) + + for _, model := range models { + c.log.WithField("model", model).Debug("gathering schema info") + + // Get interval type from model frontmatter (informational context for Claude) + intervalType, err := GetExternalModelIntervalType(model, xatuCBTPath) + if err != nil { + c.log.WithError(err).WithField("model", model).Warn("failed to get interval type") + + intervalType = IntervalTypeSlot // Default to slot + } + + // Get column schema from ClickHouse + columns, err := c.gen.DescribeTable(ctx, model) + if err != nil { + return nil, fmt.Errorf("failed to describe table %s: %w", model, err) + } + + schemaInfo := TableSchemaInfo{ + Model: model, + IntervalType: intervalType, + Columns: columns, + } + + // Try to detect range column from SQL file + rangeCol, detectErr := DetectRangeColumnForModel(model, xatuCBTPath) + if detectErr != nil { + 
c.log.WithError(detectErr).WithField("model", model).Debug("failed to detect range column from SQL") + + // For tables without a detected range column, find any time column in schema + // This handles entity tables and other tables without explicit range definitions + rangeCol = findTimeColumnInSchema(columns) + if rangeCol == "" { + rangeCol = DefaultRangeColumn // Last resort fallback + } + } + + // Classify the range column type + colType := ClassifyRangeColumn(rangeCol, columns) + + // Query the range for this model + var minVal, maxVal string + + modelRange, rangeErr := c.gen.QueryModelRange(ctx, model, network, rangeCol) + if rangeErr != nil { + c.log.WithError(rangeErr).WithField("model", model).Warn("failed to query model range") + } else { + minVal = modelRange.MinRaw + maxVal = modelRange.MaxRaw + } + + // Get sample data (limited to 3 rows for prompt size) + sampleData, sampleErr := c.gen.QueryTableSample(ctx, model, network, 3) + if sampleErr != nil { + c.log.WithError(sampleErr).WithField("model", model).Warn("failed to query sample data") + // Continue without sample data - not critical + } + + schemaInfo.SampleData = sampleData + schemaInfo.RangeInfo = &DetectedRange{ + Column: rangeCol, + ColumnType: colType, + Detected: detectErr == nil, // True if detected from SQL, false if fallback + MinValue: minVal, + MaxValue: maxVal, + } + + schemas = append(schemas, schemaInfo) + } + + return schemas, nil +} + +// AnalyzeRanges invokes Claude to analyze range strategies. 
+func (c *ClaudeDiscoveryClient) AnalyzeRanges( + ctx context.Context, + input DiscoveryInput, +) (*DiscoveryResult, error) { + if !c.IsAvailable() { + return nil, fmt.Errorf("claude CLI is not available") + } + + prompt := c.buildDiscoveryPrompt(input) + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + //nolint:gosec // claudePath is validated in findClaudeBinaryPath + cmd := exec.CommandContext(ctx, c.claudePath, "--print") + cmd.Stdin = strings.NewReader(prompt) + + var stdout, stderr bytes.Buffer + + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + c.log.WithFields(logrus.Fields{ + "timeout": c.timeout, + "model": input.TransformationModel, + }).Debug("invoking Claude CLI for range discovery") + + if err := cmd.Run(); err != nil { + if ctx.Err() == context.DeadlineExceeded { + return nil, fmt.Errorf("claude discovery timed out after %s", c.timeout) + } + + return nil, fmt.Errorf("claude CLI failed: %w (stderr: %s)", err, stderr.String()) + } + + response := stdout.String() + if response == "" { + return nil, fmt.Errorf("claude returned empty response") + } + + c.log.WithField("response_length", len(response)).Debug("received Claude response") + + return c.parseDiscoveryResponse(response) +} + +// buildDiscoveryPrompt constructs the prompt for Claude. 
func (c *ClaudeDiscoveryClient) buildDiscoveryPrompt(input DiscoveryInput) string {
	var sb strings.Builder

	// --- Task statement and run context ---
	sb.WriteString("## Task\n")
	sb.WriteString("Analyze the following ClickHouse tables and determine the best strategy for extracting correlated seed data across all tables for testing.\n\n")

	sb.WriteString("## Context\n")
	sb.WriteString(fmt.Sprintf("- Transformation Model: %s\n", input.TransformationModel))
	sb.WriteString(fmt.Sprintf("- Network: %s\n", input.Network))
	sb.WriteString(fmt.Sprintf("- Requested Duration: %s\n", input.Duration))
	sb.WriteString("- Goal: Extract a consistent slice of data from all external models that can be used together for testing the transformation\n\n")

	// --- Problem framing: heterogeneous range columns and dimension tables ---
	sb.WriteString("## Problem\n")
	sb.WriteString("These tables may use different range column types:\n")
	sb.WriteString("- Time-based columns (slot_start_date_time - DateTime)\n")
	sb.WriteString("- Numeric columns (block_number - UInt64, slot - UInt64)\n")
	sb.WriteString("- **Dimension/reference tables** (no time range - static lookup data like validator entities)\n\n")
	sb.WriteString("You need to determine how to correlate these ranges so we get matching data across all tables.\n\n")

	sb.WriteString("**CRITICAL**: The transformation and its intermediate dependencies may have WHERE clauses that filter data.\n")
	sb.WriteString("If you extract seed data that doesn't match these filters, the transformation will produce ZERO output rows!\n")
	sb.WriteString("You MUST analyze the SQL and include any necessary filters in `filterSql` for each external model.\n\n")

	sb.WriteString("**IMPORTANT**: ALL tables must be filtered to limit data volume. Look at each table's schema to find appropriate filter columns.\n\n")

	sb.WriteString("**DIMENSION/ENTITY TABLES**: For tables that are JOINed as lookups (like validator entities):\n")
	sb.WriteString("1. Analyze the JOIN condition in the transformation SQL to find the join key (e.g., validator_index)\n")
	sb.WriteString("2. Use `correlationFilter` to filter by values that exist in the primary data tables\n")
	sb.WriteString("3. **IMPORTANT**: Use `GLOBAL IN` (not just `IN`) for subqueries - ClickHouse requires this for distributed tables\n")
	sb.WriteString("4. Example: If attestations JOIN on validator_index, filter entities to only those validators\n")
	sb.WriteString("5. Mark tables as `optional: true` if the transformation can produce output without them (LEFT JOINs)\n")
	sb.WriteString("6. If correlation isn't possible, use a reasonable time-based filter on any available DateTime column\n\n")

	// --- Per-table schema, detected range, and sample rows ---
	sb.WriteString("## External Models and Their Schemas\n\n")

	for _, schema := range input.ExternalModels {
		sb.WriteString(fmt.Sprintf("### %s\n", schema.Model))

		// Show interval type from frontmatter (informational context for Claude)
		if schema.IntervalType != "" {
			sb.WriteString(fmt.Sprintf("Interval Type: %s\n", schema.IntervalType))
		}

		if schema.RangeInfo != nil {
			sb.WriteString(fmt.Sprintf("Detected Range Column: %s (type: %s)\n",
				schema.RangeInfo.Column, schema.RangeInfo.ColumnType))

			if schema.RangeInfo.MinValue != "" && schema.RangeInfo.MaxValue != "" {
				sb.WriteString(fmt.Sprintf("Available Range: %s to %s\n",
					schema.RangeInfo.MinValue, schema.RangeInfo.MaxValue))
			}
		}

		sb.WriteString("\nColumns:\n")

		for _, col := range schema.Columns {
			sb.WriteString(fmt.Sprintf("- %s: %s\n", col.Name, col.Type))
		}

		if len(schema.SampleData) > 0 {
			sb.WriteString("\nSample Data (first rows):\n```yaml\n")

			// Marshal errors are deliberately swallowed: samples are optional context.
			sampleYAML, err := yaml.Marshal(schema.SampleData)
			if err == nil {
				sb.WriteString(string(sampleYAML))
			}

			sb.WriteString("```\n")
		}

		sb.WriteString("\n")
	}

	sb.WriteString("## Transformation SQL\n```sql\n")
	sb.WriteString(input.TransformationSQL)
	sb.WriteString("\n```\n\n")

	// Include intermediate dependency SQL if available
	if len(input.IntermediateModels) > 0 {
		sb.WriteString("## Intermediate Dependency SQL\n")
		sb.WriteString("The transformation depends on these intermediate models. Their WHERE clauses affect which seed data is usable:\n\n")

		for _, intermediate := range input.IntermediateModels {
			sb.WriteString(fmt.Sprintf("### %s\n```sql\n%s\n```\n\n", intermediate.Model, intermediate.SQL))
		}
	}

	// --- Instructions and strict output contract ---
	sb.WriteString("## Instructions\n")
	sb.WriteString("1. Analyze which tables can share a common range column directly\n")
	sb.WriteString("2. For tables with different range types, determine if correlation is possible via:\n")
	sb.WriteString("   - Direct conversion (e.g., slot to slot_start_date_time via calculation)\n")
	sb.WriteString("   - Bridge table (e.g., canonical_beacon_block has both slot and execution block info)\n")
	sb.WriteString("   - Shared columns in the data itself\n")
	sb.WriteString("3. Recommend a primary range specification (type + column + from/to values)\n")
	sb.WriteString(fmt.Sprintf("4. Use a %s time range (as requested by the user)\n", input.Duration))
	sb.WriteString("5. For each table, specify exactly how to filter it\n")
	sb.WriteString("6. **CRITICAL**: Analyze ALL WHERE clauses in the transformation and intermediate SQL.\n")
	sb.WriteString("   For each external model, identify any column filters that must be applied to get usable data.\n")
	sb.WriteString("   Include these as `filterSql` - a SQL fragment like \"aggregation_bits = ''\" or \"attesting_validator_index IS NOT NULL\"\n\n")

	// NOTE(review): indentation inside the YAML example literals below was
	// reconstructed (whitespace was mangled in the reviewed copy) — confirm the
	// emitted example remains valid YAML.
	sb.WriteString("## Output Format\n")
	sb.WriteString("Output ONLY valid YAML matching this structure.\n")
	sb.WriteString("**CRITICAL**: All datetime values MUST be quoted (e.g., \"2025-01-01 00:00:00\") - unquoted colons break YAML!\n\n")
	sb.WriteString("```yaml\n")
	sb.WriteString("primaryRangeType: time\n")
	sb.WriteString("primaryRangeColumn: slot_start_date_time\n")
	sb.WriteString("fromValue: \"2025-01-01 00:00:00\"\n")
	sb.WriteString("toValue: \"2025-01-01 00:05:00\"\n")
	sb.WriteString("strategies:\n")
	sb.WriteString("  - model: beacon_api_eth_v1_events_attestation\n")
	sb.WriteString("    rangeColumn: slot_start_date_time\n")
	sb.WriteString("    columnType: time\n")
	sb.WriteString("    fromValue: \"2025-01-01 00:00:00\"\n")
	sb.WriteString("    toValue: \"2025-01-01 00:05:00\"\n")
	sb.WriteString("    filterSql: \"aggregation_bits = '' AND attesting_validator_index IS NOT NULL\"\n")
	sb.WriteString("    requiresBridge: false\n")
	sb.WriteString("    confidence: 0.9\n")
	sb.WriteString("    reasoning: \"Filters from intermediate SQL\"\n")
	sb.WriteString("  - model: ethseer_validator_entity\n")
	sb.WriteString("    rangeColumn: \"\"\n")
	sb.WriteString("    columnType: none\n")
	sb.WriteString("    fromValue: \"\"\n")
	sb.WriteString("    toValue: \"\"\n")
	sb.WriteString("    filterSql: \"\"\n")
	sb.WriteString("    correlationFilter: \"validator_index GLOBAL IN (SELECT DISTINCT attesting_validator_index FROM default.beacon_api_eth_v1_events_attestation WHERE slot_start_date_time >= toDateTime('2025-01-01 00:00:00') AND slot_start_date_time <= toDateTime('2025-01-01 00:05:00') AND meta_network_name = 'mainnet')\"\n")
	sb.WriteString("    optional: true\n")
	sb.WriteString("    requiresBridge: false\n")
	sb.WriteString("    confidence: 0.9\n")
	sb.WriteString("    reasoning: \"Entity table filtered by correlation - only validators appearing in attestation data\"\n")
	sb.WriteString("overallConfidence: 0.85\n")
	sb.WriteString("summary: \"Time-based primary range with filters from dependencies\"\n")
	sb.WriteString("warnings:\n")
	sb.WriteString("  - \"Filters applied to ensure usable seed data\"\n")
	sb.WriteString("```\n\n")

	sb.WriteString("IMPORTANT:\n")
	sb.WriteString("- Use actual values from the available ranges shown above\n")
	sb.WriteString("- Pick a recent time window (last hour or so) within the intersection of all available ranges\n")
	sb.WriteString("- For block_number tables, estimate block numbers that correspond to the chosen time window\n")
	sb.WriteString("- Include ALL external models in the strategies list\n")
	sb.WriteString("- **ANALYZE ALL WHERE CLAUSES** in transformation and intermediate SQL - missing filters will cause empty test output!\n")
	sb.WriteString("- Include `filterSql` for each model (empty string if no additional filters needed)\n")
	sb.WriteString("- Output ONLY the YAML, no explanations before or after\n")

	return sb.String()
}

// parseDiscoveryResponse parses Claude's YAML response.
+func (c *ClaudeDiscoveryClient) parseDiscoveryResponse(response string) (*DiscoveryResult, error) { + // Extract YAML from response + yamlContent := extractYAMLFromResponse(response) + if yamlContent == "" { + // Log the raw response for debugging + c.log.WithField("response_preview", truncateString(response, 500)).Error("no valid YAML found in Claude response") + + return nil, fmt.Errorf("no valid YAML found in Claude response") + } + + // Normalize field names (Claude may output snake_case instead of camelCase) + yamlContent = normalizeDiscoveryYAMLFields(yamlContent) + + c.log.WithField("yaml_preview", truncateString(yamlContent, 300)).Debug("extracted YAML content") + + var result DiscoveryResult + + if err := yaml.Unmarshal([]byte(yamlContent), &result); err != nil { + c.log.WithFields(logrus.Fields{ + "error": err, + "yaml_content": truncateString(yamlContent, 500), + }).Error("failed to parse discovery YAML") + + return nil, fmt.Errorf("failed to parse discovery YAML: %w", err) + } + + // Validate result + if err := c.validateDiscoveryResult(&result); err != nil { + c.log.WithFields(logrus.Fields{ + "error": err, + "yaml_content": truncateString(yamlContent, 500), + "parsed": result, + }).Error("invalid discovery result") + + return nil, fmt.Errorf("invalid discovery result: %w", err) + } + + return &result, nil +} + +// normalizeDiscoveryYAMLFields converts common snake_case field names to camelCase +// and fixes common YAML formatting issues in Claude's output. 
func normalizeDiscoveryYAMLFields(yamlContent string) string {
	// snake_case -> camelCase spellings Claude is known to emit instead of the
	// struct tags we actually unmarshal with.
	replacements := map[string]string{
		"primary_range_type:":   "primaryRangeType:",
		"primary_range_column:": "primaryRangeColumn:",
		"from_value:":           "fromValue:",
		"to_value:":             "toValue:",
		"range_column:":         "rangeColumn:",
		"column_type:":          "columnType:",
		"filter_sql:":           "filterSql:",
		"correlation_filter:":   "correlationFilter:",
		"requires_bridge:":      "requiresBridge:",
		"bridge_table:":         "bridgeTable:",
		"bridge_join_sql:":      "bridgeJoinSql:",
		"overall_confidence:":   "overallConfidence:",
	}

	result := yamlContent
	for snake, camel := range replacements {
		result = strings.ReplaceAll(result, snake, camel)
	}

	// Fix unquoted datetime values (e.g., `fromValue: 2025-01-01 00:00:00`
	// -> `fromValue: "2025-01-01 00:00:00"`); unquoted colons break YAML.
	return fixUnquotedDatetimes(result)
}

// unquotedDatetimePattern matches a YAML `key: YYYY-MM-DD HH:MM:SS` line —
// optionally a `- key: ...` list entry, optionally followed by a trailing
// comment — whose datetime value is not quoted. Already-quoted values cannot
// match because the value capture requires a leading digit, not a quote.
// Compiled once at package scope to avoid recompiling on every call.
var unquotedDatetimePattern = regexp.MustCompile(`^(\s*(?:-\s+)?\w+:\s*)(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})(\s*#.*)?$`)

// fixUnquotedDatetimes finds unquoted datetime values and adds quotes.
// It works line by line so a malformed line never affects its neighbors.
func fixUnquotedDatetimes(yamlContent string) string {
	lines := strings.Split(yamlContent, "\n")

	for i, line := range lines {
		m := unquotedDatetimePattern.FindStringSubmatch(line)
		if m == nil {
			continue
		}

		// m[3] is "" when there is no trailing comment (submatch slices always
		// contain every group), so no length guard is needed.
		lines[i] = m[1] + "\"" + m[2] + "\"" + m[3]
	}

	return strings.Join(lines, "\n")
}

// truncateString truncates a string to maxLen characters, adding "..." if truncated.
+func truncateString(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + + return s[:maxLen] + "..." +} + +// validateDiscoveryResult checks if the AI result is valid and complete. +func (c *ClaudeDiscoveryClient) validateDiscoveryResult(result *DiscoveryResult) error { + if result.PrimaryRangeColumn == "" { + return fmt.Errorf("primary_range_column is required") + } + + if result.FromValue == "" || result.ToValue == "" { + return fmt.Errorf("from_value and to_value are required") + } + + if len(result.Strategies) == 0 { + return fmt.Errorf("at least one strategy is required") + } + + for i, s := range result.Strategies { + if s.Model == "" { + return fmt.Errorf("strategy %d: model is required", i) + } + + // Tables can be filtered by: + // 1. Range column (rangeColumn + fromValue/toValue) + // 2. Correlation filter (subquery) + // 3. None type (dimension table - accepts all or filtered by filterSQL) + hasRangeFilter := s.RangeColumn != "" && s.FromValue != "" && s.ToValue != "" + hasCorrelationFilter := s.CorrelationFilter != "" + isNoneType := s.ColumnType == RangeColumnTypeNone + + // Must have at least one filtering mechanism + if !hasRangeFilter && !hasCorrelationFilter && !isNoneType { + return fmt.Errorf("strategy %d (%s): requires range filter (rangeColumn+from/to), correlationFilter, or columnType: none", i, s.Model) + } + + // If range column is specified, from/to are required + if s.RangeColumn != "" && !hasCorrelationFilter && (s.FromValue == "" || s.ToValue == "") { + return fmt.Errorf("strategy %d (%s): from_value and to_value are required when range_column is set without correlationFilter", i, s.Model) + } + } + + return nil +} + +// findTimeColumnInSchema looks for a time-based column in the schema. +// Prefers columns with "date_time" in the name, falls back to any DateTime column. 
+func findTimeColumnInSchema(columns []ColumnInfo) string { + // First, look for columns with "date_time" in the name (most common pattern) + for _, col := range columns { + colLower := strings.ToLower(col.Name) + if strings.Contains(colLower, "date_time") { + return col.Name + } + } + + // Fall back to any DateTime column + for _, col := range columns { + typeLower := strings.ToLower(col.Type) + if strings.Contains(typeLower, "datetime") { + return col.Name + } + } + + return "" +} + +// contains checks if a string slice contains a value. +func contains(slice []string, value string) bool { + for _, v := range slice { + if v == value { + return true + } + } + + return false +} + +// ClassifyRangeColumn determines the semantic type of a range column based on its name and schema type. +func ClassifyRangeColumn(column string, schema []ColumnInfo) RangeColumnType { + colLower := strings.ToLower(column) + + // Check by column name patterns + switch { + case strings.Contains(colLower, "date_time") || strings.Contains(colLower, "datetime"): + return RangeColumnTypeTime + case strings.Contains(colLower, "timestamp"): + return RangeColumnTypeTime + case colLower == "block_number" || strings.HasSuffix(colLower, "_block_number"): + return RangeColumnTypeBlock + case colLower == "slot" || strings.HasSuffix(colLower, "_slot"): + return RangeColumnTypeSlot + case colLower == "epoch" || strings.HasSuffix(colLower, "_epoch"): + return RangeColumnTypeEpoch + } + + // Check by schema type if available + for _, col := range schema { + if col.Name == column { + typeLower := strings.ToLower(col.Type) + if strings.Contains(typeLower, "datetime") { + return RangeColumnTypeTime + } + + break + } + } + + return RangeColumnTypeUnknown +} + +// QueryTableSample retrieves sample rows from a table for analysis. 
+func (g *Generator) QueryTableSample( + ctx context.Context, + model string, + network string, + limit int, +) ([]map[string]any, error) { + query := fmt.Sprintf(` + SELECT * + FROM default.%s + WHERE meta_network_name = '%s' + ORDER BY rand() + LIMIT %d + FORMAT JSON + `, model, network, limit) + + g.log.WithFields(logrus.Fields{ + "model": model, + "network": network, + "limit": limit, + }).Debug("querying table sample") + + chURL, err := g.buildClickHouseHTTPURL() + if err != nil { + return nil, fmt.Errorf("failed to build ClickHouse URL: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, chURL, strings.NewReader(query)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "text/plain") + + client := &http.Client{ + Timeout: 30 * time.Second, + } + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + + return nil, fmt.Errorf("ClickHouse returned status %d: %s", resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + var jsonResp struct { + Data []map[string]any `json:"data"` + } + + if err := json.Unmarshal(body, &jsonResp); err != nil { + return nil, fmt.Errorf("failed to parse JSON response: %w", err) + } + + return jsonResp.Data, nil +} + +// categorizeModelsByType groups models into time, block, entity, and unknown categories. +// Uses interval types from frontmatter if available, falls back to column-based detection. 
+func categorizeModelsByType( + models []string, + intervalTypes map[string]IntervalType, + rangeInfos map[string]*RangeColumnInfo, +) (timeModels, blockModels, entityModels, unknownModels []string) { + timeModels = make([]string, 0) + blockModels = make([]string, 0) + entityModels = make([]string, 0) + unknownModels = make([]string, 0) + + for _, model := range models { + // First check frontmatter interval type (most accurate) + if intervalTypes != nil { + if intervalType, ok := intervalTypes[model]; ok { + switch intervalType { + case IntervalTypeEntity: + // Entity models need special handling - they have time columns but indexed by entity + entityModels = append(entityModels, model) + + continue + case IntervalTypeBlock: + blockModels = append(blockModels, model) + + continue + case IntervalTypeSlot: + timeModels = append(timeModels, model) + + continue + } + } + } + + // Fallback: use range column detection + info, ok := rangeInfos[model] + if !ok { + unknownModels = append(unknownModels, model) + + continue + } + + colLower := strings.ToLower(info.RangeColumn) + + switch { + case strings.Contains(colLower, "date_time") || strings.Contains(colLower, "timestamp"): + timeModels = append(timeModels, model) + case strings.Contains(colLower, "block"): + blockModels = append(blockModels, model) + default: + unknownModels = append(unknownModels, model) + } + } + + return timeModels, blockModels, entityModels, unknownModels +} + +// FallbackRangeDiscovery provides heuristic-based range discovery when Claude is unavailable. 
func FallbackRangeDiscovery(
	ctx context.Context,
	gen *Generator,
	models []string,
	network string,
	rangeInfos map[string]*RangeColumnInfo,
	duration string,
	xatuCBTPath string,
) (*DiscoveryResult, error) {
	// Get interval types from model frontmatter for accurate categorization.
	intervalTypes, err := GetExternalModelIntervalTypes(models, xatuCBTPath)
	if err != nil {
		// Log warning but continue with column-based detection as fallback.
		gen.log.WithError(err).Warn("failed to get interval types from frontmatter, using column-based detection")

		intervalTypes = nil
	}

	// Group models by interval type (time / block / entity / unknown).
	timeModels, blockModels, entityModels, unknownModels := categorizeModelsByType(models, intervalTypes, rangeInfos)

	// Query ranges for models; latestMin/earliestMax track the intersection of
	// all per-model [min, max] windows.
	var latestMin, earliestMax time.Time

	strategies := make([]TableRangeStrategy, 0, len(models))

	for _, model := range models {
		info := rangeInfos[model]

		// Check model category.
		isEntity := contains(entityModels, model)
		isUnknown := contains(unknownModels, model)

		var rangeCol string

		var colType RangeColumnType

		// For entity models, find a time column in the schema.
		if isEntity {
			// Query schema to find a time column.
			columns, schemaErr := gen.DescribeTable(ctx, model)
			if schemaErr != nil {
				gen.log.WithError(schemaErr).WithField("model", model).Warn("failed to describe entity table")

				// Low-confidence placeholder strategy; no range filter possible.
				strategies = append(strategies, TableRangeStrategy{
					Model:      model,
					ColumnType: RangeColumnTypeTime,
					Confidence: 0.3,
					Reasoning:  fmt.Sprintf("Entity model (schema query failed: %v)", schemaErr),
				})

				continue
			}

			timeCol := findTimeColumnInSchema(columns)
			if timeCol == "" {
				gen.log.WithField("model", model).Warn("no time column found for entity model")

				strategies = append(strategies, TableRangeStrategy{
					Model:      model,
					ColumnType: RangeColumnTypeTime,
					Confidence: 0.3,
					Reasoning:  "Entity model - no time column found in schema",
				})

				continue
			}

			rangeCol = timeCol
			colType = RangeColumnTypeTime
		} else if isUnknown {
			// Unknown models - try default range column.
			rangeCol = DefaultRangeColumn
			colType = RangeColumnTypeTime
		} else {
			// Time or block models - use detected range column.
			rangeCol = DefaultRangeColumn
			if info != nil {
				rangeCol = info.RangeColumn
			}

			colType = ClassifyRangeColumn(rangeCol, nil)
		}

		modelRange, queryErr := gen.QueryModelRange(ctx, model, network, rangeCol)
		if queryErr != nil {
			gen.log.WithError(queryErr).WithField("model", model).Warn("range query failed")

			strategies = append(strategies, TableRangeStrategy{
				Model:       model,
				RangeColumn: rangeCol,
				ColumnType:  colType,
				Confidence:  0.3,
				Reasoning:   fmt.Sprintf("Range query failed: %v", queryErr),
			})

			continue
		}

		// Narrow the shared window to this model's bounds.
		if latestMin.IsZero() || modelRange.Min.After(latestMin) {
			latestMin = modelRange.Min
		}

		if earliestMax.IsZero() || modelRange.Max.Before(earliestMax) {
			earliestMax = modelRange.Max
		}

		reasoning := "Heuristic-based detection (Claude unavailable)"
		if isEntity {
			reasoning = fmt.Sprintf("Entity model - using %s for time filtering", rangeCol)
		}

		strategies = append(strategies, TableRangeStrategy{
			Model:       model,
			RangeColumn: rangeCol,
			ColumnType:  colType,
			Confidence:  0.7,
			Reasoning:   reasoning,
		})
	}

	// Handle case where no models have valid ranges.
	hasRanges := !latestMin.IsZero() && !earliestMax.IsZero()

	var fromValue, toValue string

	var primaryType RangeColumnType

	var primaryColumn string

	if hasRanges {
		// Check for intersection.
		if latestMin.After(earliestMax) {
			return nil, fmt.Errorf("no intersecting range found across all models")
		}

		// Parse duration string (e.g., "5m", "10m", "1h").
		// NOTE(review): parse failures silently fall back to 5m; consider
		// surfacing the error to the caller instead.
		rangeDuration, parseErr := time.ParseDuration(duration)
		if parseErr != nil {
			rangeDuration = 5 * time.Minute // Default to 5 minutes if parsing fails
		}

		// Use the last N minutes/hours of available data.
		// NOTE(review): the 1-minute ingestion-lag buffer is hardcoded here;
		// the previous config-level ingestionLagBuffer constant was removed —
		// confirm these should not be kept in sync.
		effectiveMax := earliestMax.Add(-1 * time.Minute) // Account for ingestion lag
		effectiveMin := effectiveMax.Add(-rangeDuration)

		if effectiveMin.Before(latestMin) {
			effectiveMin = latestMin
		}

		fromValue = effectiveMin.Format("2006-01-02 15:04:05")
		toValue = effectiveMax.Format("2006-01-02 15:04:05")

		// Determine primary range type based on majority.
		if len(timeModels)+len(entityModels) >= len(blockModels) {
			primaryType = RangeColumnTypeTime
			primaryColumn = DefaultRangeColumn
		} else {
			primaryType = RangeColumnTypeBlock
			primaryColumn = "block_number"
		}

		// Update strategies with range values (skip strategies without valid range column).
		for i := range strategies {
			if strategies[i].RangeColumn != "" {
				strategies[i].FromValue = fromValue
				strategies[i].ToValue = toValue
			}
		}
	} else {
		// No valid ranges found - this is an error condition.
		return nil, fmt.Errorf("no valid range columns found for any model")
	}

	warnings := make([]string, 0)
	if len(blockModels) > 0 && len(timeModels) > 0 {
		warnings = append(warnings,
			"Mixed range column types detected (time and block). "+
				"Block-based tables may not correlate correctly with time-based filtering.")
	}

	return &DiscoveryResult{
		PrimaryRangeType:   primaryType,
		PrimaryRangeColumn: primaryColumn,
		FromValue:          fromValue,
		ToValue:            toValue,
		Strategies:         strategies,
		OverallConfidence:  0.6, // Lower confidence for heuristic
		Summary:            "Heuristic-based range detection (Claude unavailable)",
		Warnings:           warnings,
	}, nil
}

// ReadTransformationSQL reads the SQL file for a transformation model.
+func ReadTransformationSQL(model, xatuCBTPath string) (string, error) { + sqlPath := filepath.Join(xatuCBTPath, "models", "transformations", model+".sql") + + content, err := os.ReadFile(sqlPath) + if err != nil { + return "", fmt.Errorf("failed to read transformation SQL: %w", err) + } + + return string(content), nil +} + +// ModelDataCount holds the row count validation result for a model. +type ModelDataCount struct { + Model string + Strategy *TableRangeStrategy + RowCount int64 + HasData bool + Error error +} + +// ValidationResult contains the results of validating a discovery strategy. +type ValidationResult struct { + Counts []ModelDataCount + AllHaveData bool + EmptyModels []string // Models with zero rows + ErroredModels []string // Models that failed to query (timeout, etc.) + TotalRows int64 + MinRowCount int64 + MinRowModel string +} + +// ValidateStrategyHasData queries each model to verify data exists in the proposed ranges. +func (g *Generator) ValidateStrategyHasData( + ctx context.Context, + result *DiscoveryResult, + network string, +) (*ValidationResult, error) { + counts := make([]ModelDataCount, 0, len(result.Strategies)) + emptyModels := make([]string, 0) + erroredModels := make([]string, 0) + + var totalRows int64 + + minRowCount := int64(-1) + minRowModel := "" + + for _, strategy := range result.Strategies { + count, err := g.QueryRowCount(ctx, strategy.Model, network, strategy.RangeColumn, strategy.FromValue, strategy.ToValue, strategy.FilterSQL, strategy.CorrelationFilter) + + modelCount := ModelDataCount{ + Model: strategy.Model, + Strategy: &strategy, + Error: err, + } + + if err != nil { + modelCount.HasData = false + + erroredModels = append(erroredModels, strategy.Model) + } else { + modelCount.RowCount = count + modelCount.HasData = count > 0 + totalRows += count + + if !modelCount.HasData { + emptyModels = append(emptyModels, strategy.Model) + } + + if minRowCount < 0 || count < minRowCount { + minRowCount = count + minRowModel = 
strategy.Model + } + } + + counts = append(counts, modelCount) + } + + return &ValidationResult{ + Counts: counts, + AllHaveData: len(emptyModels) == 0 && len(erroredModels) == 0, + EmptyModels: emptyModels, + ErroredModels: erroredModels, + TotalRows: totalRows, + MinRowCount: minRowCount, + MinRowModel: minRowModel, + }, nil +} + +// QueryRowCount queries the number of rows in a model for a given range. +// For dimension tables (empty rangeColumn), it counts all rows for the network. +func (g *Generator) QueryRowCount( + ctx context.Context, + model string, + network string, + rangeColumn string, + fromValue string, + toValue string, + filterSQL string, + correlationFilter string, +) (int64, error) { + // Build additional filter clause + filterClause := "" + if filterSQL != "" { + filterClause = fmt.Sprintf("\n AND %s", filterSQL) + } + + if correlationFilter != "" { + filterClause += fmt.Sprintf("\n AND %s", correlationFilter) + } + + var query string + + // Handle dimension tables (no range column) + if rangeColumn == "" { + query = fmt.Sprintf(` + SELECT COUNT(*) as cnt + FROM default.%s + WHERE meta_network_name = '%s'%s + FORMAT JSON + `, model, network, filterClause) + } else { + // Determine if this is a numeric or time-based column + isNumeric := !strings.Contains(strings.ToLower(rangeColumn), "date") && + !strings.Contains(strings.ToLower(rangeColumn), "time") + + if isNumeric { + query = fmt.Sprintf(` + SELECT COUNT(*) as cnt + FROM default.%s + WHERE meta_network_name = '%s' + AND %s >= %s + AND %s <= %s%s + FORMAT JSON + `, model, network, rangeColumn, fromValue, rangeColumn, toValue, filterClause) + } else { + query = fmt.Sprintf(` + SELECT COUNT(*) as cnt + FROM default.%s + WHERE meta_network_name = '%s' + AND %s >= toDateTime('%s') + AND %s <= toDateTime('%s')%s + FORMAT JSON + `, model, network, rangeColumn, fromValue, rangeColumn, toValue, filterClause) + } + } + + g.log.WithFields(logrus.Fields{ + "model": model, + "network": network, + "from": 
fromValue, + "to": toValue, + }).Debug("querying row count") + + chURL, err := g.buildClickHouseHTTPURL() + if err != nil { + return 0, fmt.Errorf("failed to build ClickHouse URL: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, chURL, strings.NewReader(query)) + if err != nil { + return 0, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "text/plain") + + client := &http.Client{ + Timeout: 2 * time.Minute, // Row count queries on large tables can take time + } + + resp, err := client.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to execute request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + + return 0, fmt.Errorf("ClickHouse returned status %d: %s", resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read response: %w", err) + } + + var jsonResp struct { + Data []map[string]any `json:"data"` + } + + if err := json.Unmarshal(body, &jsonResp); err != nil { + return 0, fmt.Errorf("failed to parse JSON response: %w", err) + } + + if len(jsonResp.Data) == 0 { + return 0, nil + } + + // Extract count from response + cntVal, ok := jsonResp.Data[0]["cnt"] + if !ok { + return 0, fmt.Errorf("cnt not found in response") + } + + // Handle both string and numeric types + switch v := cntVal.(type) { + case string: + var count int64 + + _, err := fmt.Sscanf(v, "%d", &count) + + return count, err + case float64: + return int64(v), nil + case int64: + return v, nil + default: + return 0, fmt.Errorf("unexpected count type: %T", cntVal) + } +} + +// ExpandWindowMultiplier defines how much to expand the window on each retry. +const ExpandWindowMultiplier = 2 + +// SuggestExpandedStrategy creates a new strategy with an expanded time window. +// This is used when the original strategy has models with no data. 
+func SuggestExpandedStrategy(original *DiscoveryResult, multiplier int) *DiscoveryResult { + expanded := &DiscoveryResult{ + PrimaryRangeType: original.PrimaryRangeType, + PrimaryRangeColumn: original.PrimaryRangeColumn, + OverallConfidence: original.OverallConfidence * 0.9, // Reduce confidence slightly + Summary: fmt.Sprintf("%s (window expanded %dx)", original.Summary, multiplier), + Warnings: append([]string{}, original.Warnings...), + Strategies: make([]TableRangeStrategy, len(original.Strategies)), + } + + // For time-based ranges, we can try to expand by parsing and adjusting + // For now, just copy and add a warning - the actual expansion would need + // to be done with knowledge of the original window size + copy(expanded.Strategies, original.Strategies) + expanded.Warnings = append(expanded.Warnings, + fmt.Sprintf("Window expanded %dx to find data - verify data quality", multiplier)) + + return expanded +} diff --git a/pkg/seeddata/generator.go b/pkg/seeddata/generator.go index d2748ea..0b57742 100644 --- a/pkg/seeddata/generator.go +++ b/pkg/seeddata/generator.go @@ -37,17 +37,19 @@ func NewGenerator(log logrus.FieldLogger, cfg *config.LabConfig) *Generator { // GenerateOptions contains options for generating seed data. 
type GenerateOptions struct { - Model string // Table name (e.g., "beacon_api_eth_v1_events_block") - Network string // Network name (e.g., "mainnet", "sepolia") - Spec string // Fork spec (e.g., "pectra", "fusaka") - RangeColumn string // Column to filter on (e.g., "slot", "epoch") - From string // Range start value - To string // Range end value - Filters []Filter // Additional filters - Limit int // Max rows (0 = unlimited) - OutputPath string // Output file path - SanitizeIPs bool // Enable IP address sanitization - Salt string // Salt for IP sanitization (shared across batch for consistency) + Model string // Table name (e.g., "beacon_api_eth_v1_events_block") + Network string // Network name (e.g., "mainnet", "sepolia") + Spec string // Fork spec (e.g., "pectra", "fusaka") + RangeColumn string // Column to filter on (e.g., "slot", "epoch") + From string // Range start value + To string // Range end value + Filters []Filter // Additional filters + FilterSQL string // Raw SQL fragment for additional WHERE conditions (from AI discovery) + CorrelationFilter string // Subquery filter for dimension tables (e.g., "validator_index IN (SELECT ...)") + Limit int // Max rows (0 = unlimited) + OutputPath string // Output file path + SanitizeIPs bool // Enable IP address sanitization + Salt string // Salt for IP sanitization (shared across batch for consistency) // sanitizedColumns is an internal field set by Generate() when SanitizeIPs is true. // It contains the pre-computed column list with IP sanitization expressions. @@ -67,6 +69,7 @@ type GenerateResult struct { RowCount int64 // Number of rows extracted (estimated from file size) FileSize int64 // File size in bytes SanitizedColumns []string // IP columns that were sanitized (for display to user) + Query string // SQL query used (for debugging) } // Generate extracts data from external ClickHouse and writes to a parquet file. 
@@ -110,10 +113,14 @@ func (g *Generator) Generate(ctx context.Context, opts GenerateOptions) (*Genera query := g.buildQuery(opts) g.log.WithFields(logrus.Fields{ - "model": opts.Model, - "network": opts.Network, - "output": opts.OutputPath, - }).Debug("generating seed data") + "model": opts.Model, + "network": opts.Network, + "output": opts.OutputPath, + "range_column": opts.RangeColumn, + "from": opts.From, + "to": opts.To, + "query": query, + }).Info("generating seed data") // Execute query and stream to file fileSize, err := g.executeQueryToFile(ctx, query, opts.OutputPath) @@ -125,6 +132,7 @@ func (g *Generator) Generate(ctx context.Context, opts GenerateOptions) (*Genera OutputPath: opts.OutputPath, FileSize: fileSize, SanitizedColumns: sanitizedColumns, + Query: query, }, nil } @@ -147,21 +155,33 @@ func (g *Generator) buildQuery(opts GenerateOptions) string { sb.WriteString("'") // Add range filter if specified + // Use column-name-based detection (same logic as validation query in discovery.go) if opts.RangeColumn != "" && opts.From != "" && opts.To != "" { - fromVal := formatSQLValue(opts.From) - toVal := formatSQLValue(opts.To) + colLower := strings.ToLower(opts.RangeColumn) + isTimeColumn := strings.Contains(colLower, "date") || strings.Contains(colLower, "time") sb.WriteString("\n AND ") sb.WriteString(opts.RangeColumn) sb.WriteString(" >= ") - sb.WriteString(fromVal) + + if isTimeColumn { + sb.WriteString(fmt.Sprintf("toDateTime('%s')", opts.From)) + } else { + sb.WriteString(opts.From) // Numeric value as-is + } + sb.WriteString("\n AND ") sb.WriteString(opts.RangeColumn) sb.WriteString(" <= ") - sb.WriteString(toVal) + + if isTimeColumn { + sb.WriteString(fmt.Sprintf("toDateTime('%s')", opts.To)) + } else { + sb.WriteString(opts.To) // Numeric value as-is + } } - // Add additional filters + // Add additional filters (structured) for _, filter := range opts.Filters { sb.WriteString("\n AND ") sb.WriteString(filter.Column) @@ -171,6 +191,18 @@ func (g 
*Generator) buildQuery(opts GenerateOptions) string { sb.WriteString(formatSQLValue(filter.Value)) } + // Add raw SQL filter if specified (from AI discovery) + if opts.FilterSQL != "" { + sb.WriteString("\n AND ") + sb.WriteString(opts.FilterSQL) + } + + // Add correlation filter if specified (subquery for dimension tables) + if opts.CorrelationFilter != "" { + sb.WriteString("\n AND ") + sb.WriteString(opts.CorrelationFilter) + } + // Add limit if specified if opts.Limit > 0 { sb.WriteString(fmt.Sprintf("\nLIMIT %d", opts.Limit)) diff --git a/pkg/seeddata/s3.go b/pkg/seeddata/s3.go index 0601e66..3018cad 100644 --- a/pkg/seeddata/s3.go +++ b/pkg/seeddata/s3.go @@ -126,17 +126,49 @@ func (u *S3Uploader) Upload(ctx context.Context, opts UploadOptions) (*UploadRes } defer file.Close() - // Upload to S3 + // Get file size for explicit ContentLength + fileInfo, err := file.Stat() + if err != nil { + return nil, fmt.Errorf("failed to stat file: %w", err) + } + + fileSize := fileInfo.Size() + + u.log.WithFields(logrus.Fields{ + "file": opts.LocalPath, + "size": fileSize, + "key": key, + }).Debug("uploading file to S3") + + // Upload to S3 with explicit content length + // Cache-Control: no-cache ensures CDN/browsers always revalidate with origin + // R2 will use ETag for efficient conditional requests (304 Not Modified) _, err = u.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(u.bucket), - Key: aws.String(key), - Body: file, - ContentType: aws.String("application/octet-stream"), + Bucket: aws.String(u.bucket), + Key: aws.String(key), + Body: file, + ContentType: aws.String("application/octet-stream"), + ContentLength: aws.Int64(fileSize), + CacheControl: aws.String("no-cache"), }) if err != nil { return nil, fmt.Errorf("failed to upload to S3: %w", err) } + // Verify upload by checking object metadata + headResp, headErr := u.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(u.bucket), + Key: aws.String(key), + }) + if headErr != nil { + 
u.log.WithError(headErr).Warn("failed to verify upload") + } else if headResp.ContentLength != nil && *headResp.ContentLength != fileSize { + return nil, fmt.Errorf("upload verification failed: expected %d bytes but S3 reports %d bytes", + fileSize, *headResp.ContentLength) + } else { + u.log.WithField("verified_size", *headResp.ContentLength).Debug("upload verified") + } + return &UploadResult{ S3URL: fmt.Sprintf("s3://%s/%s", u.bucket, key), PublicURL: fmt.Sprintf("https://%s/%s", u.publicDomain, key),