16 changes: 15 additions & 1 deletion pkg/cortex/modules.go
@@ -8,6 +8,7 @@ import (
"net/http"
"runtime"
"runtime/debug"
"time"

"github.com/go-kit/log/level"
"github.com/opentracing-contrib/go-stdlib/nethttp"
@@ -533,12 +534,25 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec,
shardedPrometheusCodec,
t.Cfg.Querier.LookbackDelta,
func(time.Duration) time.Duration {
return t.Cfg.Querier.DefaultEvaluationInterval
},
t.Cfg.Frontend.DistributedExecEnabled,
)
if err != nil {
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
instantQueryMiddlewares, err := instantquery.Middlewares(
util_log.Logger,
t.Overrides,
instantQueryCodec,
queryAnalyzer,
t.Cfg.Querier.LookbackDelta,
func(time.Duration) time.Duration {
return t.Cfg.Querier.DefaultEvaluationInterval
},
t.Cfg.Frontend.DistributedExecEnabled)
if err != nil {
return nil, err
}
4 changes: 3 additions & 1 deletion pkg/frontend/config.go
@@ -20,7 +20,8 @@ type CombinedFrontendConfig struct {
FrontendV1 v1.Config `yaml:",inline"`
FrontendV2 v2.Config `yaml:",inline"`

DownstreamURL string `yaml:"downstream_url"`
DownstreamURL string `yaml:"downstream_url"`
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
}

func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -29,6 +30,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
cfg.FrontendV2.RegisterFlags(f)

f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
}

// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
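Note: a minimal, hedged sketch of exercising the new flag in isolation, assuming the standard library flag package and the exported CombinedFrontendConfig above; the import path follows this repository's pkg/frontend layout and is not part of the diff.

package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/frontend"
)

func main() {
	var cfg frontend.CombinedFrontendConfig
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// The experimental flag defaults to false; opt in explicitly.
	_ = fs.Parse([]string{"-frontend.distributed-exec-enabled=true"})
	fmt.Println(cfg.DistributedExecEnabled) // true
}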
89 changes: 89 additions & 0 deletions pkg/querier/tripperware/distributed_query.go
@@ -0,0 +1,89 @@
package tripperware

import (
"context"
"net/http"
"time"

"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
"github.com/weaveworks/common/httpgrpc"
)

const (
stepBatch = 10
)

func DistributedQueryMiddleware(noStepSubqueryIntervalFn func(time.Duration) time.Duration) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return distributedQueryMiddleware{
next: next,
noStepSubqueryIntervalFn: noStepSubqueryIntervalFn,
}
})
}

func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
if step == 0 {
return start, start
}
return start, end
}

type distributedQueryMiddleware struct {
next Handler
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {

start, end = getStartAndEnd(start, end, step)

qOpts := query.Options{
Start: start,
End: end,
Step: step,
StepsBatch: stepBatch,
NoStepSubqueryIntervalFn: d.noStepSubqueryIntervalFn,
// Hardcoded execution-time parameters; these are re-populated in the querier stage.
LookbackDelta: 0,
EnablePerStepStats: false,
}

expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
if err != nil {
return nil, err
}

planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: false,
}

logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)

return &optimizedPlan, nil
}

func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {
promReq, ok := r.(*PrometheusRequest)
if !ok {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
}

startTime := time.Unix(0, promReq.Start*int64(time.Millisecond))
endTime := time.Unix(0, promReq.End*int64(time.Millisecond))
step := time.Duration(promReq.Step) * time.Millisecond

var err error

newLogicalPlan, err := d.newLogicalPlan(promReq.Query, startTime, endTime, step)
if err != nil {
return nil, err
}

promReq.LogicalPlan = *newLogicalPlan

return d.next.Do(ctx, r)
}
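For orientation, a short sketch (in the spirit of the test file below) of how this middleware fits into a chain: the wrapped handler runs after the optimized plan has been attached to the request, where the instant-query codec can later serialize it. The helper name, passthrough handler, and one-minute interval are placeholders, not values from this change.

// Hypothetical helper in the tripperware package, shown only for illustration.
func exampleDistributedQueryMiddleware(ctx context.Context) (Request, error) {
	mw := DistributedQueryMiddleware(func(time.Duration) time.Duration {
		return time.Minute // placeholder default evaluation interval for step-less subqueries
	})

	// Passthrough stand-in for the rest of the tripperware chain.
	next := HandlerFunc(func(_ context.Context, _ Request) (Response, error) {
		return nil, nil
	})

	req := &PrometheusRequest{Start: 100000, End: 100000, Query: "up"} // instant query: start == end
	if _, err := mw.Wrap(next).Do(ctx, req); err != nil {
		return nil, err
	}
	// req.LogicalPlan now carries the optimized plan that EncodeRequest serializes downstream.
	return req, nil
}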
148 changes: 148 additions & 0 deletions pkg/querier/tripperware/distributed_query_test.go
@@ -0,0 +1,148 @@
package tripperware

import (
"context"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestLogicalPlanGeneration(t *testing.T) {
testCases := []struct {
name string
queryType string // "instant" or "range"
input *PrometheusRequest
err error
}{
// instant query test cases
{
name: "instant - rate vector selector",
queryType: "instant",
input: &PrometheusRequest{
Start: 100000,
End: 100000,
Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
},
},
{
name: "instant - memory usage expression",
queryType: "instant",
input: &PrometheusRequest{
Start: 100000,
End: 100000,
Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))",
},
},
{
name: "instant - scalar only query",
queryType: "instant",
input: &PrometheusRequest{
Start: 100000,
End: 100000,
Query: "42",
},
},
{
name: "instant - vector arithmetic",
queryType: "instant",
input: &PrometheusRequest{
Start: 100000,
End: 100000,
Query: "node_load1 / ignoring(cpu) node_cpu_seconds_total",
},
},
{
name: "instant - avg_over_time with nested rate",
queryType: "instant",
input: &PrometheusRequest{
Start: 100000,
End: 100000,
Query: "avg_over_time(rate(http_requests_total[5m])[30m:5m])",
},
},

// query range test cases
{
name: "range - rate vector over time",
queryType: "range",
input: &PrometheusRequest{
Start: 100000,
End: 200000,
Step: 15000,
Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
},
},
{
name: "range - memory usage ratio",
queryType: "range",
input: &PrometheusRequest{
Start: 100000,
End: 200000,
Step: 30000,
Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))",
},
},
{
name: "range - avg_over_time function",
queryType: "range",
input: &PrometheusRequest{
Start: 100000,
End: 200000,
Step: 60000,
Query: "avg_over_time(http_requests_total[5m])",
},
},
{
name: "range - vector arithmetic with range",
queryType: "range",
input: &PrometheusRequest{
Start: 100000,
End: 200000,
Step: 10000,
Query: "rate(node_network_receive_bytes_total[1m]) / rate(node_network_transmit_bytes_total[1m])",
},
},
{
name: "range - simple scalar operation",
queryType: "range",
input: &PrometheusRequest{
Start: 100000,
End: 200000,
Step: 15000,
Query: "2 + 2",
},
},
}

for i, tc := range testCases {
tc := tc
t.Run(strconv.Itoa(i)+"_"+tc.name, func(t *testing.T) {
t.Parallel()

middleware := DistributedQueryMiddleware(func(time.Duration) time.Duration {
return time.Minute
})

handler := middleware.Wrap(HandlerFunc(func(_ context.Context, req Request) (Response, error) {
return nil, nil
}))

// additional validation on the test cases based on query type
if tc.queryType == "range" {
require.NotZero(t, tc.input.Step, "range query should have non-zero step")
require.NotEqual(t, tc.input.Start, tc.input.End, "range query should have different start and end times")
} else {
require.Equal(t, tc.input.Start, tc.input.End, "instant query should have equal start and end times")
require.Zero(t, tc.input.Step, "instant query should have zero step")
}

// test: execute middleware to populate the logical plan
_, err := handler.Do(context.Background(), tc.input)
require.NoError(t, err)
require.NotEmpty(t, tc.input.LogicalPlan, "logical plan should be populated")

})
}
}
26 changes: 24 additions & 2 deletions pkg/querier/tripperware/instantquery/instant_query.go
@@ -23,6 +23,8 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/spanlogger"

"github.com/thanos-io/promql-engine/logicalplan"
)

var (
@@ -141,6 +143,19 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response,
return &resp, nil
}

func (c instantQueryCodec) getSerializedBody(promReq *tripperware.PrometheusRequest) ([]byte, error) {
var byteLP []byte
var err error

if promReq.LogicalPlan != nil {
byteLP, err = logicalplan.Marshal(promReq.LogicalPlan.Root())
if err != nil {
return nil, err
}
}
return byteLP, nil
}

func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) {
promReq, ok := r.(*tripperware.PrometheusRequest)
if !ok {
@@ -168,17 +183,24 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
}
}

h.Add("Content-Type", "application/json")

isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
if !isSourceRuler {
// Skip setting these request headers when the source is the Ruler.
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
}

byteBody, err := c.getSerializedBody(promReq)
if err != nil {
return nil, err
}

req := &http.Request{
Method: "GET",
Method: "POST",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Body: io.NopCloser(bytes.NewReader(byteBody)),
Header: h,
}

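For completeness, a rough sketch of the querier-side counterpart: the POST body written by EncodeRequest above would presumably be decoded back into a plan node. Both logicalplan.Unmarshal and the decodeLogicalPlan helper are assumptions made for illustration; neither appears in this diff.

// Hypothetical receiving-side helper; logicalplan.Unmarshal is assumed to be
// the counterpart of the logicalplan.Marshal call in getSerializedBody above.
func decodeLogicalPlan(r *http.Request) (logicalplan.Node, error) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		return nil, err
	}
	if len(body) == 0 {
		// No plan attached; the querier would fall back to parsing the query string.
		return nil, nil
	}
	return logicalplan.Unmarshal(body)
}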
@@ -15,10 +15,18 @@ func Middlewares(
merger tripperware.Merger,
queryAnalyzer querysharding.Analyzer,
lookbackDelta time.Duration,
noStepSubqueryIntervalFn func(time.Duration) time.Duration,
distributedExecEnabled bool,
) ([]tripperware.Middleware, error) {
m := []tripperware.Middleware{
NewLimitsMiddleware(limits, lookbackDelta),
tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer),
}

if distributedExecEnabled {
m = append(m,
tripperware.DistributedQueryMiddleware(noStepSubqueryIntervalFn))
}

return m, nil
}