From a1c9029919ab9bab7806851e5d0af58af740df4d Mon Sep 17 00:00:00 2001
From: rubywtl
Date: Fri, 25 Jul 2025 09:00:44 -0700
Subject: [PATCH] initial version with remote execution wrapped around child
 nodes

---
 .../tripperware/distributed_optimizer.go      | 47 +++++++++++
 .../tripperware/distributed_optimizer_test.go | 89 ++++++++++++++++++++
 pkg/querier/tripperware/distributed_query.go  |  7 +-
 3 files changed, 142 insertions(+), 1 deletion(-)
 create mode 100644 pkg/querier/tripperware/distributed_optimizer.go
 create mode 100644 pkg/querier/tripperware/distributed_optimizer_test.go

diff --git a/pkg/querier/tripperware/distributed_optimizer.go b/pkg/querier/tripperware/distributed_optimizer.go
new file mode 100644
index 00000000000..409b118af5b
--- /dev/null
+++ b/pkg/querier/tripperware/distributed_optimizer.go
@@ -0,0 +1,47 @@
+package tripperware
+
+import (
+	"github.com/prometheus/prometheus/util/annotations"
+	"github.com/thanos-io/promql-engine/logicalplan"
+	"github.com/thanos-io/promql-engine/query"
+)
+
+// This is a simplified implementation.
+// Future versions of the distributed optimizer are expected to:
+// - Support more complex query patterns.
+// - Incorporate diverse optimization strategies.
+// - Extend support to node types beyond binary operations.
+
+// DistributedOptimizer rewrites a logical plan so that each child of a
+// binary operation is executed remotely.
+type DistributedOptimizer struct{}
+
+func (d *DistributedOptimizer) Optimize(root logicalplan.Node, opts *query.Options) (logicalplan.Node, annotations.Annotations) {
+	warns := annotations.New()
+
+	logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
+		if (*current).Type() == logicalplan.BinaryNode {
+			for _, child := range (*current).Children() {
+				*child = d.wrapWithRemoteExecution(*child, opts)
+			}
+		}
+		// Returning false does not stop the traversal.
+		return false
+	})
+
+	return root, *warns
+}
+
+// wrapWithRemoteExecution wraps a node in a single remote execution;
+// it does not yet shard the query into multiple executions by time range.
+func (d *DistributedOptimizer) wrapWithRemoteExecution(node logicalplan.Node, opts *query.Options) logicalplan.Node {
+	remoteNode := logicalplan.RemoteExecution{
+		Query:           node.Clone(),
+		QueryRangeStart: opts.Start,
+		QueryRangeEnd:   opts.End,
+	}
+
+	return logicalplan.Deduplicate{
+		Expressions: []logicalplan.RemoteExecution{remoteNode},
+	}
+}
diff --git a/pkg/querier/tripperware/distributed_optimizer_test.go b/pkg/querier/tripperware/distributed_optimizer_test.go
new file mode 100644
index 00000000000..07ef5a125c5
--- /dev/null
+++ b/pkg/querier/tripperware/distributed_optimizer_test.go
@@ -0,0 +1,89 @@
+package tripperware
+
+import (
+	"context"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// Before:
+// binary node: sum(a) + sum(b)
+//       /            \
+// aggr: sum(a)    aggr: sum(b)
+//      |               |
+// vector selector  vector selector
+
+// After the distributed optimizer:
+// binary node: sum(a) + sum(b)
+//       /            \
+// remote exec     remote exec
+//      |               |
+// aggr: sum(a)    aggr: sum(b)
+//      |               |
+// vector selector  vector selector
+
+func TestDistributedOptimizer(t *testing.T) {
+	testCases := []struct {
+		name     string
+		query    string
+		start    int64
+		end      int64
+		step     time.Duration
+		expected struct {
+			childrenCount   int
+			deduplicateType bool
+		}
+	}{
+		{
+			name:  "binary operation with aggregations",
+			query: "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))",
+			start: 100000,
+			end:   100000,
+			step:  time.Minute,
+			expected: struct {
+				childrenCount   int
+				deduplicateType bool
+			}{
+				childrenCount:   2,    // binary node should have 2 children
+				deduplicateType: true, // each RemoteExecution should be wrapped in Deduplicate
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			req := &PrometheusRequest{
+				Start: tc.start,
+				End:   tc.end,
+				Query: tc.query,
+			}
+
+			middleware := DistributedQueryMiddleware(tc.step, 5*time.Minute)
+			handler := middleware.Wrap(HandlerFunc(func(_ context.Context, _ Request) (Response, error) {
+				return nil, nil
+			}))
+			_, err := handler.Do(context.Background(), req)
+			require.NoError(t, err)
+			require.NotNil(t, req.LogicalPlan, "logical plan should be populated")
+
+			// The binary expression sits one level below the plan root.
+			startNode := req.LogicalPlan.Root().Children()[0]
+			children := (*startNode).Children()
+			require.Len(t, children, tc.expected.childrenCount)
+
+			LHS := *children[0]
+			RHS := *children[1]
+
+			if tc.expected.deduplicateType {
+				require.True(t, strings.HasPrefix(LHS.String(), "dedup("))
+				require.True(t, strings.HasPrefix(RHS.String(), "dedup("))
+			}
+
+			require.Equal(t, "dedup(remote(sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m]))) [1970-01-01 00:01:40 +0000 UTC, 1970-01-01 00:01:40 +0000 UTC])", LHS.String())
+			require.Equal(t, "dedup(remote(sum(rate(node_memory_Active_bytes[5m]))) [1970-01-01 00:01:40 +0000 UTC, 1970-01-01 00:01:40 +0000 UTC])", RHS.String())
+		})
+	}
+}
diff --git a/pkg/querier/tripperware/distributed_query.go b/pkg/querier/tripperware/distributed_query.go
index 02a0692153d..55a452e2fb8 100644
--- a/pkg/querier/tripperware/distributed_query.go
+++ b/pkg/querier/tripperware/distributed_query.go
@@ -67,7 +67,12 @@ func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, e
 
 	logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
 	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
-	return &optimizedPlan, nil
+	// Apply the distributed optimizer on top of the default optimizers; its annotations are discarded for now.
+	dOptimizer := DistributedOptimizer{}
+	dOptimizedPlanNode, _ := dOptimizer.Optimize(optimizedPlan.Root(), &qOpts)
+	lp := logicalplan.New(dOptimizedPlanNode, &qOpts, planOpts)
+
+	return &lp, nil
 }
 
 func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {
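
Note (reviewer sketch, not part of the patch): the snippet below shows how the new optimizer could be exercised end to end, mirroring the wiring added to newLogicalPlan above. It reuses only APIs already referenced in the diff (parser.ParseExpr from Prometheus; logicalplan.NewFromAST, Plan.Optimize, and query.Options from promql-engine); the helper name buildDistributedPlan and the zero-valued PlanOptions are illustrative assumptions, not part of the change.

package tripperware

import (
	"time"

	"github.com/prometheus/prometheus/promql/parser"
	"github.com/thanos-io/promql-engine/logicalplan"
	"github.com/thanos-io/promql-engine/query"
)

// buildDistributedPlan is a hypothetical helper: it parses a query, runs the
// engine's default optimizers, then applies DistributedOptimizer from this
// patch and returns the rewritten plan root.
func buildDistributedPlan(qs string, ts time.Time, step time.Duration) (logicalplan.Node, error) {
	expr, err := parser.ParseExpr(qs)
	if err != nil {
		return nil, err
	}

	qOpts := query.Options{Start: ts, End: ts, Step: step}
	planOpts := logicalplan.PlanOptions{}

	plan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
	optimized, _ := plan.Optimize(logicalplan.DefaultOptimizers)

	// Wrap each child of any binary operation in a remote execution.
	d := DistributedOptimizer{}
	root, _ := d.Optimize(optimized.Root(), &qOpts)

	// For "sum(rate(a[5m])) + sum(rate(b[5m]))" both operands now render as
	// dedup(remote(...) [start, end]), which is what the test above asserts.
	return root, nil
}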