Skip to content

Commit 6b3bd7b

Browse files
authored
Query Frontend For Logical Query Plan Support (#6884)
* frontend: Implement logical query plan middleware under new distributed execution feature flag Signed-off-by: rubywtl <[email protected]> * Combine instant and range logical query plan gen middleware + Update tests Signed-off-by: rubywtl <[email protected]> * remove unneccessary configs in query frontend + restructure logical-plan-gen middleware helper functions Signed-off-by: rubywtl <[email protected]> * adjust prom request logicalplan type and update tests Signed-off-by: rubywtl <[email protected]> * merge related tests and improve readability Signed-off-by: rubywtl <[email protected]> * edit feature flag name and visibility Signed-off-by: rubywtl <[email protected]> * Add lookbackDelta and EnableStepStats back to query frontend config Signed-off-by: rubywtl <[email protected]> * style: code cleanup and readability improvements Signed-off-by: rubywtl <[email protected]> * fix import files lint error Signed-off-by: rubywtl <[email protected]> * lint: fix import orders Signed-off-by: rubywtl <[email protected]> * lint: fix import order Signed-off-by: rubywtl <[email protected]> * remove enableStepStats from query frontend Signed-off-by: rubywtl <[email protected]> * add logical plan optimization-related config param to middleware Signed-off-by: rubywtl <[email protected]> * change params passed-in in logical plan middleware Signed-off-by: rubywtl <[email protected]> --------- Signed-off-by: rubywtl <[email protected]>
1 parent 328bc3b commit 6b3bd7b

17 files changed

+773
-22
lines changed

pkg/cortex/modules.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,12 +533,21 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
533533
prometheusCodec,
534534
shardedPrometheusCodec,
535535
t.Cfg.Querier.LookbackDelta,
536+
t.Cfg.Querier.DefaultEvaluationInterval,
537+
t.Cfg.Frontend.DistributedExecEnabled,
536538
)
537539
if err != nil {
538540
return nil, err
539541
}
540542

541-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
543+
instantQueryMiddlewares, err := instantquery.Middlewares(
544+
util_log.Logger,
545+
t.Overrides,
546+
instantQueryCodec,
547+
queryAnalyzer,
548+
t.Cfg.Querier.LookbackDelta,
549+
t.Cfg.Querier.DefaultEvaluationInterval,
550+
t.Cfg.Frontend.DistributedExecEnabled)
542551
if err != nil {
543552
return nil, err
544553
}

pkg/frontend/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type CombinedFrontendConfig struct {
2020
FrontendV1 v1.Config `yaml:",inline"`
2121
FrontendV2 v2.Config `yaml:",inline"`
2222

23-
DownstreamURL string `yaml:"downstream_url"`
23+
DownstreamURL string `yaml:"downstream_url"`
24+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
2425
}
2526

2627
func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -29,6 +30,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
2930
cfg.FrontendV2.RegisterFlags(f)
3031

3132
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
33+
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
3234
}
3335

