Skip to content

Commit a26ab5f

Browse files
authored
Instrument pending read/write requests in Receive (#113)
2 parents d3ed259 + 1cc6f6b commit a26ab5f

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

pkg/receive/handler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/prometheus/prometheus/tsdb"
3737
"go.opentelemetry.io/otel/attribute"
3838
"go.opentelemetry.io/otel/trace"
39+
"go.uber.org/atomic"
3940
"golang.org/x/exp/slices"
4041
"google.golang.org/grpc"
4142
"google.golang.org/grpc/codes"
@@ -141,6 +142,9 @@ type Handler struct {
141142
writeTimeseriesTotal *prometheus.HistogramVec
142143
writeE2eLatency *prometheus.HistogramVec
143144

145+
pendingWriteRequests prometheus.Gauge
146+
pendingWriteRequestsCounter atomic.Int32
147+
144148
Limiter *Limiter
145149
}
146150

@@ -231,6 +235,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
231235
Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
232236
}, []string{"code", "tenant", "rollup"},
233237
),
238+
pendingWriteRequests: promauto.With(registerer).NewGauge(
239+
prometheus.GaugeOpts{
240+
Name: "thanos_receive_pending_write_requests",
241+
Help: "The number of pending write requests.",
242+
},
243+
),
234244
}
235245

236246
h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1076,6 +1086,9 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
10761086
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
10771087
defer span.Finish()
10781088

1089+
h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
1090+
defer h.pendingWriteRequestsCounter.Add(-1)
1091+
10791092
_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
10801093
if err != nil {
10811094
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)

pkg/store/telemetry.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/prometheus/client_golang/prometheus/promauto"
1212

1313
"github.com/thanos-io/thanos/pkg/store/storepb"
14+
15+
"go.uber.org/atomic"
1416
)
1517

1618
// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -155,6 +157,9 @@ type instrumentedStoreServer struct {
155157
storepb.StoreServer
156158
seriesRequested prometheus.Histogram
157159
chunksRequested prometheus.Histogram
160+
161+
pendingRequests prometheus.Gauge
162+
pendingRequestsCounter atomic.Int32
158163
}
159164

160165
// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
@@ -171,11 +176,18 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
171176
Help: "Number of requested chunks for Series calls",
172177
Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
173178
}),
179+
pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
180+
Name: "thanos_store_server_pending_series_requests",
181+
Help: "Number of pending series requests",
182+
}),
174183
}
175184
}
176185

177186
func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
178187
instrumented := newInstrumentedServer(srv)
188+
s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
189+
defer s.pendingRequestsCounter.Add(-1)
190+
179191
if err := s.StoreServer.Series(req, instrumented); err != nil {
180192
return err
181193
}

0 commit comments

Comments
 (0)