Commit f97ec76

Enforce query response size limit after decompression in query-frontend (#6607)

1 parent: 393a672

19 files changed: +296, -35 lines

CHANGELOG.md (1 addition, 0 deletions)

@@ -6,6 +6,7 @@
 * [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
 * [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
 * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
+* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
 * [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
 * [ENHANCEMENT] Querier: limit label APIs to query only ingesters if `start` param is not been specified. #6618
 * [ENHANCEMENT] Alertmanager: Add new limits `-alertmanager.max-silences-count` and `-alertmanager.max-silences-size-bytes` for limiting silences per tenant. #6605

docs/configuration/config-file-reference.md (6 additions, 0 deletions)

@@ -3568,6 +3568,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -querier.max-query-parallelism
 [max_query_parallelism: <int> | default = 14]
 
+# The maximum total uncompressed query response size. If the query was sharded
+# the limit is applied to the total response size of all shards. This limit is
+# enforced in query-frontend for `query` and `query_range` APIs. 0 to disable.
+# CLI flag: -frontend.max-query-response-size
+[max_query_response_size: <int> | default = 0]
+
 # Most recent allowed cacheable result per-tenant, to prevent caching very
 # recent results that might still be in flux.
 # CLI flag: -frontend.max-cache-freshness

integration/query_frontend_test.go (65 additions, 0 deletions)

@@ -13,6 +13,7 @@ import (
     "os"
     "path/filepath"
     "strconv"
+    "strings"
     "sync"
     "testing"
     "time"
@@ -952,3 +953,67 @@ func TestQueryFrontendStatsFromResultsCacheShouldBeSame(t *testing.T) {
     // we expect same amount of samples_scanned added to the metric despite the second query hit the cache.
     require.Equal(t, numSamplesScannedTotal2, numSamplesScannedTotal*2)
 }
+
+func TestQueryFrontendResponseSizeLimit(t *testing.T) {
+    s, err := e2e.NewScenario(networkName)
+    require.NoError(t, err)
+    defer s.Close()
+
+    flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+        "-frontend.max-query-response-size":  "4096",
+        "-querier.split-queries-by-interval": "1m",
+    })
+
+    consul := e2edb.NewConsul()
+    minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+    require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+    queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
+    require.NoError(t, s.Start(queryFrontend))
+
+    flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
+
+    ingester := e2ecortex.NewIngesterWithConfigFile("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "")
+    distributor := e2ecortex.NewDistributorWithConfigFile("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "")
+
+    querier := e2ecortex.NewQuerierWithConfigFile("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "")
+
+    require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))
+    require.NoError(t, s.WaitReady(queryFrontend))
+
+    c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
+    require.NoError(t, err)
+
+    startTime := time.Now().Add(-1 * time.Hour)
+    for i := 0; i < 10; i++ {
+        ts := startTime.Add(time.Duration(i) * time.Minute)
+        longLabelValue1 := strings.Repeat("long_label_value_1_", 100)
+        longLabelValue2 := strings.Repeat("long_label_value_2_", 100)
+        largeSeries, _ := generateSeries("large_series", ts, prompb.Label{Name: "label1", Value: longLabelValue1}, prompb.Label{Name: "label2", Value: longLabelValue2})
+        res, err := c.Push(largeSeries)
+        require.NoError(t, err)
+        require.Equal(t, http.StatusOK, res.StatusCode)
+
+        smallSeries, _ := generateSeries("small_series", ts)
+        res, err = c.Push(smallSeries)
+        require.NoError(t, err)
+        require.Equal(t, http.StatusOK, res.StatusCode)
+    }
+
+    qfeClient, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
+    require.NoError(t, err)
+
+    queryStart := startTime.Add(-1 * time.Minute)
+    queryEnd := startTime.Add(10 * time.Minute)
+
+    // Expect response size larger than limit (4 KB)
+    resp, body, err := qfeClient.QueryRangeRaw(`{__name__="large_series"}`, queryStart, queryEnd, 30*time.Second, nil)
+    require.NoError(t, err)
+    require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
+    require.Contains(t, string(body), "the query response size exceeds limit")
+
+    // Expect response size less than limit (4 KB)
+    resp, _, err = qfeClient.QueryRangeRaw(`{__name__="small_series"}`, queryStart, queryEnd, 30*time.Second, nil)
+    require.NoError(t, err)
+    require.Equal(t, http.StatusOK, resp.StatusCode)
+}

pkg/frontend/transport/handler.go (4 additions, 1 deletion)

@@ -27,6 +27,7 @@ import (
     "github.com/cortexproject/cortex/pkg/tenant"
     "github.com/cortexproject/cortex/pkg/util"
     util_api "github.com/cortexproject/cortex/pkg/util/api"
+    "github.com/cortexproject/cortex/pkg/util/limiter"
     util_log "github.com/cortexproject/cortex/pkg/util/log"
 )
 
@@ -72,6 +73,8 @@ const (
     limitBytesStoreGateway = `exceeded bytes limit`
 )
 
+var noopResponseSizeLimiter = limiter.NewResponseSizeLimiter(0)
+
 // Config for a Handler.
 type HandlerConfig struct {
     LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
@@ -277,7 +280,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     // If the response status code is not 2xx, try to get the
     // error message from response body.
     if resp.StatusCode/100 != 2 {
-        body, err2 := tripperware.BodyBuffer(resp, f.log)
+        body, err2 := tripperware.BodyBytes(resp, noopResponseSizeLimiter, f.log)
         if err2 == nil {
            err = httpgrpc.Errorf(resp.StatusCode, "%s", string(body))
         }
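Note: the `limiter.ResponseSizeLimiter` used above lives in `pkg/util/limiter`, which is among the 19 changed files but is not shown in this excerpt. A minimal sketch of what such a limiter could look like, assuming an atomic running total, that a limit of 0 disables the check, and borrowing the error text asserted by the integration test; the field names and exact signatures are assumptions, not the committed code:

package limiter

import (
    "fmt"
    "sync/atomic"
)

// ResponseSizeLimiter tracks the cumulative decompressed response size of a
// single query across all of its shards/splits. Sketch only: the real
// implementation in this commit may differ.
type ResponseSizeLimiter struct {
    maxResponseSize int64 // 0 means the limit is disabled (matches the flag default).
    totalBytes      atomic.Int64
}

func NewResponseSizeLimiter(maxResponseSize int64) *ResponseSizeLimiter {
    return &ResponseSizeLimiter{maxResponseSize: maxResponseSize}
}

// AddResponseBytes adds the size of one partial response and returns an error
// once the running total exceeds the configured limit.
func (l *ResponseSizeLimiter) AddResponseBytes(size int) error {
    total := l.totalBytes.Add(int64(size))
    if l.maxResponseSize > 0 && total > l.maxResponseSize {
        return fmt.Errorf("the query response size exceeds limit (limit: %d bytes)", l.maxResponseSize)
    }
    return nil
}

With a limit of 0, `noopResponseSizeLimiter` never rejects anything, which is why it can safely be used when decoding non-2xx error bodies in the handler.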

pkg/frontend/transport/retry.go (1 addition, 1 deletion)

@@ -56,7 +56,7 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
         // This is not that efficient as we might decode the body multiple
         // times. But error response should be too large so we should be fine.
         // TODO: investigate ways to decode only once.
-        body, err := tripperware.BodyBufferFromHTTPGRPCResponse(resp, nil)
+        body, err := tripperware.BodyBytesFromHTTPGRPCResponse(resp, nil)
         if err != nil {
             return nil, err
         }

pkg/querier/tripperware/instantquery/instant_query.go (7 additions, 4 deletions)

@@ -22,6 +22,7 @@ import (
     "github.com/cortexproject/cortex/pkg/querier/stats"
     "github.com/cortexproject/cortex/pkg/querier/tripperware"
     "github.com/cortexproject/cortex/pkg/util"
+    "github.com/cortexproject/cortex/pkg/util/limiter"
     "github.com/cortexproject/cortex/pkg/util/spanlogger"
 )
 
@@ -92,25 +93,27 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
     return &result, nil
 }
 
-func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
+func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
     log, ctx := spanlogger.New(ctx, "DecodeQueryInstantResponse") //nolint:ineffassign,staticcheck
     defer log.Finish()
 
     if err := ctx.Err(); err != nil {
         return nil, err
     }
 
-    buf, err := tripperware.BodyBuffer(r, log)
+    responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx)
+    body, err := tripperware.BodyBytes(r, responseSizeLimiter, log)
     if err != nil {
         log.Error(err)
         return nil, err
     }
+
     if r.StatusCode/100 != 2 {
-        return nil, httpgrpc.Errorf(r.StatusCode, "%s", string(buf))
+        return nil, httpgrpc.Errorf(r.StatusCode, "%s", string(body))
     }
 
     var resp tripperware.PrometheusResponse
-    err = tripperware.UnmarshalResponse(r, buf, &resp)
+    err = tripperware.UnmarshalResponse(r, body, &resp)
 
     if err != nil {
         return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
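Note: `ResponseSizeLimiterFromContextWithFallback` is referenced here but its body is also outside this excerpt. A plausible sketch, assuming a context-value design in which the frontend injects one limiter per incoming query so that all shards decoded for that query share it; the injector name and context key are hypothetical, and a real fallback likely resolves the tenant from the context (which would explain the `user.InjectOrgID` calls added to the tests below) rather than simply disabling the check:

package limiter

import "context"

type responseSizeLimiterCtxKey struct{}

// AddResponseSizeLimiterToContext is a hypothetical injector: the query
// frontend would call it once per query before fanning out to shards.
func AddResponseSizeLimiterToContext(ctx context.Context, l *ResponseSizeLimiter) context.Context {
    return context.WithValue(ctx, responseSizeLimiterCtxKey{}, l)
}

// ResponseSizeLimiterFromContextWithFallback returns the per-query limiter if
// one was injected. The fallback below returns a disabled limiter for brevity;
// the committed code may instead build one from the tenant's limits.
func ResponseSizeLimiterFromContextWithFallback(ctx context.Context) *ResponseSizeLimiter {
    if l, ok := ctx.Value(responseSizeLimiterCtxKey{}).(*ResponseSizeLimiter); ok {
        return l
    }
    return NewResponseSizeLimiter(0)
}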

pkg/querier/tripperware/instantquery/instant_query_test.go (14 additions, 8 deletions)

@@ -26,7 +26,6 @@ import (
 )
 
 var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType))
-
 var jsonHttpReq = &http.Request{
     Header: map[string][]string{
         "Accept": {"application/json"},
@@ -190,7 +189,9 @@ func TestCompressedResponse(t *testing.T) {
             Header: h,
             Body:   io.NopCloser(responseBody),
         }
-        resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
+
+        ctx := user.InjectOrgID(context.Background(), "1")
+        resp, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
         require.Equal(t, tc.err, err)
 
         if err == nil {
@@ -454,7 +455,8 @@ func TestResponse(t *testing.T) {
             }
             }
 
-            resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
+            ctx := user.InjectOrgID(context.Background(), "1")
+            resp, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
             require.NoError(t, err)
 
             // Reset response, as the above call will have consumed the body reader.
@@ -464,7 +466,7 @@ func TestResponse(t *testing.T) {
                 Body:          io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))),
                 ContentLength: int64(len(tc.jsonBody)),
             }
-            resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), jsonHttpReq, resp)
+            resp2, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp)
             require.NoError(t, err)
             assert.Equal(t, response, resp2)
         })
@@ -710,7 +712,7 @@ func TestMergeResponse(t *testing.T) {
         tc := tc
         t.Run(tc.name, func(t *testing.T) {
             t.Parallel()
-            ctx, cancelCtx := context.WithCancel(context.Background())
+            ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))
 
             var resps []tripperware.Response
             for _, r := range tc.resps {
@@ -1723,7 +1725,7 @@ func TestMergeResponseProtobuf(t *testing.T) {
         tc := tc
         t.Run(tc.name, func(t *testing.T) {
             t.Parallel()
-            ctx, cancelCtx := context.WithCancel(context.Background())
+            ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))
 
             var resps []tripperware.Response
             for _, r := range tc.resps {
@@ -1870,7 +1872,9 @@ func Benchmark_Decode(b *testing.B) {
                 StatusCode: 200,
                 Body:       io.NopCloser(bytes.NewBuffer(body)),
             }
-            _, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
+
+            ctx := user.InjectOrgID(context.Background(), "1")
+            _, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
             require.NoError(b, err)
         }
     })
@@ -1933,7 +1937,9 @@ func Benchmark_Decode_Protobuf(b *testing.B) {
                 Header: http.Header{"Content-Type": []string{"application/x-protobuf"}},
                 Body:   io.NopCloser(bytes.NewBuffer(body)),
             }
-            _, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
+
+            ctx := user.InjectOrgID(context.Background(), "1")
+            _, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
             require.NoError(b, err)
         }
     })
pkg/querier/tripperware/limits.go (3 additions, 0 deletions)

@@ -19,6 +19,9 @@ type Limits interface {
     // frontend will process in parallel.
     MaxQueryParallelism(string) int
 
+    // MaxQueryResponseSize returns the max total response size of a query in bytes.
+    MaxQueryResponseSize(string) int64
+
     // MaxCacheFreshness returns the period after which results are cacheable,
     // to prevent caching of very recent results.
     MaxCacheFreshness(string) time.Duration
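Note: with `MaxQueryResponseSize` on the `Limits` interface, the per-tenant value can be resolved once per query and turned into the request-scoped limiter sketched earlier. A hypothetical wiring example; the helper itself, its placement, and `AddResponseSizeLimiterToContext` are assumptions, while `MaxQueryResponseSize`, `NewResponseSizeLimiter`, and `tenant.TenantIDs` exist in this codebase:

package frontend

import (
    "context"

    "github.com/cortexproject/cortex/pkg/querier/tripperware"
    "github.com/cortexproject/cortex/pkg/tenant"
    "github.com/cortexproject/cortex/pkg/util/limiter"
)

// injectResponseSizeLimiter shows how the per-tenant limit could be attached
// to the request context before the query enters the middleware chain.
func injectResponseSizeLimiter(ctx context.Context, limits tripperware.Limits) (context.Context, error) {
    tenantIDs, err := tenant.TenantIDs(ctx)
    if err != nil {
        return ctx, err
    }
    // A real implementation needs a policy for multi-tenant queries (e.g. the
    // smallest non-zero limit across tenants); the first tenant is used here.
    maxSize := limits.MaxQueryResponseSize(tenantIDs[0])
    return limiter.AddResponseSizeLimiterToContext(ctx, limiter.NewResponseSizeLimiter(maxSize)), nil
}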

pkg/querier/tripperware/query.go (18 additions, 6 deletions)

@@ -4,6 +4,7 @@ import (
     "bytes"
     "compress/gzip"
     "context"
+    "encoding/binary"
     "fmt"
     "io"
     "net/http"
@@ -14,7 +15,6 @@ import (
 
     "github.com/go-kit/log"
     "github.com/gogo/protobuf/proto"
-    "github.com/golang/snappy"
     jsoniter "github.com/json-iterator/go"
     "github.com/opentracing/opentracing-go"
     otlog "github.com/opentracing/opentracing-go/log"
@@ -26,6 +26,7 @@ import (
     "github.com/weaveworks/common/httpgrpc"
 
     "github.com/cortexproject/cortex/pkg/cortexpb"
+    "github.com/cortexproject/cortex/pkg/util/limiter"
     "github.com/cortexproject/cortex/pkg/util/runutil"
 )
 
@@ -448,7 +449,7 @@ type Buffer interface {
     Bytes() []byte
 }
 
-func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
+func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimiter, logger log.Logger) ([]byte, error) {
     var buf *bytes.Buffer
 
     // Attempt to cast the response body to a Buffer and use it if possible.
@@ -466,6 +467,11 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
         }
     }
 
+    responseSize := getResponseSize(res, buf)
+    if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil {
+        return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error())
+    }
+
     // if the response is gzipped, lets unzip it here
     if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") {
         gReader, err := gzip.NewReader(buf)
@@ -475,15 +481,12 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
         defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader")
 
         return io.ReadAll(gReader)
-    } else if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") {
-        sReader := snappy.NewReader(buf)
-        return io.ReadAll(sReader)
     }
 
     return buf.Bytes(), nil
 }
 
-func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
+func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
     // if the response is gzipped, lets unzip it here
     headers := http.Header{}
     for _, h := range res.Headers {
@@ -502,6 +505,15 @@ func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logge
     return res.Body, nil
 }
 
+func getResponseSize(res *http.Response, buf *bytes.Buffer) int {
+    if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") && len(buf.Bytes()) >= 4 {
+        // GZIP body contains the size of the original (uncompressed) input data
+        // modulo 2^32 in the last 4 bytes (https://www.ietf.org/rfc/rfc1952.txt).
+        return int(binary.LittleEndian.Uint32(buf.Bytes()[len(buf.Bytes())-4:]))
+    }
+    return len(buf.Bytes())
+}
+
 // UnmarshalJSON implements json.Unmarshaler.
 func (s *PrometheusData) UnmarshalJSON(data []byte) error {
     var queryData struct {
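Note: `getResponseSize` relies on the gzip trailer rather than decompressing the body a second time. Per RFC 1952, the last four bytes of a gzip stream (the ISIZE field) hold the uncompressed length modulo 2^32, so the decompressed size of bodies over 4 GiB, or of multi-member streams, would under-report. A small standalone example of that property:

package main

import (
    "bytes"
    "compress/gzip"
    "encoding/binary"
    "fmt"
)

func main() {
    payload := bytes.Repeat([]byte("cortex"), 1000) // 6000 bytes uncompressed

    var compressed bytes.Buffer
    w := gzip.NewWriter(&compressed)
    if _, err := w.Write(payload); err != nil {
        panic(err)
    }
    if err := w.Close(); err != nil { // Close flushes the trailer, including ISIZE.
        panic(err)
    }

    b := compressed.Bytes()
    isize := binary.LittleEndian.Uint32(b[len(b)-4:])
    fmt.Printf("compressed=%d bytes, ISIZE trailer=%d bytes\n", len(b), isize) // ISIZE prints 6000
}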

pkg/querier/tripperware/query_test.go (50 additions, 0 deletions)

@@ -1,7 +1,10 @@
 package tripperware
 
 import (
+    "bytes"
+    "compress/gzip"
     "math"
+    "net/http"
     "strconv"
     "testing"
     "time"
@@ -193,3 +196,50 @@ func generateData(timeseries, datapoints int) (floatMatrix, histogramMatrix []*S
     }
     return
 }
+
+func Test_getResponseSize(t *testing.T) {
+    tests := []struct {
+        body    []byte
+        useGzip bool
+    }{
+        {
+            body:    []byte(`foo`),
+            useGzip: false,
+        },
+        {
+            body:    []byte(`foo`),
+            useGzip: true,
+        },
+        {
+            body:    []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`),
+            useGzip: false,
+        },
+        {
+            body:    []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`),
+            useGzip: true,
+        },
+    }
+
+    for i, test := range tests {
+        t.Run(strconv.Itoa(i), func(t *testing.T) {
+            expectedBodyLength := len(test.body)
+            buf := &bytes.Buffer{}
+            response := &http.Response{}
+
+            if test.useGzip {
+                response = &http.Response{
+                    Header: http.Header{"Content-Encoding": []string{"gzip"}},
+                }
+                w := gzip.NewWriter(buf)
+                _, err := w.Write(test.body)
+                require.NoError(t, err)
+                w.Close()
+            } else {
+                buf = bytes.NewBuffer(test.body)
+            }
+
+            bodyLength := getResponseSize(response, buf)
+            require.Equal(t, expectedBodyLength, bodyLength)
+        })
+    }
+}
