Skip to content

Commit 5438a46

Browse files
committed
Fix load shedding
1 parent 95cb360 commit 5438a46

File tree

3 files changed

+24
-14
lines changed

3 files changed

+24
-14
lines changed

pkg/receive/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,8 +1073,8 @@ func quorumReached(successes []int, successThreshold int) bool {
10731073

10741074
// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
10751075
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
1076-
if h.Limiter.ShouldRejectNewRequest() {
1077-
return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
1076+
if rejected, msg := h.Limiter.ShouldRejectNewRequest(); rejected {
1077+
return nil, status.Error(codes.ResourceExhausted, msg)
10781078
}
10791079
// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
10801080
defer h.Limiter.DecrementPendingRequests()

pkg/receive/limiter.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,27 +77,29 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
7777
return l.headSeriesLimiter
7878
}
7979

80-
func (l *Limiter) ShouldRejectNewRequest() bool {
80+
func (l *Limiter) ShouldRejectNewRequest() (bool, string) {
8181
// maxPendingRequests doesn't change once set when a limiter instance is created.
8282
// So, it's safe to read it without a lock.
83-
if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
84-
if l.maxPendingRequestLimitHit != nil {
85-
l.maxPendingRequestLimitHit.Inc()
83+
if l.maxPendingRequests > 0 {
84+
if pendingRequests := l.pendingRequests.Load(); pendingRequests >= l.maxPendingRequests {
85+
if l.maxPendingRequestLimitHit != nil {
86+
l.maxPendingRequestLimitHit.Inc()
87+
}
88+
if l.pendingRequestsGauge != nil {
89+
l.pendingRequestsGauge.Set(float64(pendingRequests))
90+
}
91+
return true, fmt.Sprintf("too many pending write requests: %d >= %d", l.pendingRequests.Load(), l.maxPendingRequests)
8692
}
87-
return true
8893
}
8994
newValue := l.pendingRequests.Add(1)
9095
if l.pendingRequestsGauge != nil {
9196
l.pendingRequestsGauge.Set(float64(newValue))
9297
}
93-
return false
98+
return false, ""
9499
}
95100

96101
func (l *Limiter) DecrementPendingRequests() {
97-
newValue := l.pendingRequests.Add(-1)
98-
if l.pendingRequestsGauge != nil {
99-
l.pendingRequestsGauge.Set(float64(newValue))
100-
}
102+
l.pendingRequests.Add(-1)
101103
}
102104

103105
// NewLimiter creates a new *Limiter given a configuration and prometheus

pkg/store/limiter.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package store
55

66
import (
7+
"fmt"
78
"sync"
89

910
"github.com/alecthomas/units"
@@ -160,8 +161,15 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
160161
}
161162

162163
func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
163-
if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
164-
return status.Error(codes.ResourceExhausted, "too many pending series requests")
164+
if s.maxPendingRequests > 0 {
165+
if pendingRequests := s.pendingRequests.Load(); pendingRequests >= s.maxPendingRequests {
166+
s.maxPendingRequestLimitHit.Inc()
167+
s.pendingRequestsGauge.Set(float64(pendingRequests))
168+
return status.Error(
169+
codes.ResourceExhausted,
170+
fmt.Sprintf("too many pending series requests: %d >= %d", s.pendingRequests.Load(), s.maxPendingRequests),
171+
)
172+
}
165173
}
166174
s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
167175
defer s.pendingRequests.Add(-1)

0 commit comments

Comments
 (0)