diff --git a/integration/querier_test.go b/integration/querier_test.go index 7e16b587dbb..27929ba5d86 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -1375,3 +1375,78 @@ func TestQuerierEngineConfigs(t *testing.T) { } } + +func TestQuerierDistributedExecution(t *testing.T) { + // e2e test setup + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // initialize the flags + flags := mergeFlags( + BlocksStorageFlags(), + map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(), + "-querier.thanos-engine": "true", + // enable distributed execution (logical plan execution) + "-querier.distributed-exec-enabled": "true", + }, + ) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // start services + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "") + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway)) + flags = mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","), + }) + + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags, map[string]string{ + "-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.Start(queryFrontend)) + + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) + + // wait until the distributor and querier has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(time.Minute * 1) + series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // main tests + // - make sure queries are still executable with distributed execution enabled + var val model.Value + val, err = c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, expectedVector1, val.(model.Vector)) + + val, err = c.Query("series_2", series2Timestamp) + require.NoError(t, err) + require.Equal(t, expectedVector2, val.(model.Vector)) +} diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 5eb6733532f..0fa51bf94a4 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -19,13 +19,13 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/common/version" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/api/queryapi" + "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/codec" "github.com/cortexproject/cortex/pkg/querier/stats" @@ -164,7 +164,7 @@ func NewQuerierHandler( querierCfg querier.Config, queryable storage.SampleAndChunkQueryable, exemplarQueryable storage.ExemplarQueryable, - engine promql.QueryEngine, + engine engine.QueryEngine, metadataQuerier querier.MetadataQuerier, reg prometheus.Registerer, logger log.Logger, diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index 5dd125a6c39..3e4f7bb49ca 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/httputil" v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/thanos-io/promql-engine/logicalplan" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/engine" @@ -26,7 +27,7 @@ import ( type QueryAPI struct { queryable storage.SampleAndChunkQueryable - queryEngine promql.QueryEngine + queryEngine engine.QueryEngine now func() time.Time statsRenderer v1.StatsRenderer logger log.Logger @@ -35,7 +36,7 @@ type QueryAPI struct { } func NewQueryAPI( - qe promql.QueryEngine, + qe engine.QueryEngine, q storage.SampleAndChunkQueryable, statsRenderer v1.StatsRenderer, logger log.Logger, @@ -101,10 +102,29 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { ctx = engine.AddEngineTypeToContext(ctx, r) ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader)) - qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step)) - if err != nil { - return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + + var qry promql.Query + startTime := convertMsToTime(start) + endTime := convertMsToTime(end) + stepDuration := convertMsToDuration(step) + + byteLP := []byte(r.PostFormValue("plan")) + if len(byteLP) != 0 { + logicalPlan, err := logicalplan.Unmarshal(byteLP) + if err != nil { + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil} + } + qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query")) + if err != nil { + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil} + } + } else { // if there is logical plan field is empty, fall back + qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration) + if err != nil { + return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + } } + // From now on, we must only return with a finalizer in the result (to // be called by the caller) or call qry.Close ourselves (which is // required in the case of a panic). @@ -157,9 +177,25 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { ctx = engine.AddEngineTypeToContext(ctx, r) ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader)) - qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts)) - if err != nil { - return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + + var qry promql.Query + tsTime := convertMsToTime(ts) + + byteLP := []byte(r.PostFormValue("plan")) + if len(byteLP) != 0 { + logicalPlan, err := logicalplan.Unmarshal(byteLP) + if err != nil { + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil} + } + qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query")) + if err != nil { + return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create instant query from logical plan: %v", err)}, nil, nil} + } + } else { // if there is logical plan field is empty, fall back + qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime) + if err != nil { + return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + } } // From now on, we must only return with a finalizer in the result (to diff --git a/pkg/api/queryapi/query_api_test.go b/pkg/api/queryapi/query_api_test.go index 028184a12b8..2a0ce0cbc99 100644 --- a/pkg/api/queryapi/query_api_test.go +++ b/pkg/api/queryapi/query_api_test.go @@ -7,21 +7,28 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" + "strings" "testing" "time" "github.com/go-kit/log" "github.com/gorilla/mux" "github.com/grafana/regexp" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" "github.com/weaveworks/common/user" + engine2 "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/querier/stats" @@ -64,10 +71,14 @@ func (mockQuerier) Close() error { } func Test_CustomAPI(t *testing.T) { - engine := promql.NewEngine(promql.EngineOpts{ - MaxSamples: 100, - Timeout: time.Second * 2, - }) + engine := engine2.New( + promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }, + engine2.ThanosEngineConfig{Enabled: false}, + prometheus.NewRegistry()) + mockQueryable := &mockSampleAndChunkQueryable{ queryableFn: func(_, _ int64) (storage.Querier, error) { return mockQuerier{ @@ -175,10 +186,10 @@ func Test_CustomAPI(t *testing.T) { c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) router := mux.NewRouter() - router.Path("/api/v1/query").Methods("GET").Handler(c.Wrap(c.InstantQueryHandler)) - router.Path("/api/v1/query_range").Methods("GET").Handler(c.Wrap(c.RangeQueryHandler)) + router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler)) + router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler)) - req := httptest.NewRequest(http.MethodGet, test.path, nil) + req := httptest.NewRequest(http.MethodPost, test.path, nil) ctx := context.Background() _, ctx = stats.ContextWithEmptyStats(ctx) req = req.WithContext(user.InjectOrgID(ctx, "user1")) @@ -209,10 +220,14 @@ func (m *mockCodec) Encode(_ *v1.Response) ([]byte, error) { } func Test_InvalidCodec(t *testing.T) { - engine := promql.NewEngine(promql.EngineOpts{ - MaxSamples: 100, - Timeout: time.Second * 2, - }) + engine := engine2.New( + promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }, + engine2.ThanosEngineConfig{Enabled: false}, + prometheus.NewRegistry()) + mockQueryable := &mockSampleAndChunkQueryable{ queryableFn: func(_, _ int64) (storage.Querier, error) { return mockQuerier{ @@ -231,9 +246,9 @@ func Test_InvalidCodec(t *testing.T) { queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*")) router := mux.NewRouter() - router.Path("/api/v1/query").Methods("GET").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) + router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler)) - req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=test", nil) + req := httptest.NewRequest(http.MethodPost, "/api/v1/query?query=test", nil) ctx := context.Background() _, ctx = stats.ContextWithEmptyStats(ctx) req = req.WithContext(user.InjectOrgID(ctx, "user1")) @@ -244,10 +259,14 @@ func Test_InvalidCodec(t *testing.T) { } func Test_CustomAPI_StatsRenderer(t *testing.T) { - engine := promql.NewEngine(promql.EngineOpts{ - MaxSamples: 100, - Timeout: time.Second * 2, - }) + engine := engine2.New( + promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }, + engine2.ThanosEngineConfig{Enabled: false}, + prometheus.NewRegistry()) + mockQueryable := &mockSampleAndChunkQueryable{ queryableFn: func(_, _ int64) (storage.Querier, error) { return mockQuerier{ @@ -269,9 +288,9 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) { queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) router := mux.NewRouter() - router.Path("/api/v1/query_range").Methods("GET").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) + router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler)) - req := httptest.NewRequest(http.MethodGet, "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", nil) + req := httptest.NewRequest(http.MethodPost, "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", nil) ctx := context.Background() _, ctx = stats.ContextWithEmptyStats(ctx) req = req.WithContext(user.InjectOrgID(ctx, "user1")) @@ -285,3 +304,202 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) { require.Equal(t, uint64(4), queryStats.LoadPeakSamples()) require.Equal(t, uint64(4), queryStats.LoadScannedSamples()) } + +func Test_Logicalplan_Requests(t *testing.T) { + engine := engine2.New( + promql.EngineOpts{ + MaxSamples: 100, + Timeout: time.Second * 2, + }, + engine2.ThanosEngineConfig{Enabled: true}, + prometheus.NewRegistry(), + ) + + mockMatrix := model.Matrix{ + { + Metric: model.Metric{"__name__": "test", "foo": "bar"}, + Values: []model.SamplePair{ + {Timestamp: 1536673665000, Value: 0}, + {Timestamp: 1536673670000, Value: 1}, + }, + }, + } + + mockQueryable := &mockSampleAndChunkQueryable{ + queryableFn: func(_, _ int64) (storage.Querier, error) { + return mockQuerier{matrix: mockMatrix}, nil + }, + } + + tests := []struct { + name string + path string + start int64 + end int64 + stepDuration int64 + requestBody func(t *testing.T) []byte + expectedCode int + expectedBody string + }{ + { + name: "[Range Query] with valid logical plan and empty query string", + path: "/api/v1/query_range?end=1536673680&query=&start=1536673665&step=5", + start: 1536673665, + end: 1536673680, + stepDuration: 5, + requestBody: func(t *testing.T) []byte { + return createTestLogicalPlan(t, 1536673665, 1536673680, 5) + }, + expectedCode: http.StatusOK, + expectedBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"test","foo":"bar"},"values":[[1536673665,"0"],[1536673670,"1"],[1536673675,"1"],[1536673680,"1"]]}]}}`, + }, + { + name: "[Range Query] with corrupted logical plan", // will throw an error from unmarhsal step + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", + start: 1536673665, + end: 1536673680, + stepDuration: 5, + requestBody: func(t *testing.T) []byte { + return append(createTestLogicalPlan(t, 1536673665, 1536673680, 5), []byte("random data")...) + }, + expectedCode: http.StatusInternalServerError, + expectedBody: `{"status":"error","errorType":"server_error","error":"invalid logical plan: invalid character 'r' after top-level value"}`, + }, + { + name: "[Range Query] with empty body and non-empty query string", // fall back to promql query execution + path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5", + start: 1536673665, + end: 1536673680, + stepDuration: 5, + requestBody: func(t *testing.T) []byte { + return []byte{} + }, + expectedCode: http.StatusOK, + expectedBody: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"test","foo":"bar"},"values":[[1536673665,"0"],[1536673670,"1"],[1536673675,"1"],[1536673680,"1"]]}]}}`, + }, + { + name: "[Range Query] with empty body and empty query string", // fall back to promql query execution, but will have error because of empty query string + path: "/api/v1/query_range?end=1536673680&query=&start=1536673665&step=5", + start: 1536673665, + end: 1536673680, + stepDuration: 5, + requestBody: func(t *testing.T) []byte { + return []byte{} + }, + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}", + }, + { + name: "[Instant Query] with valid logical plan and empty query string", + path: "/api/v1/query?query=test&time=1536673670", + start: 1536673670, + end: 1536673670, + stepDuration: 0, + requestBody: func(t *testing.T) []byte { + return createTestLogicalPlan(t, 1536673670, 1536673670, 0) + }, + expectedCode: http.StatusOK, + expectedBody: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"test","foo":"bar"},"value":[1536673670,"1"]}]}}`, + }, + { + name: "[Instant Query] with corrupted logical plan", + path: "/api/v1/query?query=test&time=1536673670", + start: 1536673670, + end: 1536673670, + stepDuration: 0, + requestBody: func(t *testing.T) []byte { + return append(createTestLogicalPlan(t, 1536673670, 1536673670, 0), []byte("random data")...) + }, + expectedCode: http.StatusInternalServerError, + expectedBody: `{"status":"error","errorType":"server_error","error":"invalid logical plan: invalid character 'r' after top-level value"}`, + }, + { + name: "[Instant Query] with empty body and non-empty query string", + path: "/api/v1/query?query=test&time=1536673670", + start: 1536673670, + end: 1536673670, + stepDuration: 0, + requestBody: func(t *testing.T) []byte { + return []byte{} + }, + expectedCode: http.StatusOK, + expectedBody: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"test","foo":"bar"},"value":[1536673670,"1"]}]}}`, + }, + { + name: "[Instant Query] with empty body and empty query string", + path: "/api/v1/query?query=&time=1536673670", + start: 1536673670, + end: 1536673670, + stepDuration: 0, + requestBody: func(t *testing.T) []byte { + return []byte{} + }, + expectedCode: http.StatusBadRequest, + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*")) + router := mux.NewRouter() + router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler)) + router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler)) + + req := createTestRequest(tt.path, tt.requestBody(t)) + rec := httptest.NewRecorder() + router.ServeHTTP(rec, req) + + require.Equal(t, tt.expectedCode, rec.Code) + body, err := io.ReadAll(rec.Body) + require.NoError(t, err) + require.Equal(t, tt.expectedBody, string(body)) + }) + } +} + +func createTestRequest(path string, planBytes []byte) *http.Request { + form := url.Values{} + form.Set("plan", string(planBytes)) + req := httptest.NewRequest(http.MethodPost, path, io.NopCloser(strings.NewReader(form.Encode()))) + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + ctx := context.Background() + _, ctx = stats.ContextWithEmptyStats(ctx) + return req.WithContext(user.InjectOrgID(ctx, "user1")) +} + +func createTestLogicalPlan(t *testing.T, start, end int64, stepDuration int64) []byte { + startTime, endTime := convertMsToTime(start), convertMsToTime(end) + step := convertMsToDuration(stepDuration) + + qOpts := query.Options{ + Start: startTime, + End: startTime, + Step: 0, + StepsBatch: 10, + LookbackDelta: 0, + EnablePerStepStats: false, + } + + if step != 0 { + qOpts.End = endTime + qOpts.Step = step + } + + // using a different metric name here so that we can check with debugger which query (from query string vs http request body) + // is being executed by the queriers + expr, err := parser.NewParser("up", parser.WithFunctions(parser.Functions)).ParseExpr() + require.NoError(t, err) + + planOpts := logicalplan.PlanOptions{ + DisableDuplicateLabelCheck: false, + } + + logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts) + require.NoError(t, err) + byteval, err := logicalplan.Marshal(logicalPlan.Root()) + require.NoError(t, err) + + return byteval +} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 09634c05b08..f50fcf26e17 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -14,7 +14,6 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/promql" prom_storage "github.com/prometheus/prometheus/storage" "github.com/weaveworks/common/server" "github.com/weaveworks/common/signals" @@ -35,6 +34,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortex/storage" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/frontend" frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1" @@ -322,7 +322,7 @@ type Cortex struct { QuerierQueryable prom_storage.SampleAndChunkQueryable ExemplarQueryable prom_storage.ExemplarQueryable MetadataQuerier querier.MetadataQuerier - QuerierEngine promql.QueryEngine + QuerierEngine engine.QueryEngine QueryFrontendTripperware tripperware.Tripperware ResourceMonitor *resource.Monitor diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 1228af5a004..a47888b8267 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -540,7 +540,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro shardedPrometheusCodec, t.Cfg.Querier.LookbackDelta, t.Cfg.Querier.DefaultEvaluationInterval, - t.Cfg.Frontend.DistributedExecEnabled, + t.Cfg.Querier.DistributedExecEnabled, ) if err != nil { return nil, err @@ -553,7 +553,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryAnalyzer, t.Cfg.Querier.LookbackDelta, t.Cfg.Querier.DefaultEvaluationInterval, - t.Cfg.Frontend.DistributedExecEnabled) + t.Cfg.Querier.DistributedExecEnabled) if err != nil { return nil, err } diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index be22e4573ad..1eb4d7f9f81 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" thanosengine "github.com/thanos-io/promql-engine/engine" + "github.com/thanos-io/promql-engine/logicalplan" ) type engineKeyType struct{} @@ -43,6 +44,13 @@ func GetEngineType(ctx context.Context) Type { return None } +type QueryEngine interface { + NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) + NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) + MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time, qs string) (promql.Query, error) + MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start time.Time, end time.Time, interval time.Duration, qs string) (promql.Query, error) +} + type Engine struct { prometheusEngine *promql.Engine thanosEngine *thanosengine.Engine @@ -127,6 +135,53 @@ prom: return qf.prometheusEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval) } +func (qf *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time, qs string) (promql.Query, error) { + if engineType := GetEngineType(ctx); engineType == Prometheus { + qf.engineSwitchQueriesTotal.WithLabelValues(string(Prometheus)).Inc() + } else if engineType == Thanos { + qf.engineSwitchQueriesTotal.WithLabelValues(string(Thanos)).Inc() + } + + if qf.thanosEngine != nil { + res, err := qf.thanosEngine.MakeInstantQueryFromPlan(ctx, q, fromPromQLOpts(opts), root, ts) + if err != nil { + if thanosengine.IsUnimplemented(err) { + // fallback to use prometheus engine + qf.fallbackQueriesTotal.Inc() + goto prom + } + return nil, err + } + return res, nil + } + +prom: + return qf.prometheusEngine.NewInstantQuery(ctx, q, opts, qs, ts) +} + +func (qf *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start time.Time, end time.Time, interval time.Duration, qs string) (promql.Query, error) { + if engineType := GetEngineType(ctx); engineType == Prometheus { + qf.engineSwitchQueriesTotal.WithLabelValues(string(Prometheus)).Inc() + } else if engineType == Thanos { + qf.engineSwitchQueriesTotal.WithLabelValues(string(Thanos)).Inc() + } + if qf.thanosEngine != nil { + res, err := qf.thanosEngine.MakeRangeQueryFromPlan(ctx, q, fromPromQLOpts(opts), root, start, end, interval) + if err != nil { + if thanosengine.IsUnimplemented(err) { + // fallback to use prometheus engine + qf.fallbackQueriesTotal.Inc() + goto prom + } + return nil, err + } + return res, nil + } + +prom: + return qf.prometheusEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval) +} + func fromPromQLOpts(opts promql.QueryOpts) *thanosengine.QueryOpts { if opts == nil { return &thanosengine.QueryOpts{} diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index 7b270e6604a..db00be12673 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -14,8 +14,11 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/promqltest" - "github.com/stretchr/testify/require" "github.com/thanos-io/promql-engine/execution/parse" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" + + "github.com/stretchr/testify/require" utillog "github.com/cortexproject/cortex/pkg/util/log" ) @@ -123,3 +126,98 @@ func TestEngine_XFunctions(t *testing.T) { }) } } + +func TestEngine_With_Logical_Plan(t *testing.T) { + ctx := context.Background() + reg := prometheus.NewRegistry() + + now := time.Now() + start := time.Now().Add(-time.Minute * 5) + step := time.Minute + queryable := promqltest.LoadedStorage(t, "") + opts := promql.EngineOpts{ + Logger: utillog.GoKitLogToSlog(log.NewNopLogger()), + Reg: reg, + } + queryEngine := New(opts, ThanosEngineConfig{Enabled: true}, reg) + + range_lp := createTestLogicalPlan(t, start, now, step, "up") + instant_lp := createTestLogicalPlan(t, now, now, 0, "up") + + r := &http.Request{Header: http.Header{}} + r.Header.Set(TypeHeader, string(Thanos)) + ctx = AddEngineTypeToContext(ctx, r) + + // Case 1: Executing logical plan with thanos engine + _, _ = queryEngine.MakeInstantQueryFromPlan(ctx, queryable, nil, instant_lp.Root(), now, "up") + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine + # TYPE cortex_thanos_engine_fallback_queries_total counter + cortex_thanos_engine_fallback_queries_total 0 + `), "cortex_thanos_engine_fallback_queries_total")) + + _, _ = queryEngine.MakeRangeQueryFromPlan(ctx, queryable, nil, range_lp.Root(), start, now, step, "up") + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine + # TYPE cortex_thanos_engine_fallback_queries_total counter + cortex_thanos_engine_fallback_queries_total 0 + `), "cortex_thanos_engine_fallback_queries_total")) + + // Case 2: Logical plan that thanos engine cannot execute (so it will fall back to prometheus engine) + err_range_lp := createTestLogicalPlan(t, start, now, step, "up[10]") + _, _ = queryEngine.MakeRangeQueryFromPlan(ctx, queryable, nil, err_range_lp.Root(), start, now, step, "up") + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine + # TYPE cortex_thanos_engine_fallback_queries_total counter + cortex_thanos_engine_fallback_queries_total 1 + `), "cortex_thanos_engine_fallback_queries_total")) + + // Case 3: executing with prometheus engine + r.Header.Set(TypeHeader, string(Prometheus)) + ctx = AddEngineTypeToContext(ctx, r) + + _, _ = queryEngine.MakeInstantQueryFromPlan(ctx, queryable, nil, instant_lp.Root(), now, "up") + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_engine_switch_queries_total Total number of queries where engine_type is set explicitly + # TYPE cortex_engine_switch_queries_total counter + cortex_engine_switch_queries_total{engine_type="prometheus"} 1 + cortex_engine_switch_queries_total{engine_type="thanos"} 3 + `), "cortex_engine_switch_queries_total")) + + _, _ = queryEngine.MakeRangeQueryFromPlan(ctx, queryable, nil, range_lp.Root(), start, now, step, "up") + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_engine_switch_queries_total Total number of queries where engine_type is set explicitly + # TYPE cortex_engine_switch_queries_total counter + cortex_engine_switch_queries_total{engine_type="prometheus"} 2 + cortex_engine_switch_queries_total{engine_type="thanos"} 3 + `), "cortex_engine_switch_queries_total")) +} + +func createTestLogicalPlan(t *testing.T, startTime time.Time, endTime time.Time, step time.Duration, q string) logicalplan.Plan { + + qOpts := query.Options{ + Start: startTime, + End: startTime, + Step: 0, + StepsBatch: 10, + LookbackDelta: 0, + EnablePerStepStats: false, + } + + if step != 0 { + qOpts.End = endTime + qOpts.Step = step + } + + expr, err := parser.NewParser(q, parser.WithFunctions(parser.Functions)).ParseExpr() + require.NoError(t, err) + + planOpts := logicalplan.PlanOptions{ + DisableDuplicateLabelCheck: false, + } + + logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts) + require.NoError(t, err) + + return logicalPlan +} diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index a1109f213ad..03dff13980e 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -20,8 +20,7 @@ type CombinedFrontendConfig struct { FrontendV1 v1.Config `yaml:",inline"` FrontendV2 v2.Config `yaml:",inline"` - DownstreamURL string `yaml:"downstream_url"` - DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` + DownstreamURL string `yaml:"downstream_url"` } func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) { @@ -30,7 +29,6 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) { cfg.FrontendV2.RegisterFlags(f) f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") - f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") } // InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 78548030fba..9160f1c4112 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -96,6 +96,7 @@ type Config struct { ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } var ( @@ -146,6 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") + f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") } @@ -202,7 +204,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { } // New builds a queryable and promql engine. -func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, promql.QueryEngine) { +func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index c8b41b165e1..0af6ab9c618 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -208,7 +208,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ } } - h.Add("Content-Type", "application/json") + h.Add("Content-Type", "application/x-www-form-urlencoded") isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent) if !isSourceRuler { @@ -216,16 +216,19 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) } - byteBody, err := c.getSerializedBody(promReq) + bodyBytes, err := c.getSerializedBody(promReq) if err != nil { return nil, err } + form := url.Values{} + form.Set("plan", string(bodyBytes)) + formEncoded := form.Encode() req := &http.Request{ Method: "POST", RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, - Body: io.NopCloser(bytes.NewReader(byteBody)), + Body: io.NopCloser(strings.NewReader(formEncoded)), Header: h, } diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go index 122f0645623..0b1de391f8e 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go @@ -212,8 +212,7 @@ func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) { require.NoError(t, err) // check request body - body, err := io.ReadAll(req.Body) - require.NoError(t, err) + body := []byte(req.PostFormValue("plan")) if tc.expectEmptyBody { require.Empty(t, body) } else { diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index f0b11db6121..786676846bc 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -196,8 +196,7 @@ func (c prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Reques h.Add(n, v) } } - - h.Add("Content-Type", "application/json") + h.Add("Content-Type", "application/x-www-form-urlencoded") tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) @@ -206,11 +205,15 @@ func (c prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Reques return nil, err } + form := url.Values{} + form.Set("plan", string(bodyBytes)) + formEncoded := form.Encode() + req := &http.Request{ Method: "POST", RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, - Body: io.NopCloser(bytes.NewReader(bodyBytes)), + Body: io.NopCloser(strings.NewReader(formEncoded)), Header: h, } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index f21eae986df..acf66698c16 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -231,8 +231,7 @@ func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) { require.NoError(t, err) // check request body - body, err := io.ReadAll(req.Body) - require.NoError(t, err) + body := []byte(req.PostFormValue("plan")) if tc.expectEmptyBody { require.Empty(t, body) } else {