3436
// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package tripperware
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
"github.com/prometheus/prometheus/promql/parser"
9+
"github.com/thanos-io/promql-engine/logicalplan"
10+
"github.com/thanos-io/promql-engine/query"
11+
"github.com/weaveworks/common/httpgrpc"
12+
)
13+
14+
const (
15+
stepBatch = 10
16+
)
17+
18+
func DistributedQueryMiddleware(defaultEvaluationInterval time.Duration, lookbackDelta time.Duration) Middleware {
19+
return MiddlewareFunc(func(next Handler) Handler {
20+
return distributedQueryMiddleware{
21+
next: next,
22+
lookbackDelta: lookbackDelta,
23+
defaultEvaluationInterval: defaultEvaluationInterval,
24+
}
25+
})
26+
}
27+
28+
func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
29+
if step == 0 {
30+
return start, start
31+
}
32+
return start, end
33+
}
34+
35+
type distributedQueryMiddleware struct {
36+
next Handler
37+
defaultEvaluationInterval time.Duration
38+
lookbackDelta time.Duration
39+
}
40+
41+
func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
42+
43+
start, end = getStartAndEnd(start, end, step)
44+
45+
qOpts := query.Options{
46+
Start: start,
47+
End: end,
48+
Step: step,
49+
StepsBatch: stepBatch,
50+
NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration {
51+
return d.defaultEvaluationInterval
52+
},
53+
// Hardcoded value for execution-time-params that will be re-populated again in the querier stage
54+
LookbackDelta: d.lookbackDelta,
55+
EnablePerStepStats: false,
56+
}
57+
58+
expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
planOpts := logicalplan.PlanOptions{
64+
DisableDuplicateLabelCheck: false,
65+
}
66+
67+
logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
68+
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
69+
70+
return &optimizedPlan, nil
71+
}
72+
73+
func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {
74+
promReq, ok := r.(*PrometheusRequest)
75+
if !ok {
76+
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
77+
}
78+
79+
startTime := time.Unix(0, promReq.Start*int64(time.Millisecond))
80+
endTime := time.Unix(0, promReq.End*int64(time.Millisecond))
81+
step := time.Duration(promReq.Step) * time.Millisecond
82+
83+
var err error
84+
85+
newLogicalPlan, err := d.newLogicalPlan(promReq.Query, startTime, endTime, step)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
promReq.LogicalPlan = *newLogicalPlan
91+
92+
return d.next.Do(ctx, r)
93+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package tripperware
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestLogicalPlanGeneration(t *testing.T) {
13+
testCases := []struct {
14+
name string
15+
queryType string // "instant" or "range"
16+
input *PrometheusRequest
17+
err error
18+
}{
19+
// instant query test cases
20+
{
21+
name: "instant - rate vector selector",
22+
queryType: "instant",
23+
input: &PrometheusRequest{
24+
Start: 100000,
25+
End: 100000,
26+
Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
27+
},
28+
},
29+
{
30+
name: "instant - memory usage expression",
31+
queryType: "instant",
32+
input: &PrometheusRequest{
33+
Start: 100000,
34+
End: 100000,
35+
Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))",
36+
},
37+
},
38+
{
39+
name: "instant - scalar only query",
40+
queryType: "instant",
41+
input: &PrometheusRequest{
42+
Start: 100000,
43+
End: 100000,
44+
Query: "42",
45+
},
46+
},
47+
{
48+
name: "instant - vector arithmetic",
49+
queryType: "instant",
50+
input: &PrometheusRequest{
51+
Start: 100000,
52+
End: 100000,
53+
Query: "node_load1 / ignoring(cpu) node_cpu_seconds_total",
54+
},
55+
},
56+
{
57+
name: "instant - avg_over_time with nested rate",
58+
queryType: "instant",
59+
input: &PrometheusRequest{
60+
Start: 100000,
61+
End: 100000,
62+
Query: "avg_over_time(rate(http_requests_total[5m])[30m:5m])",
63+
},
64+
},
65+
66+
// query range test cases
67+
{
68+
name: "range - rate vector over time",
69+
queryType: "range",
70+
input: &PrometheusRequest{
71+
Start: 100000,
72+
End: 200000,
73+
Step: 15000,
74+
Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
75+
},
76+
},
77+
{
78+
name: "range - memory usage ratio",
79+
queryType: "range",
80+
input: &PrometheusRequest{
81+
Start: 100000,
82+
End: 200000,
83+
Step: 30000,
84+
Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))",
85+
},
86+
},
87+
{
88+
name: "range - avg_over_time function",
89+
queryType: "range",
90+
input: &PrometheusRequest{
91+
Start: 100000,
92+
End: 200000,
93+
Step: 60000,
94+
Query: "avg_over_time(http_requests_total[5m])",
95+
},
96+
},
97+
{
98+
name: "range - vector arithmetic with range",
99+
queryType: "range",
100+
input: &PrometheusRequest{
101+
Start: 100000,
102+
End: 200000,
103+
Step: 10000,
104+
Query: "rate(node_network_receive_bytes_total[1m]) / rate(node_network_transmit_bytes_total[1m])",
105+
},
106+
},
107+
{
108+
name: "range - simple scalar operation",
109+
queryType: "range",
110+
input: &PrometheusRequest{
111+
Start: 100000,
112+
End: 200000,
113+
Step: 15000,
114+
Query: "2 + 2",
115+
},
116+
},
117+
}
118+
119+
for i, tc := range testCases {
120+
tc := tc
121+
t.Run(strconv.Itoa(i)+"_"+tc.name, func(t *testing.T) {
122+
t.Parallel()
123+
124+
middleware := DistributedQueryMiddleware(time.Minute, 5*time.Minute)
125+
126+
handler := middleware.Wrap(HandlerFunc(func(_ context.Context, req Request) (Response, error) {
127+
return nil, nil
128+
}))
129+
130+
// additional validation on the test cases based on query type
131+
if tc.queryType == "range" {
132+
require.NotZero(t, tc.input.Step, "range query should have non-zero step")
133+
require.NotEqual(t, tc.input.Start, tc.input.End, "range query should have different start and end times")
134+
} else {
135+
require.Equal(t, tc.input.Start, tc.input.End, "instant query should have equal start and end times")
136+
require.Zero(t, tc.input.Step, "instant query should have zero step")
137+
}
138+
139+
// test: execute middleware to populate the logical plan
140+
_, err := handler.Do(context.Background(), tc.input)
141+
require.NoError(t, err)
142+
require.NotEmpty(t, tc.input.LogicalPlan, "logical plan should be populated")
143+
144+
})
145+
}
146+
}

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/cortexproject/cortex/pkg/util"
2424
"github.com/cortexproject/cortex/pkg/util/limiter"
2525
"github.com/cortexproject/cortex/pkg/util/spanlogger"
26+
27+
"github.com/thanos-io/promql-engine/logicalplan"
2628
)
2729

