Skip to content

Commit 5e39a8b

Browse files
committed
resource based throttling: reject only addhock queries
Signed-off-by: Erlan Zholdubai uulu <[email protected]>
1 parent 4f4de93 commit 5e39a8b

File tree

8 files changed

+58
-11
lines changed

8 files changed

+58
-11
lines changed

pkg/api/middlewares.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (h HTTPHeaderMiddleware) injectRequestContext(r *http.Request) *http.Reques
3737
reqId = uuid.NewString()
3838
}
3939
requestContextMap[requestmeta.RequestIdKey] = reqId
40+
requestContextMap[requestmeta.RequestSourceKey] = requestmeta.SourceApi
4041

4142
ctx := requestmeta.ContextWithRequestMetadataMap(r.Context(), requestContextMap)
4243
return r.WithContext(ctx)

pkg/ingester/ingester.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import (
6363
"github.com/cortexproject/cortex/pkg/util/limiter"
6464
logutil "github.com/cortexproject/cortex/pkg/util/log"
6565
util_math "github.com/cortexproject/cortex/pkg/util/math"
66+
"github.com/cortexproject/cortex/pkg/util/requestmeta"
6667
"github.com/cortexproject/cortex/pkg/util/resource"
6768
"github.com/cortexproject/cortex/pkg/util/services"
6869
"github.com/cortexproject/cortex/pkg/util/spanlogger"
@@ -1696,7 +1697,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
16961697
}
16971698

16981699
// We will report *this* request in the error too.
1699-
c, err := i.trackInflightQueryRequest()
1700+
c, err := i.trackInflightQueryRequest(ctx)
17001701
if err != nil {
17011702
return nil, err
17021703
}
@@ -1804,7 +1805,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
18041805
q.Close()
18051806
}
18061807

1807-
c, err := i.trackInflightQueryRequest()
1808+
c, err := i.trackInflightQueryRequest(ctx)
18081809
if err != nil {
18091810
return nil, cleanup, err
18101811
}
@@ -1901,7 +1902,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
19011902
q.Close()
19021903
}
19031904

1904-
c, err := i.trackInflightQueryRequest()
1905+
c, err := i.trackInflightQueryRequest(ctx)
19051906
if err != nil {
19061907
return nil, cleanup, err
19071908
}
@@ -2252,7 +2253,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
22522253
return nil
22532254
}
22542255

