Skip to content

Commit e34d669

Browse files
committed
Instrument pending read/write requests in Receive
1 parent 1c69c7e commit e34d669

File tree

2 files changed

+24
-0
lines changed

2 files changed

+24
-0
lines changed

pkg/receive/handler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"strconv"
1818
"strings"
1919
"sync"
20+
"sync/atomic"
2021
"time"
2122

2223
"github.com/go-kit/log"
@@ -141,6 +142,9 @@ type Handler struct {
141142
writeTimeseriesTotal *prometheus.HistogramVec
142143
writeE2eLatency *prometheus.HistogramVec
143144

145+
pendingWriteRequests prometheus.Gauge
146+
pendingWriteRequestsCounter atomic.Int32
147+
144148
Limiter *Limiter
145149
}
146150

@@ -231,6 +235,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
231235
Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
232236
}, []string{"code", "tenant", "rollup"},
233237
),
238+
pendingWriteRequests: promauto.With(registerer).NewGauge(
239+
prometheus.GaugeOpts{
240+
Name: "thanos_receive_pending_write_requests",
241+
Help: "The number of pending write requests.",
242+
},
243+
),
234244
}
235245

236246
h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1076,6 +1086,9 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
10761086
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
10771087
defer span.Finish()
10781088

1089+
h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
1090+
defer h.pendingWriteRequestsCounter.Add(-1)
1091+
10791092
_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
10801093
if err != nil {
10811094
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)

pkg/store/telemetry.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package store
66
import (
77
"sort"
88
"strconv"
9+
"sync/atomic"
910

1011
"github.com/prometheus/client_golang/prometheus"
1112
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -155,6 +156,9 @@ type instrumentedStoreServer struct {
155156
storepb.StoreServer
156157
seriesRequested prometheus.Histogram
157158
chunksRequested prometheus.Histogram
159+
160+
pendingRequests prometheus.Gauge
161+
pendingRequestsCounter atomic.Int32
158162
}
159163

160164
// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
@@ -171,11 +175,18 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
171175
Help: "Number of requested chunks for Series calls",
172176
Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
173177
}),
178+
pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
179+
Name: "thanos_store_server_pending_series_requests",
180+
Help: "Number of pending series requests",
181+
}),
174182
}
175183
}
176184

177185
func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
178186
instrumented := newInstrumentedServer(srv)
187+
s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
188+
defer s.pendingRequestsCounter.Add(-1)
189+
179190
if err := s.StoreServer.Series(req, instrumented); err != nil {
180191
return err
181192
}

0 commit comments

Comments
 (0)