Skip to content

Commit 4101d10

Browse files
authored
Resource based throttling bugfix 2 (#7013)
* Make error from RBT query rejection retryable in querier Signed-off-by: Justin Jung <[email protected]> * Add tests Signed-off-by: Justin Jung <[email protected]> * Lint Signed-off-by: Justin Jung <[email protected]> * Lint import Signed-off-by: Justin Jung <[email protected]> * Added test in error translate queryable Signed-off-by: Justin Jung <[email protected]> --------- Signed-off-by: Justin Jung <[email protected]>
1 parent 4cc3e14 commit 4101d10

File tree

10 files changed

+48
-37
lines changed

10 files changed

+48
-37
lines changed

pkg/frontend/transport/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
388388
cfg: HandlerConfig{QueryStatsEnabled: true},
389389
expectedMetrics: 6,
390390
roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
391-
resourceLimitReachedErr := &limiter.ResourceLimitReachedError{}
391+
resourceLimitReachedErr := limiter.ErrResourceLimitReached
392392
return &http.Response{
393393
StatusCode: http.StatusServiceUnavailable,
394394
Body: io.NopCloser(strings.NewReader(resourceLimitReachedErr.Error())),

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2265,7 +2265,7 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
22652265
if i.resourceBasedLimiter != nil {
22662266
if err := i.resourceBasedLimiter.AcceptNewRequest(); err != nil {
22672267
level.Warn(i.logger).Log("msg", "failed to accept request", "err", err)
2268-
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "failed to query: %s", limiter.ErrResourceLimitReachedStr)
2268+
return nil, limiter.ErrResourceLimitReached
22692269
}
22702270
}
22712271

pkg/ingester/ingester_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3230,8 +3230,9 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) {
32303230
s := &mockQueryStreamServer{ctx: ctx}
32313231
err = i.QueryStream(rreq, s)
32323232
require.Error(t, err)
3233-
exhaustedErr := limiter.ResourceLimitReachedError{}
3234-
require.ErrorContains(t, err, exhaustedErr.Error())
3233+
3234+
// Expected error from isRetryableError in blocks_store_queryable.go
3235+
require.ErrorIs(t, err, limiter.ErrResourceLimitReached)
32353236
}
32363237

