Skip to content

Commit e4d1523

Browse files
craig[bot] and Michael Xu committed
Merge #147440
147440: kvserver/rangefeed: add metrics for lockedMuxStream.Send calls r=wenyihu6 a=xuchef

Adds observability into the latency of `lockedMuxStream.Send` with a histogram metric that uses the `IOLatencyBuckets` bucket configuration (see #147440 (comment)). Also adds a counter for the number of slow sends, using the same criteria as the log line added in #146765.

Resolves: #147435
Epic: [CRDB-51061](https://cockroachlabs.atlassian.net/browse/CRDB-51061)

Release note: None

Co-authored-by: Michael Xu <[email protected]>
2 parents a3835d3 + 15da742 commit e4d1523

File tree

3 files changed: +64 -6 lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12424,6 +12424,22 @@ layers:
1242412424
unit: BYTES
1242512425
aggregation: AVG
1242612426
derivative: NONE
12427+
- name: kv.rangefeed.mux_stream_send.latency
12428+
exported_name: kv_rangefeed_mux_stream_send_latency
12429+
description: Latency of sending RangeFeed events to the client
12430+
y_axis_label: Latency
12431+
type: HISTOGRAM
12432+
unit: NANOSECONDS
12433+
aggregation: AVG
12434+
derivative: NONE
12435+
- name: kv.rangefeed.mux_stream_send.slow_events
12436+
exported_name: kv_rangefeed_mux_stream_send_slow_events
12437+
description: Number of RangeFeed events that took longer than 10s to send to the client
12438+
y_axis_label: Events
12439+
type: COUNTER
12440+
unit: COUNT
12441+
aggregation: AVG
12442+
derivative: NON_NEGATIVE_DERIVATIVE
1242712443
- name: kv.rangefeed.output_loop_unbuffered_registration_nanos
1242812444
exported_name: kv_rangefeed_output_loop_unbuffered_registration_nanos
1242912445
description: Duration of the Rangefeed O(range) output loop goroutine. This is only applicable for unbuffered registrations since buffered registrations spawns long-living goroutines.

pkg/kv/kvserver/rangefeed/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,38 @@ func NewBufferedSenderMetrics() *BufferedSenderMetrics {
289289
BufferedSenderQueueSize: metric.NewGauge(metaBufferedSenderQueueSize),
290290
}
291291
}
292+
293+
var (
294+
metaRangeFeedMuxStreamSendLatencyNanos = metric.Metadata{
295+
Name: "kv.rangefeed.mux_stream_send.latency",
296+
Help: "Latency of sending RangeFeed events to the client",
297+
Measurement: "Latency",
298+
Unit: metric.Unit_NANOSECONDS,
299+
}
300+
metaRangeFeedMuxStreamSlowSends = metric.Metadata{
301+
Name: "kv.rangefeed.mux_stream_send.slow_events",
302+
Help: "Number of RangeFeed events that took longer than 10s to send to the client",
303+
Measurement: "Events",
304+
Unit: metric.Unit_COUNT,
305+
}
306+
)
307+
308+
type LockedMuxStreamMetrics struct {
309+
SendLatencyNanos metric.IHistogram
310+
SlowSends *metric.Counter
311+
}
312+
313+
// MetricStruct implements metrics.Struct interface.
314+
func (*LockedMuxStreamMetrics) MetricStruct() {}
315+
316+
func NewLockedMuxStreamMetrics(histogramWindow time.Duration) *LockedMuxStreamMetrics {
317+
return &LockedMuxStreamMetrics{
318+
SendLatencyNanos: metric.NewHistogram(metric.HistogramOptions{
319+
Mode: metric.HistogramModePrometheus,
320+
Metadata: metaRangeFeedMuxStreamSendLatencyNanos,
321+
Duration: histogramWindow,
322+
BucketConfig: metric.IOLatencyBuckets,
323+
}),
324+
SlowSends: metric.NewCounter(metaRangeFeedMuxStreamSlowSends),
325+
}
326+
}

pkg/server/node.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ type nodeMetrics struct {
260260
StreamManagerMetrics *rangefeed.StreamManagerMetrics
261261
// BufferedSenderMetrics is for monitoring of BufferedSenders for rangefeed.
262262
// Note that there could be multiple buffered senders in a node.
263-
BufferedSenderMetrics *rangefeed.BufferedSenderMetrics
263+
BufferedSenderMetrics *rangefeed.BufferedSenderMetrics
264+
LockedMuxStreamMetrics *rangefeed.LockedMuxStreamMetrics
264265
}
265266

266267
func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeMetrics {
@@ -282,6 +283,7 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeM
282283
CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
283284
StreamManagerMetrics: rangefeed.NewStreamManagerMetrics(),
284285
BufferedSenderMetrics: rangefeed.NewBufferedSenderMetrics(),
286+
LockedMuxStreamMetrics: rangefeed.NewLockedMuxStreamMetrics(histogramWindow),
285287
}
286288

287289
for i := range nm.MethodCounts {
@@ -2091,6 +2093,7 @@ func (n *Node) RangeLookup(
20912093
type lockedMuxStream struct {
20922094
wrapped kvpb.Internal_MuxRangeFeedServer
20932095
sendMu syncutil.Mutex
2096+
metrics *rangefeed.LockedMuxStreamMetrics
20942097
}
20952098

20962099
func (s *lockedMuxStream) SendIsThreadSafe() {}
@@ -2109,10 +2112,11 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
21092112
// factors, e.g., the number of rangefeeds contending for the lockedMuxStream.
21102113
start := crtime.NowMono()
21112114
defer func() {
2112-
if dur := start.Elapsed(); dur > slowMuxStreamSendThreshold {
2113-
log.Infof(s.wrapped.Context(),
2114-
"slow send on stream %d for r%d took %s",
2115-
e.StreamID, e.RangeID, dur)
2115+
dur := start.Elapsed()
2116+
s.metrics.SendLatencyNanos.RecordValue(dur.Nanoseconds())
2117+
if dur > slowMuxStreamSendThreshold {
2118+
s.metrics.SlowSends.Inc(1)
2119+
log.Infof(s.wrapped.Context(), "slow send on stream %d for r%d took %s", e.StreamID, e.RangeID, dur)
21162120
}
21172121
}()
21182122

@@ -2183,7 +2187,10 @@ func (n *Node) defaultRangefeedConsumerID() int64 {
21832187

21842188
// MuxRangeFeed implements the roachpb.InternalServer interface.
21852189
func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {
2186-
lockedMuxStream := &lockedMuxStream{wrapped: muxStream}
2190+
lockedMuxStream := &lockedMuxStream{
2191+
wrapped: muxStream,
2192+
metrics: n.metrics.LockedMuxStreamMetrics,
2193+
}
21872194

21882195
// All context created below should derive from this context, which is
21892196
// cancelled once MuxRangeFeed exits.

0 commit comments

Comments
 (0)