Commit 2651ebf

Add max pending request limiting
1 parent a26ab5f commit 2651ebf

File tree

5 files changed: +152 −33 lines changed

cmd/thanos/receive.go

Lines changed: 33 additions & 2 deletions
@@ -268,7 +268,19 @@ func runReceive(
 			return errors.Wrap(err, "parse limit configuration")
 		}
 	}
-	limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
+	if conf.maxPendingGrpcWriteRequests > 0 {
+		level.Info(logger).Log("msg", "set max pending gRPC write request in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
+	}
+	limiter, err := receive.NewLimiterWithOptions(
+		conf.writeLimitsConfig,
+		reg,
+		receiveMode,
+		log.With(logger, "component", "receive-limiter"),
+		conf.limitsConfigReloadTimer,
+		receive.LimiterOptions{
+			MaxPendingRequests: int32(conf.maxPendingGrpcWriteRequests),
+		},
+	)
 	if err != nil {
 		return errors.Wrap(err, "creating limiter")
 	}
@@ -408,7 +420,20 @@ func runReceive(
 		store.LazyRetrieval,
 		options...,
 	)
-	mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
+	if conf.maxPendingGrpcReadRequests > 0 {
+		level.Info(logger).Log(
+			"msg", "set max pending gRPC read request in instrumented store server",
+			"max_pending_requests", conf.maxPendingGrpcReadRequests,
+		)
+	}
+	mts := store.NewLimitedStoreServerWithOptions(
+		store.NewInstrumentedStoreServer(reg, proxy),
+		reg,
+		conf.storeRateLimits,
+		store.LimitsOptions{
+			MaxPendingSeriesRequests: int32(conf.maxPendingGrpcReadRequests),
+		},
+	)
 	rw := store.ReadWriteTSDBStore{
 		StoreServer:          mts,
 		WriteableStoreServer: webHandler,
@@ -974,6 +999,8 @@ type receiveConfig struct {
 	topMetricsMinimumCardinality  uint64
 	topMetricsUpdateInterval      time.Duration
 	matcherConverterCacheCapacity int
+	maxPendingGrpcReadRequests    int
+	maxPendingGrpcWriteRequests   int
 
 	featureList *[]string
 }
@@ -1138,6 +1165,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
 	cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
 		Default("0").IntVar(&rc.matcherConverterCacheCapacity)
+	cmd.Flag("receive.max-pending-grcp-read-requests", "Throttle gRPC read requests when this number of requests are pending. Value 0 disables this feature.").
+		Default("0").IntVar(&rc.maxPendingGrpcReadRequests)
+	cmd.Flag("receive.max-pending-grcp-write-requests", "Throttle gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
+		Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
 	rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
 }
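
Both new flags default to 0, which leaves throttling disabled, and the parsed int values are narrowed to int32 before they reach the options structs. The following standalone sketch mirrors the flag semantics with plain kingpin rather than Thanos's extkingpin wrapper; the variable names and the printout are illustrative only.

package main

import (
	"fmt"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	var maxPendingWrites, maxPendingReads int

	// Same names, defaults, and semantics as the flags added above: 0 disables the limit.
	kingpin.Flag("receive.max-pending-grcp-write-requests", "Throttle gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
		Default("0").IntVar(&maxPendingWrites)
	kingpin.Flag("receive.max-pending-grcp-read-requests", "Throttle gRPC read requests when this number of requests are pending. Value 0 disables this feature.").
		Default("0").IntVar(&maxPendingReads)
	kingpin.Parse()

	// The options structs take int32, so the values are narrowed exactly as runReceive does it.
	fmt.Printf("max pending writes=%d, max pending reads=%d\n",
		int32(maxPendingWrites), int32(maxPendingReads))
}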

pkg/receive/handler.go

Lines changed: 6 additions & 13 deletions
@@ -36,7 +36,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
-	"go.uber.org/atomic"
 	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -142,9 +141,6 @@ type Handler struct {
 	writeTimeseriesTotal *prometheus.HistogramVec
 	writeE2eLatency      *prometheus.HistogramVec
 
-	pendingWriteRequests        prometheus.Gauge
-	pendingWriteRequestsCounter atomic.Int32
-
 	Limiter *Limiter
 }
 
@@ -235,12 +231,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 			Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
 		}, []string{"code", "tenant", "rollup"},
 		),
-		pendingWriteRequests: promauto.With(registerer).NewGauge(
-			prometheus.GaugeOpts{
-				Name: "thanos_receive_pending_write_requests",
-				Help: "The number of pending write requests.",
-			},
-		),
 	}
 
 	h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1083,12 +1073,15 @@ func quorumReached(successes []int, successThreshold int) bool {
 
 // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
 func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
+	if h.Limiter.ShouldRejectNewRequest() {
+		return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
+	}
+	// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
+	defer h.Limiter.DecrementPendingRequests()
+
 	span, ctx := tracing.StartSpan(ctx, "receive_grpc")
 	defer span.Finish()
 
-	h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
-	defer h.pendingWriteRequestsCounter.Add(-1)
-
 	_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
 	if err != nil {
 		level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
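
From a client's point of view, a saturated receiver now answers RemoteWrite with codes.ResourceExhausted instead of queueing the request. A minimal client-side sketch of treating that code as a back-off-and-retry signal; the endpoint, tenant, attempt count, and backoff are made-up values, and the request payload is elided.

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	conn, err := grpc.Dial("receive.example.internal:10901", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := storepb.NewWriteableStoreClient(conn)

	req := &storepb.WriteRequest{Tenant: "example-tenant"} // payload elided for brevity
	for attempt := 1; attempt <= 5; attempt++ {
		_, err = client.RemoteWrite(context.Background(), req)
		if status.Code(err) != codes.ResourceExhausted {
			break // success, or an error unrelated to throttling
		}
		// The receiver is above --receive.max-pending-grcp-write-requests; back off and retry.
		time.Sleep(time.Duration(attempt) * 500 * time.Millisecond)
	}
	if err != nil {
		log.Printf("remote write failed: %v", err)
	}
}

Whether a real sender retries or drops on this error depends on the sender; the sketch only shows which status code to check.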

pkg/receive/limiter.go

Lines changed: 75 additions & 6 deletions
@@ -18,6 +18,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/gate"
+
+	"go.uber.org/atomic"
 )
 
 // Limiter is responsible for managing the configuration and initialization of
@@ -35,6 +37,19 @@ type Limiter struct {
 	configReloadFailedCounter prometheus.Counter
 	receiverMode              ReceiverMode
 	configReloadTimer         time.Duration
+
+	// Reject a request if this limit is reached.
+	// This field is set at instance creation and never changes afterwards,
+	// so it's safe to read it without a lock.
+	maxPendingRequests        int32
+	maxPendingRequestLimitHit prometheus.Counter
+	pendingRequests           atomic.Int32
+	pendingRequestsGauge      prometheus.Gauge
+}
+
+type LimiterOptions struct {
+	// Value 0 disables the max pending request limiting behavior.
+	MaxPendingRequests int32
 }
 
 // headSeriesLimiter encompasses active/head series limiting logic.
@@ -62,16 +77,50 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
 	return l.headSeriesLimiter
 }
 
+func (l *Limiter) ShouldRejectNewRequest() bool {
+	// maxPendingRequests doesn't change once the limiter instance is created,
+	// so it's safe to read it without a lock.
+	if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
+		if l.maxPendingRequestLimitHit != nil {
+			l.maxPendingRequestLimitHit.Inc()
+		}
+		return true
+	}
+	newValue := l.pendingRequests.Add(1)
+	if l.pendingRequestsGauge != nil {
+		l.pendingRequestsGauge.Set(float64(newValue))
+	}
+	return false
+}
+
+func (l *Limiter) DecrementPendingRequests() {
+	newValue := l.pendingRequests.Add(-1)
+	if l.pendingRequestsGauge != nil {
+		l.pendingRequestsGauge.Set(float64(newValue))
+	}
+}
+
 // NewLimiter creates a new *Limiter given a configuration and prometheus
 // registerer.
 func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
+	return NewLimiterWithOptions(configFile, reg, r, logger, configReloadTimer, LimiterOptions{})
+}
+
+func NewLimiterWithOptions(
+	configFile fileContent,
+	reg prometheus.Registerer,
+	r ReceiverMode,
+	logger log.Logger,
+	configReloadTimer time.Duration,
+	config LimiterOptions) (*Limiter, error) {
 	limiter := &Limiter{
-		writeGate:         gate.NewNoop(),
-		requestLimiter:    &noopRequestLimiter{},
-		headSeriesLimiter: NewNopSeriesLimit(),
-		logger:            logger,
-		receiverMode:      r,
-		configReloadTimer: configReloadTimer,
+		writeGate:          gate.NewNoop(),
+		requestLimiter:     &noopRequestLimiter{},
+		headSeriesLimiter:  NewNopSeriesLimit(),
+		logger:             logger,
+		receiverMode:       r,
+		configReloadTimer:  configReloadTimer,
+		maxPendingRequests: config.MaxPendingRequests,
 	}
 
 	if reg != nil {
@@ -92,6 +141,26 @@ func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMod
 				Help: "How many times the limit configuration failed to reload.",
 			},
 		)
+		limiter.configReloadFailedCounter = promauto.With(limiter.registerer).NewCounter(
+			prometheus.CounterOpts{
+				Namespace: "thanos",
+				Subsystem: "receive",
+				Name:      "limits_config_reload_err_total",
+				Help:      "How many times the limit configuration failed to reload.",
+			},
+		)
+		limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_max_pending_write_request_limit_hit_total",
+				Help: "Number of times the max pending write request limit was hit",
+			},
+		)
+		limiter.pendingRequestsGauge = promauto.With(limiter.registerer).NewGauge(
+			prometheus.GaugeOpts{
+				Name: "thanos_receive_pending_write_requests",
+				Help: "Number of pending write requests",
+			},
+		)
 	}
 
 	if configFile == nil {
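
The gate is check-then-increment: a request is counted as pending only when ShouldRejectNewRequest admits it, and the Load and Add are separate atomic operations, so a burst of concurrent calls can briefly overshoot the configured cap; it behaves as a soft limit. Below is a test-style sketch of the admit/reject/decrement cycle. It is not part of the commit; it assumes receive.RouterIngestor and a nil limits config file are acceptable inputs, and it passes a nil registerer since the new metrics are nil-guarded.

package receive_test

import (
	"testing"
	"time"

	"github.com/go-kit/log"

	"github.com/thanos-io/thanos/pkg/receive"
)

func TestMaxPendingWriteRequestGate(t *testing.T) {
	limiter, err := receive.NewLimiterWithOptions(
		nil, // no limits config file
		nil, // no registerer: gauge and counter stay nil and are skipped
		receive.RouterIngestor,
		log.NewNopLogger(),
		time.Minute,
		receive.LimiterOptions{MaxPendingRequests: 1},
	)
	if err != nil {
		t.Fatal(err)
	}

	if limiter.ShouldRejectNewRequest() {
		t.Fatal("first request should be admitted")
	}
	if !limiter.ShouldRejectNewRequest() {
		t.Fatal("second request should be rejected while one is pending")
	}
	limiter.DecrementPendingRequests()
	if limiter.ShouldRejectNewRequest() {
		t.Fatal("a request should be admitted again once the pending one drains")
	}
}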

pkg/store/limiter.go

Lines changed: 38 additions & 0 deletions
@@ -11,6 +11,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.uber.org/atomic"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/thanos-io/thanos/pkg/extkingpin"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -123,10 +125,31 @@ type limitedStoreServer struct {
 	newSeriesLimiter      SeriesLimiterFactory
 	newSamplesLimiter     ChunksLimiterFactory
 	failedRequestsCounter *prometheus.CounterVec
+
+	// This is a read-only field once it's set.
+	// Value 0 disables the feature.
+	maxPendingRequests        int32
+	pendingRequests           atomic.Int32
+	maxPendingRequestLimitHit prometheus.Counter
+	pendingRequestsGauge      prometheus.Gauge
+}
+
+type LimitsOptions struct {
+	// Value 0 disables the feature.
+	MaxPendingSeriesRequests int32
 }
 
 // NewLimitedStoreServer creates a new limitedStoreServer.
 func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer {
+	return NewLimitedStoreServerWithOptions(store, reg, selectLimits, LimitsOptions{})
+}
+
+func NewLimitedStoreServerWithOptions(
+	store storepb.StoreServer,
+	reg prometheus.Registerer,
+	selectLimits SeriesSelectLimits,
+	opts LimitsOptions,
+) storepb.StoreServer {
 	return &limitedStoreServer{
 		StoreServer:      store,
 		newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest),
@@ -135,10 +158,25 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
 			Name: "thanos_store_selects_dropped_total",
 			Help: "Number of select queries that were dropped due to configured limits.",
 		}, []string{"reason"}),
+		pendingRequestsGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Name: "thanos_store_server_pending_series_requests",
+			Help: "Number of pending series requests",
+		}),
+		maxPendingRequestLimitHit: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "thanos_store_server_hit_max_pending_series_requests_total",
+			Help: "Number of pending series requests that hit the max pending request limit",
+		}),
+		maxPendingRequests: opts.MaxPendingSeriesRequests,
 	}
 }
 
 func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+	if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
+		return status.Error(codes.ResourceExhausted, "too many pending series requests")
+	}
+	s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
+	defer s.pendingRequests.Add(-1)
+
	seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series"))
 	chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks"))
 	limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter)
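
The Series path gets the same soft gate, returned as codes.ResourceExhausted before any limiter or instrumentation work is done. Below is a hedged wiring sketch of how any storepb.StoreServer could be wrapped the way runReceive does it; the package, helper name, and the example cap of 200 are not from the commit.

package storelimits

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/store"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// wrapWithPendingLimit instruments the inner store and then caps the number of
// concurrently pending Series calls; maxPending == 0 keeps the previous behavior.
func wrapWithPendingLimit(reg prometheus.Registerer, inner storepb.StoreServer, limits store.SeriesSelectLimits, maxPending int32) storepb.StoreServer {
	return store.NewLimitedStoreServerWithOptions(
		store.NewInstrumentedStoreServer(reg, inner),
		reg,
		limits,
		store.LimitsOptions{MaxPendingSeriesRequests: maxPending},
	)
}

// Example: wrapWithPendingLimit(reg, proxy, limits, 200) rejects the 201st
// concurrent Series call with ResourceExhausted while 200 are still in flight.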

pkg/store/telemetry.go

Lines changed: 0 additions & 12 deletions
@@ -11,8 +11,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/thanos-io/thanos/pkg/store/storepb"
-
-	"go.uber.org/atomic"
 )
 
 // seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -157,12 +155,8 @@ type instrumentedStoreServer struct {
 	storepb.StoreServer
 	seriesRequested prometheus.Histogram
 	chunksRequested prometheus.Histogram
-
-	pendingRequests        prometheus.Gauge
-	pendingRequestsCounter atomic.Int32
 }
 
-// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
 func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
 	return &instrumentedStoreServer{
 		StoreServer: store,
@@ -176,17 +170,11 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
 			Help:    "Number of requested chunks for Series calls",
 			Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
 		}),
-		pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
-			Name: "thanos_store_server_pending_series_requests",
-			Help: "Number of pending series requests",
-		}),
 	}
 }
 
 func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
 	instrumented := newInstrumentedServer(srv)
-	s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
-	defer s.pendingRequestsCounter.Add(-1)
 
 	if err := s.StoreServer.Series(req, instrumented); err != nil {
 		return err
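
With the gate now living in limitedStoreServer (pkg/store/limiter.go above), the duplicate bookkeeping in instrumentedStoreServer is removed; the gauge name thanos_store_server_pending_series_requests is unchanged, it is simply registered by the limiting wrapper instead of the instrumentation wrapper. The write-side gauge thanos_receive_pending_write_requests moves the same way, from the receive Handler into the Limiter.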
