
Commit 2fecf4d

Reject series/write requests when max pending request limit is hit (#114)
2 parents (a26ab5f + cb958ba), commit 2fecf4d

5 files changed: +125 −32 lines changed

cmd/thanos/receive.go

Lines changed: 17 additions & 1 deletion
@@ -268,7 +268,19 @@ func runReceive(
 return errors.Wrap(err, "parse limit configuration")
 }
 }
-limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
+if conf.maxPendingGrpcWriteRequests > 0 {
+level.Info(logger).Log("msg", "set max pending gRPC write request in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
+}
+limiter, err := receive.NewLimiterWithOptions(
+conf.writeLimitsConfig,
+reg,
+receiveMode,
+log.With(logger, "component", "receive-limiter"),
+conf.limitsConfigReloadTimer,
+receive.LimiterOptions{
+MaxPendingRequests: int32(conf.maxPendingGrpcWriteRequests),
+},
+)
 if err != nil {
 return errors.Wrap(err, "creating limiter")
 }
@@ -408,6 +420,7 @@ func runReceive(
 store.LazyRetrieval,
 options...,
 )
+
 mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
 rw := store.ReadWriteTSDBStore{
 StoreServer: mts,
@@ -974,6 +987,7 @@ type receiveConfig struct {
 topMetricsMinimumCardinality uint64
 topMetricsUpdateInterval time.Duration
 matcherConverterCacheCapacity int
+maxPendingGrpcWriteRequests int
 
 featureList *[]string
 }
@@ -1138,6 +1152,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
 cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
 Default("0").IntVar(&rc.matcherConverterCacheCapacity)
+cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
+Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
 rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
 }
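With this wiring, the behavior is driven entirely by the new flag. For example (an illustrative value), starting the receiver with --receive.max-pending-grcp-write-requests=64 makes it reject incoming gRPC remote-write requests with a ResourceExhausted error once 64 requests are already pending, while the default of 0 keeps the previous unlimited behavior.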

pkg/receive/handler.go

Lines changed: 6 additions & 13 deletions
@@ -36,7 +36,6 @@ import (
 "github.com/prometheus/prometheus/tsdb"
 "go.opentelemetry.io/otel/attribute"
 "go.opentelemetry.io/otel/trace"
-"go.uber.org/atomic"
 "golang.org/x/exp/slices"
 "google.golang.org/grpc"
 "google.golang.org/grpc/codes"
@@ -142,9 +141,6 @@ type Handler struct {
 writeTimeseriesTotal *prometheus.HistogramVec
 writeE2eLatency *prometheus.HistogramVec
 
-pendingWriteRequests prometheus.Gauge
-pendingWriteRequestsCounter atomic.Int32
-
 Limiter *Limiter
 }
 
@@ -235,12 +231,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
 }, []string{"code", "tenant", "rollup"},
 ),
-pendingWriteRequests: promauto.With(registerer).NewGauge(
-prometheus.GaugeOpts{
-Name: "thanos_receive_pending_write_requests",
-Help: "The number of pending write requests.",
-},
-),
 }
 
 h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1083,12 +1073,15 @@ func quorumReached(successes []int, successThreshold int) bool {
 
 // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
 func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
+if h.Limiter.ShouldRejectNewRequest() {
+return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
+}
+// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
+defer h.Limiter.DecrementPendingRequests()
+
 span, ctx := tracing.StartSpan(ctx, "receive_grpc")
 defer span.Finish()
 
-h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
-defer h.pendingWriteRequestsCounter.Add(-1)
-
 _, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
 if err != nil {
 level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
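The ordering above is deliberate: ShouldRejectNewRequest() only increments the limiter's pending counter when it admits a request, so DecrementPendingRequests() is deferred only on the admit path. A minimal sketch of the same pattern, with a hypothetical serve callback standing in for the real request handling:

```go
package example

import (
	"github.com/thanos-io/thanos/pkg/receive"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// handleWithLimit mirrors the admission pattern used in RemoteWrite above.
// serve is a hypothetical callback standing in for the real write handling.
func handleWithLimit(limiter *receive.Limiter, serve func() error) error {
	if limiter.ShouldRejectNewRequest() {
		// Rejected: the pending counter was not incremented, so there is
		// nothing to release on this path.
		return status.Error(codes.ResourceExhausted, "too many pending write requests")
	}
	// Admitted: the counter was incremented, so release it when the request
	// finishes, whether it succeeds or fails.
	defer limiter.DecrementPendingRequests()
	return serve()
}
```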

pkg/receive/limiter.go

Lines changed: 75 additions & 6 deletions
@@ -18,6 +18,8 @@ import (
 "github.com/prometheus/client_golang/prometheus"
 "github.com/thanos-io/thanos/pkg/extprom"
 "github.com/thanos-io/thanos/pkg/gate"
+
+"go.uber.org/atomic"
 )
 
 // Limiter is responsible for managing the configuration and initialization of
@@ -35,6 +37,19 @@ type Limiter struct {
 configReloadFailedCounter prometheus.Counter
 receiverMode ReceiverMode
 configReloadTimer time.Duration
+
+// Reject a request if this limit is reached.
+// This filed is set at the instance creation and never changes afterwards.
+// So it's safe to read it without a lock.
+maxPendingRequests int32
+maxPendingRequestLimitHit prometheus.Counter
+pendingRequests atomic.Int32
+pendingRequestsGauge prometheus.Gauge
+}
+
+type LimiterOptions struct {
+// Value 0 disables the max pending request limiting hehavior.
+MaxPendingRequests int32
 }
 
 // headSeriesLimiter encompasses active/head series limiting logic.
@@ -62,16 +77,50 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
 return l.headSeriesLimiter
 }
 
+func (l *Limiter) ShouldRejectNewRequest() bool {
+// maxPendingRequests doesn't change once set when a limiter instance is created.
+// So, it's safe to read it without a lock.
+if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
+if l.maxPendingRequestLimitHit != nil {
+l.maxPendingRequestLimitHit.Inc()
+}
+return true
+}
+newValue := l.pendingRequests.Add(1)
+if l.pendingRequestsGauge != nil {
+l.pendingRequestsGauge.Set(float64(newValue))
+}
+return false
+}
+
+func (l *Limiter) DecrementPendingRequests() {
+newValue := l.pendingRequests.Add(-1)
+if l.pendingRequestsGauge != nil {
+l.pendingRequestsGauge.Set(float64(newValue))
+}
+}
+
 // NewLimiter creates a new *Limiter given a configuration and prometheus
 // registerer.
 func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
+return NewLimiterWithOptions(configFile, reg, r, logger, configReloadTimer, LimiterOptions{})
+}
+
+func NewLimiterWithOptions(
+configFile fileContent,
+reg prometheus.Registerer,
+r ReceiverMode,
+logger log.Logger,
+configReloadTimer time.Duration,
+opts LimiterOptions) (*Limiter, error) {
 limiter := &Limiter{
-writeGate: gate.NewNoop(),
-requestLimiter: &noopRequestLimiter{},
-headSeriesLimiter: NewNopSeriesLimit(),
-logger: logger,
-receiverMode: r,
-configReloadTimer: configReloadTimer,
+writeGate: gate.NewNoop(),
+requestLimiter: &noopRequestLimiter{},
+headSeriesLimiter: NewNopSeriesLimit(),
+logger: logger,
+receiverMode: r,
+configReloadTimer: configReloadTimer,
+maxPendingRequests: opts.MaxPendingRequests,
 }
 
 if reg != nil {
@@ -92,6 +141,26 @@ func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMod
 Help: "How many times the limit configuration failed to reload.",
 },
 )
+limiter.configReloadFailedCounter = promauto.With(limiter.registerer).NewCounter(
+prometheus.CounterOpts{
+Namespace: "thanos",
+Subsystem: "receive",
+Name: "limits_config_reload_err_total",
+Help: "How many times the limit configuration failed to reload.",
+},
+)
+limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
+prometheus.CounterOpts{
+Name: "thanos_receive_max_pending_write_request_limit_hit_total",
+Help: "Number of times the max pending write request limit was hit",
+},
+)
+limiter.pendingRequestsGauge = promauto.With(limiter.registerer).NewGauge(
+prometheus.GaugeOpts{
+Name: "thanos_receive_pending_write_requests",
+Help: "Number of pending write requests",
+},
+)
 }
 
 if configFile == nil {
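A rough, test-style sketch of the new behavior, assuming package-internal access, the existing RouterIngestor mode constant, and that a nil limits config plus nil registerer leave the limiter in its no-op configuration (as the surrounding code suggests): with MaxPendingRequests set to 1, the first request is admitted, the second is rejected, and capacity frees up again once the first one is released.

```go
package receive

import (
	"testing"

	"github.com/go-kit/log"
)

// Sketch only: exercises the pending-request limit added in this change.
func TestLimiterMaxPendingRequests(t *testing.T) {
	limiter, err := NewLimiterWithOptions(nil, nil, RouterIngestor, log.NewNopLogger(), 0, LimiterOptions{MaxPendingRequests: 1})
	if err != nil {
		t.Fatal(err)
	}

	if limiter.ShouldRejectNewRequest() {
		t.Fatal("first request should be admitted")
	}
	if !limiter.ShouldRejectNewRequest() {
		t.Fatal("second request should be rejected while one is still pending")
	}

	limiter.DecrementPendingRequests()
	if limiter.ShouldRejectNewRequest() {
		t.Fatal("a request should be admitted again after the pending one finished")
	}
	limiter.DecrementPendingRequests()
}
```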

pkg/store/limiter.go

Lines changed: 27 additions & 0 deletions
@@ -11,6 +11,8 @@ import (
 "github.com/prometheus/client_golang/prometheus"
 "github.com/prometheus/client_golang/prometheus/promauto"
 "go.uber.org/atomic"
+"google.golang.org/grpc/codes"
+"google.golang.org/grpc/status"
 
 "github.com/thanos-io/thanos/pkg/extkingpin"
 "github.com/thanos-io/thanos/pkg/store/storepb"
@@ -108,11 +110,14 @@ func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
 type SeriesSelectLimits struct {
 SeriesPerRequest uint64
 SamplesPerRequest uint64
+PendingRequests int32
 }
 
 func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) {
 cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest)
 cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest)
+cmd.Flag("store.limits.pending-requests", "Reject gRPC series requests right away when this number of requests are pending. Value 0 disables this feature.").
+Default("0").Int32Var(&l.PendingRequests)
 }
 
 var _ storepb.StoreServer = &limitedStoreServer{}
@@ -123,6 +128,13 @@ type limitedStoreServer struct {
 newSeriesLimiter SeriesLimiterFactory
 newSamplesLimiter ChunksLimiterFactory
 failedRequestsCounter *prometheus.CounterVec
+
+// This is a read-only field once it's set.
+// Value 0 disables the feature.
+maxPendingRequests int32
+pendingRequests atomic.Int32
+maxPendingRequestLimitHit prometheus.Counter
+pendingRequestsGauge prometheus.Gauge
 }
 
 // NewLimitedStoreServer creates a new limitedStoreServer.
@@ -135,10 +147,25 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
 Name: "thanos_store_selects_dropped_total",
 Help: "Number of select queries that were dropped due to configured limits.",
 }, []string{"reason"}),
+pendingRequestsGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+Name: "thanos_store_server_pending_series_requests",
+Help: "Number of pending series requests",
+}),
+maxPendingRequestLimitHit: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+Name: "thanos_store_server_hit_max_pending_series_requests_total",
+Help: "Number of pending series requests that hit the max pending request limit",
+}),
+maxPendingRequests: selectLimits.PendingRequests,
 }
 }
 
 func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
+return status.Error(codes.ResourceExhausted, "too many pending series requests")
+}
+s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
+defer s.pendingRequests.Add(-1)
+
 seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series"))
 chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks"))
 limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter)
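Both the write and the series limits surface to callers as a gRPC ResourceExhausted status rather than a transport error. An illustrative helper (not part of this change) that a client could use to tell this rejection apart from other failures and back off instead of retrying immediately:

```go
package example

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isPendingLimitRejection reports whether err carries the gRPC
// ResourceExhausted code that the pending-request limits above return
// when a server already has too many requests in flight.
func isPendingLimitRejection(err error) bool {
	st, ok := status.FromError(err)
	return ok && st.Code() == codes.ResourceExhausted
}
```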

pkg/store/telemetry.go

Lines changed: 0 additions & 12 deletions
@@ -11,8 +11,6 @@ import (
 "github.com/prometheus/client_golang/prometheus/promauto"
 
 "github.com/thanos-io/thanos/pkg/store/storepb"
-
-"go.uber.org/atomic"
 )
 
 // seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -157,12 +155,8 @@ type instrumentedStoreServer struct {
 storepb.StoreServer
 seriesRequested prometheus.Histogram
 chunksRequested prometheus.Histogram
-
-pendingRequests prometheus.Gauge
-pendingRequestsCounter atomic.Int32
 }
 
-// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
 func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
 return &instrumentedStoreServer{
 StoreServer: store,
@@ -176,17 +170,11 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
 Help: "Number of requested chunks for Series calls",
 Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
 }),
-pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
-Name: "thanos_store_server_pending_series_requests",
-Help: "Number of pending series requests",
-}),
 }
 }
 
 func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
 instrumented := newInstrumentedServer(srv)
-s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
-defer s.pendingRequestsCounter.Add(-1)
 
 if err := s.StoreServer.Series(req, instrumented); err != nil {
 return err
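No telemetry is lost by this removal: the thanos_store_server_pending_series_requests gauge is now owned and updated by limitedStoreServer in pkg/store/limiter.go (see above), and its receive-side counterpart, thanos_receive_pending_write_requests, moved from the handler into the Limiter in pkg/receive/limiter.go.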