32373238
func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {

pkg/querier/blocks_store_queryable.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,17 +1204,11 @@ func countSamplesAndChunks(series ...*storepb.Series) (samplesCount, chunksCount
12041204

12051205
// only retry connection issues
12061206
func isRetryableError(err error) bool {
1207-
// retry upon resource exhaustion error from resource monitor
1208-
var resourceExhaustedErr *limiter.ResourceLimitReachedError
1209-
if errors.As(err, &resourceExhaustedErr) {
1210-
return true
1211-
}
1212-
12131207
switch status.Code(err) {
12141208
case codes.Unavailable:
12151209
return true
12161210
case codes.ResourceExhausted:
1217-
return errors.Is(err, storegateway.ErrTooManyInflightRequests)
1211+
return errors.Is(err, storegateway.ErrTooManyInflightRequests) || errors.Is(err, limiter.ErrResourceLimitReached)
12181212
// Client side connection closing, this error happens during store gateway deployment.
12191213
// https://github.com/grpc/grpc-go/blob/03172006f5d168fc646d87928d85cb9c4a480291/clientconn.go#L67
12201214
case codes.Canceled:

pkg/querier/blocks_store_queryable_test.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"net/http"
78
"sort"
89
"strings"
910
"testing"
@@ -31,6 +32,7 @@ import (
3132
"github.com/thanos-io/thanos/pkg/store/hintspb"
3233
"github.com/thanos-io/thanos/pkg/store/labelpb"
3334
"github.com/thanos-io/thanos/pkg/store/storepb"
35+
"github.com/weaveworks/common/httpgrpc"
3436
"github.com/weaveworks/common/user"
3537
"google.golang.org/grpc"
3638
"google.golang.org/grpc/codes"
@@ -44,7 +46,6 @@ import (
4446
"github.com/cortexproject/cortex/pkg/util"
4547
"github.com/cortexproject/cortex/pkg/util/limiter"
4648
util_log "github.com/cortexproject/cortex/pkg/util/log"
47-
"github.com/cortexproject/cortex/pkg/util/resource"
4849
"github.com/cortexproject/cortex/pkg/util/services"
4950
"github.com/cortexproject/cortex/pkg/util/validation"
5051
)
@@ -1527,7 +1528,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
15271528
map[BlocksStoreClient][]ulid.ULID{
15281529
&storeGatewayClientMock{
15291530
remoteAddr: "1.1.1.1",
1530-
mockedSeriesErr: &limiter.ResourceLimitReachedError{},
1531+
mockedSeriesErr: limiter.ErrResourceLimitReached,
15311532
}: {block1},
15321533
},
15331534
map[BlocksStoreClient][]ulid.ULID{
@@ -2499,20 +2500,18 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) {
24992500
}
25002501
}
25012502

2502-
func TestBlocksStoreQuerier_ShouldRetryResourceBasedThrottlingError(t *testing.T) {
2503-
limits := map[resource.Type]float64{
2504-
resource.CPU: 0.5,
2505-
resource.Heap: 0.5,
2506-
}
2507-
2508-
resourceBasedLimiter, err := limiter.NewResourceBasedLimiter(&limiter.MockMonitor{
2509-
CpuUtilization: 0.7,
2510-
HeapUtilization: 0.7,
2511-
}, limits, prometheus.DefaultRegisterer, "ingester")
2512-
require.NoError(t, err)
2513-
2514-
err = resourceBasedLimiter.AcceptNewRequest()
2515-
require.True(t, isRetryableError(err))
2503+
func TestBlocksStoreQuerier_isRetryableError(t *testing.T) {
2504+
require.True(t, isRetryableError(status.Error(codes.Unavailable, "")))
2505+
require.True(t, isRetryableError(storegateway.ErrTooManyInflightRequests))
2506+
require.True(t, isRetryableError(limiter.ErrResourceLimitReached))
2507+
require.True(t, isRetryableError(status.Error(codes.Canceled, "grpc: the client connection is closing")))
2508+
require.True(t, isRetryableError(errors.New("pool exhausted")))
2509+
2510+
require.False(t, isRetryableError(status.Error(codes.ResourceExhausted, "some other error")))
2511+
require.False(t, isRetryableError(status.Error(codes.Canceled, "some other error")))
2512+
require.False(t, isRetryableError(errors.New("some other error")))
2513+
require.False(t, isRetryableError(fmt.Errorf("some other error")))
2514+
require.False(t, isRetryableError(httpgrpc.Errorf(http.StatusServiceUnavailable, "some other error")))
25162515
}
25172516

25182517
type blocksStoreSetMock struct {

pkg/querier/error_translate_queryable_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"github.com/weaveworks/common/httpgrpc"
2525
"github.com/weaveworks/common/user"
2626

27+
"github.com/cortexproject/cortex/pkg/storegateway"
28+
"github.com/cortexproject/cortex/pkg/util/limiter"
2729
"github.com/cortexproject/cortex/pkg/util/validation"
2830
)
2931

@@ -113,6 +115,16 @@ func TestApiStatusCodes(t *testing.T) {
113115
expectedString: "test string",
114116
expectedCode: 422,
115117
},
118+
{
119+
err: storegateway.ErrTooManyInflightRequests,
120+
expectedString: "too many inflight requests in store gateway",
121+
expectedCode: 500,
122+
},
123+
{
124+
err: limiter.ErrResourceLimitReached,
125+
expectedString: limiter.ErrResourceLimitReachedStr,
126+
expectedCode: 500,
127+
},
116128
} {
117129
for k, q := range map[string]storage.SampleAndChunkQueryable{
118130
"error from queryable": errorTestQueryable{err: tc.err},

pkg/storegateway/gateway.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/thanos-io/objstore"
1717
"github.com/thanos-io/thanos/pkg/extprom"
1818
"github.com/thanos-io/thanos/pkg/store/storepb"
19-
"github.com/weaveworks/common/httpgrpc"
2019
"github.com/weaveworks/common/logging"
2120

2221
"github.com/cortexproject/cortex/pkg/configs"
@@ -437,7 +436,7 @@ func (g *StoreGateway) checkResourceUtilization() error {
437436

438437
if err := g.resourceBasedLimiter.AcceptNewRequest(); err != nil {
439438
level.Warn(g.logger).Log("msg", "failed to accept request", "err", err)
440-
return httpgrpc.Errorf(http.StatusServiceUnavailable, "failed to query: %s", util_limiter.ErrResourceLimitReachedStr)
439+
return util_limiter.ErrResourceLimitReached
441440
}
442441

443442
return nil

pkg/storegateway/gateway_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,8 +1239,9 @@ func TestStoreGateway_SeriesThrottledByResourceMonitor(t *testing.T) {
12391239
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
12401240
err = g.Series(req, srv)
12411241
require.Error(t, err)
1242-
exhaustedErr := util_limiter.ResourceLimitReachedError{}
1243-
require.ErrorContains(t, err, exhaustedErr.Error())
1242+
1243+
// Expected error from isRetryableError in blocks_store_queryable.go
1244+
require.ErrorIs(t, err, util_limiter.ErrResourceLimitReached)
12441245
}
12451246

12461247
func mockGatewayConfig() Config {

pkg/util/limiter/resource_based_limiter.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,15 @@ import (
55

66
"github.com/prometheus/client_golang/prometheus"
77
"github.com/prometheus/client_golang/prometheus/promauto"
8+
"google.golang.org/grpc/codes"
9+
"google.golang.org/grpc/status"
810

911
"github.com/cortexproject/cortex/pkg/util/resource"
1012
)
1113

1214
const ErrResourceLimitReachedStr = "resource limit reached"
1315

14-
type ResourceLimitReachedError struct{}
15-
16-
func (e *ResourceLimitReachedError) Error() string {
17-
return ErrResourceLimitReachedStr
18-
}
16+
var ErrResourceLimitReached = status.Error(codes.ResourceExhausted, ErrResourceLimitReachedStr)
1917

2018
type ResourceBasedLimiter struct {
2119
resourceMonitor resource.IMonitor
@@ -64,7 +62,7 @@ func (l *ResourceBasedLimiter) AcceptNewRequest() error {
6462

6563
if utilization >= limit {
6664
l.limitBreachedCount.WithLabelValues(string(resType)).Inc()
67-
return fmt.Errorf("%s utilization limit reached (limit: %.3f, utilization: %.3f): %w", resType, limit, utilization, &ResourceLimitReachedError{})
65+
return fmt.Errorf("%s utilization limit reached (limit: %.3f, utilization: %.3f): %s", resType, limit, utilization, ErrResourceLimitReachedStr)
6866
}
6967
}
7068

pkg/util/limiter/resource_based_limiter_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55

66
"github.com/prometheus/client_golang/prometheus"
77
"github.com/stretchr/testify/require"
8+
"google.golang.org/grpc/codes"
9+
"google.golang.org/grpc/status"
810

911
"github.com/cortexproject/cortex/pkg/util/resource"
1012
)
@@ -24,3 +26,8 @@ func Test_ResourceBasedLimiter(t *testing.T) {
2426
err = limiter.AcceptNewRequest()
2527
require.NoError(t, err)
2628
}
29+
30+
func Test_ResourceBasedLimiter_ErrResourceLimitReached(t *testing.T) {
31+
// Expected error code from isRetryableError in blocks_store_queryable.go
32+
require.Equal(t, codes.ResourceExhausted, status.Code(ErrResourceLimitReached))
33+
}

0 commit comments

Comments
 (0)