2830
var (
@@ -141,6 +143,19 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response,
141143
return &resp, nil
142144
}
143145

146+
func (c instantQueryCodec) getSerializedBody(promReq *tripperware.PrometheusRequest) ([]byte, error) {
147+
var byteLP []byte
148+
var err error
149+
150+
if promReq.LogicalPlan != nil {
151+
byteLP, err = logicalplan.Marshal(promReq.LogicalPlan.Root())
152+
if err != nil {
153+
return nil, err
154+
}
155+
}
156+
return byteLP, nil
157+
}
158+
144159
func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) {
145160
promReq, ok := r.(*tripperware.PrometheusRequest)
146161
if !ok {
@@ -168,17 +183,24 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
168183
}
169184
}
170185

186+
h.Add("Content-Type", "application/json")
187+
171188
isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
172189
if !isSourceRuler {
173190
// When the source is the Ruler, skip set header
174191
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
175192
}
176193

194+
byteBody, err := c.getSerializedBody(promReq)
195+
if err != nil {
196+
return nil, err
197+
}
198+
177199
req := &http.Request{
178-
Method: "GET",
200+
Method: "POST",
179201
RequestURI: u.String(), // This is what the httpgrpc code looks at.
180202
URL: u,
181-
Body: http.NoBody,
203+
Body: io.NopCloser(bytes.NewReader(byteBody)),
182204
Header: h,
183205
}
184206

pkg/querier/tripperware/instantquery/instant_query_middlewares.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,18 @@ func Middlewares(
1515
merger tripperware.Merger,
1616
queryAnalyzer querysharding.Analyzer,
1717
lookbackDelta time.Duration,
18+
defaultEvaluationInterval time.Duration,
19+
distributedExecEnabled bool,
1820
) ([]tripperware.Middleware, error) {
1921
m := []tripperware.Middleware{
2022
NewLimitsMiddleware(limits, lookbackDelta),
2123
tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer),
2224
}
25+
26+
if distributedExecEnabled {
27+
m = append(m,
28+
tripperware.DistributedQueryMiddleware(defaultEvaluationInterval, lookbackDelta))
29+
}
30+
2331
return m, nil
2432
}

0 commit comments

Comments
 (0)