2255-
func (i *Ingester) trackInflightQueryRequest() (func(), error) {
2256+
func (i *Ingester) trackInflightQueryRequest(ctx context.Context) (func(), error) {
22562257
gl := i.getInstanceLimits()
22572258
if gl != nil && gl.MaxInflightQueryRequests > 0 {
22582259
if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests {
@@ -2262,7 +2263,7 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
22622263

22632264
i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
22642265

2265-
if i.resourceBasedLimiter != nil {
2266+
if i.resourceBasedLimiter != nil && requestmeta.RequestSourceFromContext(ctx) == requestmeta.SourceApi {
22662267
if err := i.resourceBasedLimiter.AcceptNewRequest(); err != nil {
22672268
level.Warn(i.logger).Log("msg", "failed to accept request", "err", err)
22682269
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "failed to query: %s", limiter.ErrResourceLimitReachedStr)
@@ -2282,7 +2283,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
22822283
}
22832284
defer q.Close()
22842285

2285-
c, err := i.trackInflightQueryRequest()
2286+
c, err := i.trackInflightQueryRequest(ctx)
22862287
if err != nil {
22872288
return 0, 0, 0, 0, err
22882289
}

pkg/ingester/ingester_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import (
6161
"github.com/cortexproject/cortex/pkg/util"
6262
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
6363
"github.com/cortexproject/cortex/pkg/util/limiter"
64+
"github.com/cortexproject/cortex/pkg/util/requestmeta"
6465
"github.com/cortexproject/cortex/pkg/util/resource"
6566
"github.com/cortexproject/cortex/pkg/util/services"
6667
"github.com/cortexproject/cortex/pkg/util/test"
@@ -3227,11 +3228,18 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) {
32273228
}
32283229

32293230
rreq := &client.QueryRequest{}
3231+
ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceApi)
32303232
s := &mockQueryStreamServer{ctx: ctx}
32313233
err = i.QueryStream(rreq, s)
32323234
require.Error(t, err)
32333235
exhaustedErr := limiter.ResourceLimitReachedError{}
32343236
require.ErrorContains(t, err, exhaustedErr.Error())
3237+
3238+
// we shouldn't reject queries from ruler
3239+
ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler)
3240+
s = &mockQueryStreamServer{ctx: ctx}
3241+
err = i.QueryStream(rreq, s)
3242+
require.Nil(t, err)
32353243
}
32363244

32373245
func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {

pkg/ruler/compat.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient,
188188

189189
// Add request ID to the context so that it can be used in logs and metrics for split queries.
190190
ctx = requestmeta.ContextWithRequestId(ctx, uuid.NewString())
191+
ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler)
191192

192193
if frontendClient != nil {
193194
v, err := frontendClient.InstantQuery(ctx, qs, t)

pkg/storegateway/gateway.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cortexproject/cortex/pkg/util"
2929
"github.com/cortexproject/cortex/pkg/util/flagext"
3030
util_limiter "github.com/cortexproject/cortex/pkg/util/limiter"
31+
"github.com/cortexproject/cortex/pkg/util/requestmeta"
3132
"github.com/cortexproject/cortex/pkg/util/resource"
3233
"github.com/cortexproject/cortex/pkg/util/services"
3334
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -408,30 +409,30 @@ func (g *StoreGateway) syncStores(ctx context.Context, reason string) {
408409
}
409410

410411
func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
411-
if err := g.checkResourceUtilization(); err != nil {
412+
if err := g.checkResourceUtilization(srv.Context()); err != nil {
412413
return err
413414
}
414415
return g.stores.Series(req, srv)
415416
}
416417

417418
// LabelNames implements the Storegateway proto service.
418419
func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
419-
if err := g.checkResourceUtilization(); err != nil {
420+
if err := g.checkResourceUtilization(ctx); err != nil {
420421
return nil, err
421422
}
422423
return g.stores.LabelNames(ctx, req)
423424
}
424425

425426
// LabelValues implements the Storegateway proto service.
426427
func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
427-
if err := g.checkResourceUtilization(); err != nil {
428+
if err := g.checkResourceUtilization(ctx); err != nil {
428429
return nil, err
429430
}
430431
return g.stores.LabelValues(ctx, req)
431432
}
432433

433-
func (g *StoreGateway) checkResourceUtilization() error {
434-
if g.resourceBasedLimiter == nil {
434+
func (g *StoreGateway) checkResourceUtilization(ctx context.Context) error {
435+
if g.resourceBasedLimiter == nil || requestmeta.RequestSourceFromContext(ctx) == requestmeta.SourceRuler {
435436
return nil
436437
}
437438

pkg/storegateway/gateway_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/cortexproject/cortex/pkg/util"
4444
"github.com/cortexproject/cortex/pkg/util/flagext"
4545
util_limiter "github.com/cortexproject/cortex/pkg/util/limiter"
46+
"github.com/cortexproject/cortex/pkg/util/requestmeta"
4647
"github.com/cortexproject/cortex/pkg/util/resource"
4748
"github.com/cortexproject/cortex/pkg/util/services"
4849
"github.com/cortexproject/cortex/pkg/util/test"
@@ -1236,11 +1237,17 @@ func TestStoreGateway_SeriesThrottledByResourceMonitor(t *testing.T) {
12361237
g.resourceBasedLimiter, err = util_limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil, "store-gateway")
12371238
require.NoError(t, err)
12381239

1240+
ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceApi)
12391241
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
12401242
err = g.Series(req, srv)
12411243
require.Error(t, err)
12421244
exhaustedErr := util_limiter.ResourceLimitReachedError{}
12431245
require.ErrorContains(t, err, exhaustedErr.Error())
1246+
1247+
ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler)
1248+
srv = newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
1249+
err = g.Series(req, srv)
1250+
require.Nil(t, err)
12441251
}
12451252

12461253
func mockGatewayConfig() Config {

pkg/util/requestmeta/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func ContextWithRequestMetadataMapFromHeaders(ctx context.Context, headers map[s
4040
headerKeys = append(headerKeys, LoggingHeadersKey)
4141
}
4242
headerKeys = append(headerKeys, RequestIdKey)
43+
headerKeys = append(headerKeys, RequestSourceKey)
4344
for _, header := range headerKeys {
4445
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
4546
headerMap[header] = v

pkg/util/requestmeta/source.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package requestmeta
2+
3+
import "context"
4+
5+
const RequestSourceKey = "x-cortex-request-source"
6+
7+
const (
8+
SourceApi = "api"
9+
SourceRuler = "ruler"
10+
)
11+
12+
func RequestSourceFromContext(ctx context.Context) string {
13+
metadataMap := MapFromContext(ctx)
14+
if metadataMap == nil {
15+
return ""
16+
}
17+
return metadataMap[RequestSourceKey]
18+
}
19+
20+
func ContextWithRequestSource(ctx context.Context, source string) context.Context {
21+
metadataMap := MapFromContext(ctx)
22+
if metadataMap == nil {
23+
metadataMap = make(map[string]string)
24+
}
25+
metadataMap[RequestSourceKey] = source
26+
return ContextWithRequestMetadataMap(ctx, metadataMap)
27+
}

0 commit comments

Comments
 (0)