diff --git a/engine/explain_test.go b/engine/explain_test.go index e645bc980..e709a0bb2 100644 --- a/engine/explain_test.go +++ b/engine/explain_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/engine" "github.com/efficientgo/core/testutil" @@ -262,3 +263,142 @@ func renderAnalysisTree(node *engine.AnalyzeOutputNode, level int) string { return result.String() } + +func TestQueryAnalyzeWithRemoteExecution(t *testing.T) { + t.Parallel() + opts := promql.EngineOpts{Timeout: 1 * time.Hour} + + var loadStrings []string + loadStrings = append(loadStrings, "load 30s") + + clusters := []string{"cluster-0", "cluster-1", "cluster-2"} + pods := []string{"pod-0", "pod-1", "pod-2", "pod-3", "pod-4"} + namespaces := []string{"ns-0", "ns-1"} + + for _, cluster := range clusters { + for _, pod := range pods { + for _, ns := range namespaces { + loadStrings = append(loadStrings, fmt.Sprintf( + `kube_pod_info{k8s_cluster="%s", pod="%s", namespace="%s"} 1+1x14`, + cluster, pod, ns, + )) + } + } + } + loadTestString := strings.Join(loadStrings, "\n") + + mockStorage := promqltest.LoadedStorage(t, loadTestString) + defer mockStorage.Close() + + numRemoteEngines := 3 + remoteEngines := make([]api.RemoteEngine, 0, numRemoteEngines) + for i := 0; i < numRemoteEngines; i++ { + minT := mockStorage.Head().Meta().MinTime + maxT := mockStorage.Head().Meta().MaxTime + + remoteEngines = append(remoteEngines, engine.NewRemoteEngine( + engine.Opts{EngineOpts: opts, EnableAnalysis: true}, + mockStorage, + minT, + maxT, + []labels.Labels{labels.FromStrings("k8s_cluster", fmt.Sprintf("cluster-%d", i))}, + )) + } + + endpoints := api.NewStaticEndpoints(remoteEngines) + distEngine := engine.NewDistributedEngine(engine.Opts{EngineOpts: opts, EnableAnalysis: true}) + + start := time.Unix(240, 0) + end := time.Unix(1000, 0) + + query := "sum by (k8s_cluster) (kube_pod_info)" + + t.Run("instant_query", func(t *testing.T) { + ctx := context.Background() + q, err := distEngine.MakeInstantQuery(ctx, mockStorage, endpoints, nil, query, start) + testutil.Ok(t, err) + + results := q.Exec(ctx) + testutil.Ok(t, results.Err) + + explainableQuery := q.(engine.ExplainableQuery) + analysis := explainableQuery.Analyze() + + result := renderAnalysisTree(analysis, 0) + t.Log(result) + + testutil.Assert(t, analysis != nil, "analysis should not be nil") + testutil.Assert(t, assertExecutionTimeNonZero(t, analysis)) + + testutil.Assert(t, analysis.TotalSamples() > 0, "total samples from root analysis node should be greater than 0") + + validateOperators(t, analysis, numRemoteEngines) + }) + + t.Run("range_query", func(t *testing.T) { + ctx := context.Background() + q, err := distEngine.MakeRangeQuery(ctx, mockStorage, endpoints, nil, query, start, end, 60*time.Second) + testutil.Ok(t, err) + + results := q.Exec(ctx) + testutil.Ok(t, results.Err) + + explainableQuery := q.(engine.ExplainableQuery) + analysis := explainableQuery.Analyze() + result := renderAnalysisTree(analysis, 0) + t.Log(result) + + // Validate the structure + testutil.Assert(t, analysis != nil, "analysis should not be nil") + testutil.Assert(t, assertExecutionTimeNonZero(t, analysis)) + testutil.Assert(t, analysis.TotalSamples() >= 720, "total samples from root analysis node should be greater than 0") + + validateOperators(t, analysis, numRemoteEngines) + }) +} + +// validateOperators checks that the plan contains the expected operators in the right structure +func validateOperators(t *testing.T, node *engine.AnalyzeOutputNode, numRemoteEngines int) { + var remoteExecNodes []*engine.AnalyzeOutputNode + var aggregateNodes []*engine.AnalyzeOutputNode + var concurrentNodes []*engine.AnalyzeOutputNode + var dedupNodes []*engine.AnalyzeOutputNode + + findOperatorsRecursive(node, "remoteExec", &remoteExecNodes) + findOperatorsRecursive(node, "aggregate", &aggregateNodes) + findOperatorsRecursive(node, "concurrent", &concurrentNodes) + findOperatorsRecursive(node, "dedup", &dedupNodes) + + testutil.Assert(t, len(remoteExecNodes) > 0, "should find at least one remoteExec operator") + for i, rn := range remoteExecNodes { + t.Logf("RemoteExec node %d: %s, TotalSamples: %d, PeakSamples: %d", i, rn.OperatorTelemetry.String(), rn.TotalSamples(), rn.PeakSamples()) + testutil.Assert(t, rn.TotalSamples() > 0, fmt.Sprintf("remoteExec node %d (%s) should have > 0 total samples", i, rn.OperatorTelemetry.String())) + } + + testutil.Assert(t, len(concurrentNodes) > 0, "should find concurrent operator") + + if numRemoteEngines > 1 { + testutil.Assert(t, len(aggregateNodes) > 0, "should find aggregate operator when numRemoteEngines > 1") + testutil.Assert(t, len(dedupNodes) > 0, "should find dedup operator when numRemoteEngines > 1") + } else { + testutil.Assert(t, len(aggregateNodes) == 0, "should NOT find separate aggregate operator when numRemoteEngines == 1") + testutil.Assert(t, len(dedupNodes) == 0, "should not find dedup operator when numRemoteEngines == 1") + // Check if remoteExec itself is doing the aggregation for the single remote engine case + if len(remoteExecNodes) == 1 { + testutil.Assert(t, strings.Contains(remoteExecNodes[0].OperatorTelemetry.String(), "sum by"), "remoteExec should be performing sum aggregation for single remote engine") + } + } +} + +func findOperatorsRecursive(node *engine.AnalyzeOutputNode, operatorNameSubstring string, foundNodes *[]*engine.AnalyzeOutputNode) { + if node == nil { + return + } + if strings.Contains(node.OperatorTelemetry.String(), operatorNameSubstring) { + *foundNodes = append(*foundNodes, node) + } + + for _, child := range node.Children { + findOperatorsRecursive(child, operatorNameSubstring, foundNodes) + } +}