@@ -195,6 +195,30 @@ This metric is thus not an indicator of KV health.`,
195195 Measurement : "Bytes" ,
196196 Unit : metric .Unit_BYTES ,
197197 }
198+ metaActiveRangeFeed = metric.Metadata {
199+ Name : "rpc.streams.rangefeed.active" ,
200+ Help : `Number of currently running RangeFeed streams` ,
201+ Measurement : "Streams" ,
202+ Unit : metric .Unit_COUNT ,
203+ }
204+ metaTotalRangeFeed = metric.Metadata {
205+ Name : "rpc.streams.rangefeed.recv" ,
206+ Help : `Total number of RangeFeed streams` ,
207+ Measurement : "Streams" ,
208+ Unit : metric .Unit_COUNT ,
209+ }
210+ metaActiveMuxRangeFeed = metric.Metadata {
211+ Name : "rpc.streams.mux_rangefeed.active" ,
212+ Help : `Number of currently running MuxRangeFeed streams` ,
213+ Measurement : "Streams" ,
214+ Unit : metric .Unit_COUNT ,
215+ }
216+ metaTotalMuxRangeFeed = metric.Metadata {
217+ Name : "rpc.streams.mux_rangefeed.recv" ,
218+ Help : `Total number of MuxRangeFeed streams` ,
219+ Measurement : "Streams" ,
220+ Unit : metric .Unit_COUNT ,
221+ }
198222)
199223
200224// Cluster settings.
@@ -243,6 +267,10 @@ type nodeMetrics struct {
243267 CrossRegionBatchResponseBytes * metric.Counter
244268 CrossZoneBatchRequestBytes * metric.Counter
245269 CrossZoneBatchResponseBytes * metric.Counter
270+ NumRangeFeed * metric.Counter
271+ ActiveRangeFeed * metric.Gauge
272+ NumMuxRangeFeed * metric.Counter
273+ ActiveMuxRangeFeed * metric.Gauge
246274}
247275
248276func makeNodeMetrics (reg * metric.Registry , histogramWindow time.Duration ) nodeMetrics {
@@ -263,6 +291,10 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe
263291 CrossRegionBatchResponseBytes : metric .NewCounter (metaCrossRegionBatchResponse ),
264292 CrossZoneBatchRequestBytes : metric .NewCounter (metaCrossZoneBatchRequest ),
265293 CrossZoneBatchResponseBytes : metric .NewCounter (metaCrossZoneBatchResponse ),
294+ ActiveRangeFeed : metric .NewGauge (metaActiveRangeFeed ),
295+ NumRangeFeed : metric .NewCounter (metaTotalRangeFeed ),
296+ ActiveMuxRangeFeed : metric .NewGauge (metaActiveMuxRangeFeed ),
297+ NumMuxRangeFeed : metric .NewCounter (metaTotalMuxRangeFeed ),
266298 }
267299
268300 for i := range nm .MethodCounts {
@@ -1630,6 +1662,10 @@ func (n *Node) RangeFeed(args *kvpb.RangeFeedRequest, stream kvpb.Internal_Range
16301662 _ , restore := pprofutil .SetProfilerLabelsFromCtxTags (ctx )
16311663 defer restore ()
16321664
1665+ n .metrics .NumRangeFeed .Inc (1 )
1666+ n .metrics .ActiveRangeFeed .Inc (1 )
1667+ defer n .metrics .ActiveRangeFeed .Inc (- 1 )
1668+
16331669 if err := errors .CombineErrors (future .Wait (ctx , n .stores .RangeFeed (args , stream ))); err != nil {
16341670 // Got stream context error, probably won't be able to propagate it to the stream,
16351671 // but give it a try anyway.
@@ -1765,6 +1801,10 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
17651801 }
17661802 defer cleanup ()
17671803
1804+ n .metrics .NumMuxRangeFeed .Inc (1 )
1805+ n .metrics .ActiveMuxRangeFeed .Inc (1 )
1806+ defer n .metrics .ActiveMuxRangeFeed .Inc (- 1 )
1807+
17681808 for {
17691809 req , err := stream .Recv ()
17701810 if err != nil {
@@ -1782,9 +1822,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
17821822 wrapped : muxStream ,
17831823 }
17841824
1785- // TODO(yevgeniy): Add observability into actively running rangefeeds.
1825+ n .metrics .NumMuxRangeFeed .Inc (1 )
1826+ n .metrics .ActiveMuxRangeFeed .Inc (1 )
17861827 f := n .stores .RangeFeed (req , & sink )
17871828 f .WhenReady (func (err error ) {
1829+ n .metrics .ActiveMuxRangeFeed .Inc (- 1 )
17881830 if err == nil {
17891831 cause := kvpb .RangeFeedRetryError_REASON_RANGEFEED_CLOSED
17901832 if ! n .storeCfg .Settings .Version .IsActive (stream .Context (), clusterversion .V23_2 ) {
0 commit comments