Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 98 additions & 21 deletions pkg/commands/lab_xatu_cbt_generate_transformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func runGenerateTransformationTest(
return fmt.Errorf("failed to detect range columns: %w", err)
}

discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration)
discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT)
if err != nil {
return fmt.Errorf("fallback range discovery failed: %w", err)
}
Expand Down Expand Up @@ -254,6 +254,26 @@ func runGenerateTransformationTest(
return fmt.Errorf("failed to read transformation SQL: %w", sqlErr)
}

// Read intermediate dependency SQL (for WHERE clause analysis)
intermediateSQL, intErr := seeddata.ReadIntermediateSQL(tree, labCfg.Repos.XatuCBT)
if intErr != nil {
ui.Warning(fmt.Sprintf("Could not read intermediate SQL: %v", intErr))
// Continue without intermediate SQL - not critical
}

// Convert to IntermediateSQL slice
var intermediateModels []seeddata.IntermediateSQL
for modelName, sql := range intermediateSQL {
intermediateModels = append(intermediateModels, seeddata.IntermediateSQL{
Model: modelName,
SQL: sql,
})
}

if len(intermediateModels) > 0 {
ui.Info(fmt.Sprintf("Including %d intermediate model(s) for WHERE clause analysis", len(intermediateModels)))
}

// Invoke Claude for analysis
ui.Blank()

Expand All @@ -262,6 +282,7 @@ func runGenerateTransformationTest(
discoveryResult, err = discoveryClient.AnalyzeRanges(ctx, seeddata.DiscoveryInput{
TransformationModel: model,
TransformationSQL: transformationSQL,
IntermediateModels: intermediateModels,
Network: network,
Duration: duration,
ExternalModels: schemaInfo,
Expand All @@ -277,7 +298,7 @@ func runGenerateTransformationTest(
return fmt.Errorf("failed to detect range columns: %w", rangeErr)
}

discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration)
discoveryResult, err = seeddata.FallbackRangeDiscovery(ctx, gen, externalModels, network, rangeInfos, duration, labCfg.Repos.XatuCBT)
if err != nil {
return fmt.Errorf("fallback range discovery failed: %w", err)
}
Expand Down Expand Up @@ -345,14 +366,44 @@ func runGenerateTransformationTest(
bridgeInfo = fmt.Sprintf(" (via %s)", strategy.BridgeTable)
}

ui.Info(fmt.Sprintf(" • %s: %s [%s → %s] %s%s",
strategy.Model,
strategy.RangeColumn,
strategy.FromValue,
strategy.ToValue,
confidence,
bridgeInfo,
))
// Handle dimension tables (no range) vs regular tables
if strategy.RangeColumn == "" || strategy.ColumnType == seeddata.RangeColumnTypeNone {
ui.Info(fmt.Sprintf(" • %s: (dimension table - all data) %s%s",
strategy.Model,
confidence,
bridgeInfo,
))
} else {
ui.Info(fmt.Sprintf(" • %s: %s [%s → %s] %s%s",
strategy.Model,
strategy.RangeColumn,
strategy.FromValue,
strategy.ToValue,
confidence,
bridgeInfo,
))
}

// Display additional filters if present
if strategy.FilterSQL != "" {
ui.Info(fmt.Sprintf(" Filter: %s", strategy.FilterSQL))
}

// Display correlation filter if present (for dimension tables)
if strategy.CorrelationFilter != "" {
// Truncate long subqueries for display
corrFilter := strategy.CorrelationFilter
if len(corrFilter) > 80 {
corrFilter = corrFilter[:77] + "..."
}

ui.Info(fmt.Sprintf(" Correlation: %s", corrFilter))
}

// Display if optional
if strategy.Optional {
ui.Info(" (optional - LEFT JOIN)")
}
}

// Display warnings
Expand Down Expand Up @@ -602,21 +653,47 @@ func runGenerateTransformationTest(
}

// Show query parameters (helps debug empty parquets)
ui.Info(fmt.Sprintf(" %s: %s [%s → %s]", extModel, strategy.RangeColumn, strategy.FromValue, strategy.ToValue))
filterInfo := ""
if strategy.FilterSQL != "" {
filterInfo = fmt.Sprintf(" + filter: %s", strategy.FilterSQL)
}

if strategy.CorrelationFilter != "" {
// Truncate long subqueries for display
corrFilter := strategy.CorrelationFilter
if len(corrFilter) > 60 {
corrFilter = corrFilter[:57] + "..."
}

filterInfo += fmt.Sprintf(" + correlation: %s", corrFilter)
}

// Handle dimension tables (no range) vs regular tables
if strategy.RangeColumn == "" || strategy.ColumnType == seeddata.RangeColumnTypeNone {
if strategy.CorrelationFilter != "" {
ui.Info(fmt.Sprintf(" %s: (correlated dimension table)%s", extModel, filterInfo))
} else {
ui.Info(fmt.Sprintf(" %s: (dimension table - all data)%s", extModel, filterInfo))
}
} else {
ui.Info(fmt.Sprintf(" %s: %s [%s → %s]%s", extModel, strategy.RangeColumn, strategy.FromValue, strategy.ToValue, filterInfo))
}

genSpinner := ui.NewSpinner(fmt.Sprintf("Generating %s", extModel))

result, genErr := gen.Generate(ctx, seeddata.GenerateOptions{
Model: extModel,
Network: network,
Spec: spec,
RangeColumn: strategy.RangeColumn,
From: strategy.FromValue,
To: strategy.ToValue,
Limit: limit,
OutputPath: outputPath,
SanitizeIPs: sanitizeIPs,
Salt: salt,
Model: extModel,
Network: network,
Spec: spec,
RangeColumn: strategy.RangeColumn,
From: strategy.FromValue,
To: strategy.ToValue,
FilterSQL: strategy.FilterSQL,
CorrelationFilter: strategy.CorrelationFilter,
Limit: limit,
OutputPath: outputPath,
SanitizeIPs: sanitizeIPs,
Salt: salt,
})
if genErr != nil {
genSpinner.Fail(fmt.Sprintf("Failed to generate %s", extModel))
Expand Down
16 changes: 12 additions & 4 deletions pkg/seeddata/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func extractYAMLFromResponse(response string) string {
return strings.TrimSpace(matches[1])
}

// If no code block, look for content starting with a YAML list
// If no code block, look for content starting with a YAML list or discovery YAML
lines := strings.Split(response, "\n")

var yamlLines []string
Expand All @@ -229,13 +229,21 @@ func extractYAMLFromResponse(response string) string {

for _, line := range lines {
trimmed := strings.TrimSpace(line)
if strings.HasPrefix(trimmed, "- name:") {

// Start YAML when we see assertion-style or discovery-style start markers
if strings.HasPrefix(trimmed, "- name:") ||
strings.HasPrefix(trimmed, "primaryRangeType:") ||
strings.HasPrefix(trimmed, "primary_range_type:") {
inYAML = true
}

if inYAML {
// Stop if we hit non-YAML content
if trimmed != "" && !strings.HasPrefix(line, " ") && !strings.HasPrefix(line, "-") && !strings.HasPrefix(line, "\t") {
// Stop if we hit non-YAML content (text that doesn't look like YAML)
if trimmed != "" &&
!strings.HasPrefix(line, " ") &&
!strings.HasPrefix(line, "-") &&
!strings.HasPrefix(line, "\t") &&
!strings.Contains(line, ":") {
break
}

Expand Down
Loading