diff --git a/CHANGELOG.md b/CHANGELOG.md index af02cb16365..1dce98bf817 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,7 @@ * [ENHANCEMENT] Query Frontend: Add a `format_query` label value to the `op` label at `cortex_query_frontend_queries_total` metric. #6925 * [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895 * [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950 +* [ENHANCEMENT] Add source metadata to requests (API vs. ruler) and use it in resource-based throttling to reject only ad hoc queries. #6947 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/api/middlewares.go b/pkg/api/middlewares.go index dcb9c298169..cda2076a681 100644 --- a/pkg/api/middlewares.go +++ b/pkg/api/middlewares.go @@ -37,6 +37,7 @@ func (h HTTPHeaderMiddleware) injectRequestContext(r *http.Request) *http.Reques reqId = uuid.NewString() } requestContextMap[requestmeta.RequestIdKey] = reqId + requestContextMap[requestmeta.RequestSourceKey] = requestmeta.SourceAPI ctx := requestmeta.ContextWithRequestMetadataMap(r.Context(), requestContextMap) return r.WithContext(ctx) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index bba985ea1c0..b6c523f205e 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -30,6 +30,7 @@ import ( util_api "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/requestmeta" ) const ( @@ -245,11 +246,11 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } userID := tenant.JoinTenantIDs(tenantIDs) + source := 
tripperware.GetSource(r) if f.tenantFederationCfg.Enabled { maxTenant := f.tenantFederationCfg.MaxTenant if maxTenant > 0 && len(tenantIDs) > maxTenant { - source := tripperware.GetSource(r.Header.Get("User-Agent")) if f.cfg.QueryStatsEnabled { f.rejectedQueries.WithLabelValues(reasonTooManyTenants, source, userID).Inc() } @@ -289,7 +290,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } http.Error(w, err.Error(), statusCode) if f.cfg.QueryStatsEnabled && util.IsRequestBodyTooLarge(err) { - source := tripperware.GetSource(r.Header.Get("User-Agent")) f.rejectedQueries.WithLabelValues(reasonRequestBodySizeExceeded, source, userID).Inc() } return @@ -297,7 +297,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(&buf) } - source := tripperware.GetSource(r.Header.Get("User-Agent")) // Log request if f.cfg.QueryStatsEnabled { queryString = f.parseRequestQueryString(r, buf) @@ -401,7 +400,7 @@ func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values, sourc logMessage = append(logMessage, "accept_encoding", acceptEncoding) } - shouldLog := source == tripperware.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == tripperware.SourceRuler) + shouldLog := source == requestmeta.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == requestmeta.SourceRuler) if shouldLog { logMessage = append(logMessage, formatQueryString(queryString)...) level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) @@ -531,7 +530,7 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query } } - shouldLog := source == tripperware.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == tripperware.SourceRuler) + shouldLog := source == requestmeta.SourceAPI || (f.cfg.EnabledRulerQueryStatsLog && source == requestmeta.SourceRuler) if shouldLog { logMessage = append(logMessage, formatQueryString(queryString)...) 
if error != nil { diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index aa863230295..8f62826a017 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -26,11 +26,11 @@ import ( querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tenantfederation" - "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" util_api "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/requestmeta" ) type roundTripperFunc func(*http.Request) (*http.Response, error) @@ -216,7 +216,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResponseBodySizeExceeded, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusRequestEntityTooLarge, @@ -232,7 +232,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManyRequests, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusTooManyRequests, @@ -248,7 +248,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTooManySamples, 
requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -264,7 +264,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonTimeRangeExceeded, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -280,7 +280,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesFetched, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -296,7 +296,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksFetched, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -312,7 +312,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunkBytesFetched, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -328,7 +328,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := 
promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonDataBytesFetched, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -344,7 +344,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonSeriesLimitStoreGateway, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -360,7 +360,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonChunksLimitStoreGateway, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -376,7 +376,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonBytesLimitStoreGateway, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -393,7 +393,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResourceExhausted, tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResourceExhausted, 
requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusUnprocessableEntity, @@ -410,7 +410,7 @@ func TestHandler_ServeHTTP(t *testing.T) { }, nil }), additionalMetricsCheckFunc: func(h *Handler) { - v := promtest.ToFloat64(h.slowQueries.WithLabelValues(tripperware.SourceAPI, userID)) + v := promtest.ToFloat64(h.slowQueries.WithLabelValues(requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, expectedStatusCode: http.StatusOK, @@ -472,12 +472,12 @@ func TestReportQueryStatsFormat(t *testing.T) { tests := map[string]testCase{ "should not include query and header details if empty": { expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should include query length and string at the end": { queryString: url.Values(map[string][]string{"query": {"up"}}), expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 param_query=up`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should include query stats": { queryStats: &querier_stats.QueryStats{ @@ -494,17 +494,17 @@ func TestReportQueryStatsFormat(t *testing.T) { }, }, expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 
fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 query_storage_wall_time_seconds=6000`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should include user agent": { header: http.Header{"User-Agent": []string{"Grafana"}}, expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 user_agent=Grafana`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should include response error": { responseErr: errors.New("foo_err"), expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 error=foo_err`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should include query priority": { queryString: url.Values(map[string][]string{"query": {"up"}}), @@ -513,7 +513,7 @@ func TestReportQueryStatsFormat(t *testing.T) { PriorityAssigned: true, }, expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 priority=99 param_query=up`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, 
"should include data fetch min and max time": { queryString: url.Values(map[string][]string{"query": {"up"}}), @@ -522,7 +522,7 @@ func TestReportQueryStatsFormat(t *testing.T) { DataSelectMinTime: 1704067200000, }, expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should include query stats with store gateway stats": { queryStats: &querier_stats.QueryStats{ @@ -541,16 +541,16 @@ func TestReportQueryStatsFormat(t *testing.T) { }, }, expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`, - source: tripperware.SourceAPI, + source: requestmeta.SourceAPI, }, "should not report a log": { expectedLog: ``, - source: tripperware.SourceRuler, + source: requestmeta.SourceRuler, enabledRulerQueryStatsLog: false, }, "should report a log": { expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 
samples_scanned=0`, - source: tripperware.SourceRuler, + source: requestmeta.SourceRuler, enabledRulerQueryStatsLog: true, }, } @@ -559,6 +559,7 @@ func TestReportQueryStatsFormat(t *testing.T) { t.Run(testName, func(t *testing.T) { handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, EnabledRulerQueryStatsLog: testData.enabledRulerQueryStatsLog}, tenantfederation.Config{}, http.DefaultTransport, logger, nil) req.Header = testData.header + req = req.WithContext(requestmeta.ContextWithRequestSource(context.Background(), testData.source)) handler.reportQueryStats(req, testData.source, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp) data, err := io.ReadAll(outputBuf) require.NoError(t, err) @@ -706,7 +707,7 @@ func Test_TenantFederation_MaxTenant(t *testing.T) { require.Contains(t, string(body), test.expectedErrMsg) if strings.Contains(test.expectedErrMsg, "too many tenants") { - v := promtest.ToFloat64(handler.rejectedQueries.WithLabelValues(reasonTooManyTenants, tripperware.SourceAPI, test.orgId)) + v := promtest.ToFloat64(handler.rejectedQueries.WithLabelValues(reasonTooManyTenants, requestmeta.SourceAPI, test.orgId)) assert.Equal(t, float64(1), v) } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c2dab4a54ec..cda8e23f57b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -63,6 +63,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" logutil "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" + "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" @@ -1696,7 +1697,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery } // We will report *this* request in the error too. 
- c, err := i.trackInflightQueryRequest() + c, err := i.trackInflightQueryRequest(ctx) if err != nil { return nil, err } @@ -1804,7 +1805,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu q.Close() } - c, err := i.trackInflightQueryRequest() + c, err := i.trackInflightQueryRequest(ctx) if err != nil { return nil, cleanup, err } @@ -1901,7 +1902,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR q.Close() } - c, err := i.trackInflightQueryRequest() + c, err := i.trackInflightQueryRequest(ctx) if err != nil { return nil, cleanup, err } @@ -2252,7 +2253,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil } -func (i *Ingester) trackInflightQueryRequest() (func(), error) { +func (i *Ingester) trackInflightQueryRequest(ctx context.Context) (func(), error) { gl := i.getInstanceLimits() if gl != nil && gl.MaxInflightQueryRequests > 0 { if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests { @@ -2262,7 +2263,7 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) { i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) - if i.resourceBasedLimiter != nil { + if i.resourceBasedLimiter != nil && !requestmeta.RequestFromRuler(ctx) { if err := i.resourceBasedLimiter.AcceptNewRequest(); err != nil { level.Warn(i.logger).Log("msg", "failed to accept request", "err", err) return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "failed to query: %s", limiter.ErrResourceLimitReachedStr) @@ -2282,7 +2283,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th } defer q.Close() - c, err := i.trackInflightQueryRequest() + c, err := i.trackInflightQueryRequest(ctx) if err != nil { return 0, 0, 0, 0, err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index c59879a1d84..e5934ea9722 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -61,6 +61,7 @@ 
import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -3227,11 +3228,18 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { } rreq := &client.QueryRequest{} + ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceAPI) s := &mockQueryStreamServer{ctx: ctx} err = i.QueryStream(rreq, s) require.Error(t, err) exhaustedErr := limiter.ResourceLimitReachedError{} require.ErrorContains(t, err, exhaustedErr.Error()) + + // we shouldn't reject queries from ruler + ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler) + s = &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(rreq, s) + require.Nil(t, err) } func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 0af6ab9c618..4c3e6102635 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -22,6 +22,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/thanos-io/promql-engine/logicalplan" @@ -82,8 +83,7 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for result.Stats = r.FormValue("stats") result.Path = r.URL.Path - isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent) - if isSourceRuler { + if tripperware.GetSource(r) == 
requestmeta.SourceRuler { // When the source is the Ruler, then forward whole headers result.Headers = r.Header } else { @@ -210,7 +210,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ h.Add("Content-Type", "application/x-www-form-urlencoded") - isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent) + isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent) || requestmeta.RequestFromRuler(ctx) if !isSourceRuler { // When the source is the Ruler, skip set header tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 180ce1c27d0..8742c71698f 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -57,9 +57,6 @@ const ( QueryResponseCortexMIMEType = "application/" + QueryResponseCortexMIMESubType QueryResponseCortexMIMESubType = "x-cortex-query+proto" RulerUserAgent = "CortexRuler" - - SourceRuler = "ruler" - SourceAPI = "api" ) // Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares. 
diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index b7759b8b45b..a505166c174 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -183,7 +183,7 @@ func NewQueryTripperware( now := time.Now() userStr := tenant.JoinTenantIDs(tenantIDs) activeUsers.UpdateUserTimestamp(userStr, now) - source := GetSource(r.Header.Get("User-Agent")) + source := GetSource(r) queriesPerTenant.WithLabelValues(op, source, userStr).Inc() if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { @@ -279,11 +279,13 @@ func (q roundTripper) Do(ctx context.Context, r Request) (Response, error) { return q.codec.DecodeResponse(ctx, response, r) } -func GetSource(userAgent string) string { - if strings.Contains(userAgent, RulerUserAgent) { +func GetSource(r *http.Request) string { + // check it for backwards compatibility + userAgent := r.Header.Get("User-Agent") + if strings.Contains(userAgent, RulerUserAgent) || requestmeta.RequestFromRuler(r.Context()) { // caller is ruler - return SourceRuler + return requestmeta.SourceRuler } - return SourceAPI + return requestmeta.SourceAPI } diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 68c45a5bdcf..0b5c8826990 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -188,6 +188,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, // Add request ID to the context so that it can be used in logs and metrics for split queries. 
ctx = requestmeta.ContextWithRequestId(ctx, uuid.NewString()) + ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler) if frontendClient != nil { v, err := frontendClient.InstantQuery(ctx, qs, t) diff --git a/pkg/ruler/frontend_client_pool.go b/pkg/ruler/frontend_client_pool.go index 7b131621aa6..38847861387 100644 --- a/pkg/ruler/frontend_client_pool.go +++ b/pkg/ruler/frontend_client_pool.go @@ -4,18 +4,14 @@ import ( "time" "github.com/go-kit/log" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/middleware" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "github.com/cortexproject/cortex/pkg/ring/client" "github.com/cortexproject/cortex/pkg/util/grpcclient" - cortexmiddleware "github.com/cortexproject/cortex/pkg/util/middleware" ) type frontendPool struct { @@ -55,11 +51,7 @@ func newFrontendPool(cfg Config, log log.Logger, reg prometheus.Registerer) *cli } func (f *frontendPool) createFrontendClient(addr string) (client.PoolClient, error) { - opts, err := f.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{ - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - cortexmiddleware.PrometheusGRPCUnaryInstrumentation(f.frontendClientRequestDuration), - }, nil) + opts, err := f.grpcConfig.DialOption(grpcclient.Instrument(f.frontendClientRequestDuration)) if err != nil { return nil, err } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 9e61d63abf2..84862d7f293 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -28,6 +28,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_limiter "github.com/cortexproject/cortex/pkg/util/limiter" + 
"github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -408,7 +409,7 @@ func (g *StoreGateway) syncStores(ctx context.Context, reason string) { } func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error { - if err := g.checkResourceUtilization(); err != nil { + if err := g.checkResourceUtilization(srv.Context()); err != nil { return err } return g.stores.Series(req, srv) @@ -416,7 +417,7 @@ func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.Sto // LabelNames implements the Storegateway proto service. func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { - if err := g.checkResourceUtilization(); err != nil { + if err := g.checkResourceUtilization(ctx); err != nil { return nil, err } return g.stores.LabelNames(ctx, req) @@ -424,14 +425,14 @@ func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRe // LabelValues implements the Storegateway proto service. 
func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { - if err := g.checkResourceUtilization(); err != nil { + if err := g.checkResourceUtilization(ctx); err != nil { return nil, err } return g.stores.LabelValues(ctx, req) } -func (g *StoreGateway) checkResourceUtilization() error { - if g.resourceBasedLimiter == nil { +func (g *StoreGateway) checkResourceUtilization(ctx context.Context) error { + if g.resourceBasedLimiter == nil || requestmeta.RequestFromRuler(ctx) { return nil } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index b9070c236e7..6c81da08888 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -43,6 +43,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_limiter "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -1236,11 +1237,17 @@ func TestStoreGateway_SeriesThrottledByResourceMonitor(t *testing.T) { g.resourceBasedLimiter, err = util_limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil, "store-gateway") require.NoError(t, err) + ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceAPI) srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) err = g.Series(req, srv) require.Error(t, err) exhaustedErr := util_limiter.ResourceLimitReachedError{} require.ErrorContains(t, err, exhaustedErr.Error()) + + ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler) + srv = newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + err = g.Series(req, srv) + require.Nil(t, err) } func mockGatewayConfig() Config { diff --git a/pkg/util/requestmeta/context.go 
b/pkg/util/requestmeta/context.go index 2efae506d96..43ee33c7bc4 100644 --- a/pkg/util/requestmeta/context.go +++ b/pkg/util/requestmeta/context.go @@ -40,6 +40,7 @@ func ContextWithRequestMetadataMapFromHeaders(ctx context.Context, headers map[s headerKeys = append(headerKeys, LoggingHeadersKey) } headerKeys = append(headerKeys, RequestIdKey) + headerKeys = append(headerKeys, RequestSourceKey) for _, header := range headerKeys { if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok { headerMap[header] = v diff --git a/pkg/util/requestmeta/logging_headers.go b/pkg/util/requestmeta/logging_headers.go index cdf6f0d2e2c..02b2a4270bf 100644 --- a/pkg/util/requestmeta/logging_headers.go +++ b/pkg/util/requestmeta/logging_headers.go @@ -25,9 +25,12 @@ func LoggingHeadersFromContext(ctx context.Context) map[string]string { } loggingHeadersString := metadataMap[LoggingHeadersKey] if loggingHeadersString == "" { - // Backward compatibility: if no specific headers are listed, return all metadata + // Backward compatibility: if no specific headers are listed, return all metadata excluding requestId and source result := make(map[string]string, len(metadataMap)) for k, v := range metadataMap { + if k == RequestIdKey || k == RequestSourceKey { + continue + } result[k] = v } return result @@ -49,8 +52,9 @@ func LoggingHeadersAndRequestIdFromContext(ctx context.Context) map[string]strin } loggingHeaders := LoggingHeadersFromContext(ctx) - reqId := RequestIdFromContext(ctx) - loggingHeaders[RequestIdKey] = reqId + if reqId := RequestIdFromContext(ctx); reqId != "" { + loggingHeaders[RequestIdKey] = reqId + } return loggingHeaders } diff --git a/pkg/util/requestmeta/source.go b/pkg/util/requestmeta/source.go new file mode 100644 index 00000000000..6f0f23db069 --- /dev/null +++ b/pkg/util/requestmeta/source.go @@ -0,0 +1,27 @@ +package requestmeta + +import "context" + +const RequestSourceKey = "x-cortex-request-source" + +const ( + SourceAPI = "api" + SourceRuler = 
"ruler" +) + +func ContextWithRequestSource(ctx context.Context, source string) context.Context { + metadataMap := MapFromContext(ctx) + if metadataMap == nil { + metadataMap = make(map[string]string) + } + metadataMap[RequestSourceKey] = source + return ContextWithRequestMetadataMap(ctx, metadataMap) +} + +func RequestFromRuler(ctx context.Context) bool { + metadataMap := MapFromContext(ctx) + if metadataMap == nil { + return false + } + return metadataMap[RequestSourceKey] == SourceRuler +}