Skip to content

Commit 15da742

Browse files
author
Michael Xu
committed
kvserver/rangefeed: add metrics for lockedMuxStream.Send calls
Adds observability into the latency of `lockedMuxStream.Send` with a histogram metric that uses the `IOLatencyBuckets` bucket configuration (see #147440 (comment)). Also adds a counter for the number of slow sends, using the same criteria as the log line added in #146765. Resolves: #147435 Epic: CRDB-51061 Release note: None
1 parent 19c51dc commit 15da742

File tree

3 files changed

+64
-6
lines changed

3 files changed

+64
-6
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12370,6 +12370,22 @@ layers:
1237012370
unit: BYTES
1237112371
aggregation: AVG
1237212372
derivative: NONE
12373+
- name: kv.rangefeed.mux_stream_send.latency
12374+
exported_name: kv_rangefeed_mux_stream_send_latency
12375+
description: Latency of sending RangeFeed events to the client
12376+
y_axis_label: Latency
12377+
type: HISTOGRAM
12378+
unit: NANOSECONDS
12379+
aggregation: AVG
12380+
derivative: NONE
12381+
- name: kv.rangefeed.mux_stream_send.slow_events
12382+
exported_name: kv_rangefeed_mux_stream_send_slow_events
12383+
description: Number of RangeFeed events that took longer than 10s to send to the client
12384+
y_axis_label: Events
12385+
type: COUNTER
12386+
unit: COUNT
12387+
aggregation: AVG
12388+
derivative: NON_NEGATIVE_DERIVATIVE
1237312389
- name: kv.rangefeed.output_loop_unbuffered_registration_nanos
1237412390
exported_name: kv_rangefeed_output_loop_unbuffered_registration_nanos
1237512391
description: Duration of the Rangefeed O(range) output loop goroutine. This is only applicable for unbuffered registrations since buffered registrations spawns long-living goroutines.

pkg/kv/kvserver/rangefeed/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,38 @@ func NewBufferedSenderMetrics() *BufferedSenderMetrics {
289289
BufferedSenderQueueSize: metric.NewGauge(metaBufferedSenderQueueSize),
290290
}
291291
}
292+
293+
var (
294+
metaRangeFeedMuxStreamSendLatencyNanos = metric.Metadata{
295+
Name: "kv.rangefeed.mux_stream_send.latency",
296+
Help: "Latency of sending RangeFeed events to the client",
297+
Measurement: "Latency",
298+
Unit: metric.Unit_NANOSECONDS,
299+
}
300+
metaRangeFeedMuxStreamSlowSends = metric.Metadata{
301+
Name: "kv.rangefeed.mux_stream_send.slow_events",
302+
Help: "Number of RangeFeed events that took longer than 10s to send to the client",
303+
Measurement: "Events",
304+
Unit: metric.Unit_COUNT,
305+
}
306+
)
307+
308+
type LockedMuxStreamMetrics struct {
309+
SendLatencyNanos metric.IHistogram
310+
SlowSends *metric.Counter
311+
}
312+
313+
// MetricStruct implements metrics.Struct interface.
314+
func (*LockedMuxStreamMetrics) MetricStruct() {}
315+
316+
func NewLockedMuxStreamMetrics(histogramWindow time.Duration) *LockedMuxStreamMetrics {
317+
return &LockedMuxStreamMetrics{
318+
SendLatencyNanos: metric.NewHistogram(metric.HistogramOptions{
319+
Mode: metric.HistogramModePrometheus,
320+
Metadata: metaRangeFeedMuxStreamSendLatencyNanos,
321+
Duration: histogramWindow,
322+
BucketConfig: metric.IOLatencyBuckets,
323+
}),
324+
SlowSends: metric.NewCounter(metaRangeFeedMuxStreamSlowSends),
325+
}
326+
}

pkg/server/node.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ type nodeMetrics struct {
260260
StreamManagerMetrics *rangefeed.StreamManagerMetrics
261261
// BufferedSenderMetrics is for monitoring of BufferedSenders for rangefeed.
262262
// Note that there could be multiple buffered senders in a node.
263-
BufferedSenderMetrics *rangefeed.BufferedSenderMetrics
263+
BufferedSenderMetrics *rangefeed.BufferedSenderMetrics
264+
LockedMuxStreamMetrics *rangefeed.LockedMuxStreamMetrics
264265
}
265266

266267
func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeMetrics {
@@ -282,6 +283,7 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeM
282283
CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
283284
StreamManagerMetrics: rangefeed.NewStreamManagerMetrics(),
284285
BufferedSenderMetrics: rangefeed.NewBufferedSenderMetrics(),
286+
LockedMuxStreamMetrics: rangefeed.NewLockedMuxStreamMetrics(histogramWindow),
285287
}
286288

287289
for i := range nm.MethodCounts {
@@ -2104,6 +2106,7 @@ func (n *Node) RangeLookup(
21042106
type lockedMuxStream struct {
21052107
wrapped kvpb.Internal_MuxRangeFeedServer
21062108
sendMu syncutil.Mutex
2109+
metrics *rangefeed.LockedMuxStreamMetrics
21072110
}
21082111

21092112
func (s *lockedMuxStream) SendIsThreadSafe() {}
@@ -2122,10 +2125,11 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
21222125
// factors, e.g., the number of rangefeeds contending for the lockedMuxStream.
21232126
start := crtime.NowMono()
21242127
defer func() {
2125-
if dur := start.Elapsed(); dur > slowMuxStreamSendThreshold {
2126-
log.Infof(s.wrapped.Context(),
2127-
"slow send on stream %d for r%d took %s",
2128-
e.StreamID, e.RangeID, dur)
2128+
dur := start.Elapsed()
2129+
s.metrics.SendLatencyNanos.RecordValue(dur.Nanoseconds())
2130+
if dur > slowMuxStreamSendThreshold {
2131+
s.metrics.SlowSends.Inc(1)
2132+
log.Infof(s.wrapped.Context(), "slow send on stream %d for r%d took %s", e.StreamID, e.RangeID, dur)
21292133
}
21302134
}()
21312135

@@ -2196,7 +2200,10 @@ func (n *Node) defaultRangefeedConsumerID() int64 {
21962200

21972201
// MuxRangeFeed implements the roachpb.InternalServer interface.
21982202
func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {
2199-
lockedMuxStream := &lockedMuxStream{wrapped: muxStream}
2203+
lockedMuxStream := &lockedMuxStream{
2204+
wrapped: muxStream,
2205+
metrics: n.metrics.LockedMuxStreamMetrics,
2206+
}
22002207

22012208
// All context created below should derive from this context, which is
22022209
// cancelled once MuxRangeFeed exits.

0 commit comments

Comments
 (0)