Skip to content

Commit 6a6de95

Browse files
committed
rpc,server: add grpc server interceptor to instrument rpcs
Adds a new grpc.UnaryServerInterceptor that records the duration of a grpc request. The metric is labeled by the grpc full method name and status code. This is disabled by default and can be enabled by setting `server.grpc.request_metrics.enabled` to true. Only grpc methods that begin with "/cockroach.server" or "/cockroach.ts" will be recorded. This metric uses metric.HistogramVec and is exported to external collectors. Epic: None Release note: None
1 parent 8b14b92 commit 6a6de95

File tree

13 files changed

+324
-5
lines changed

13 files changed

+324
-5
lines changed

docs/generated/metrics/metrics.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1594,6 +1594,7 @@
15941594
<tr><td>APPLICATION</td><td>rpc.connection.inactive</td><td>Gauge of current connections in an inactive state and pending deletion; these are not healthy but are not tracked as unhealthy either because there is reason to believe that the connection is no longer relevant,for example if the node has since been seen under a new address</td><td>Connections</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
15951595
<tr><td>APPLICATION</td><td>rpc.connection.unhealthy</td><td>Gauge of current connections in an unhealthy state (not bidirectionally connected or heartbeating)</td><td>Connections</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
15961596
<tr><td>APPLICATION</td><td>rpc.connection.unhealthy_nanos</td><td>Gauge of nanoseconds of unhealthy connection time.<br/><br/>On the prometheus endpoint scraped with the cluster setting &#39;server.child_metrics.enabled&#39; set,<br/>the constituent parts of this metric are available on a per-peer basis and one can read off<br/>for how long a given peer has been unreachable</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
1597+
<tr><td>APPLICATION</td><td>rpc.server.request.duration.nanos</td><td>Duration of an grpc request in nanoseconds.</td><td>Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
15971598
<tr><td>APPLICATION</td><td>schedules.BACKUP.failed</td><td>Number of BACKUP jobs failed</td><td>Jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
15981599
<tr><td>APPLICATION</td><td>schedules.BACKUP.last-completed-time</td><td>The unix timestamp of the most recently completed backup by a schedule specified as maintaining this metric</td><td>Jobs</td><td>GAUGE</td><td>TIMESTAMP_SEC</td><td>AVG</td><td>NONE</td></tr>
15991600
<tr><td>APPLICATION</td><td>schedules.BACKUP.last-completed-time-by-virtual_cluster</td><td>The unix timestamp of the most recently completed host scheduled backup by virtual cluster specified as maintaining this metric</td><td>Jobs</td><td>GAUGE</td><td>TIMESTAMP_SEC</td><td>AVG</td><td>NONE</td></tr>

pkg/roachprod/opentelemetry/cockroachdb_metrics.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1505,6 +1505,10 @@ var cockroachdbMetrics = map[string]string{
15051505
"rpc_method_transferlease_recv": "rpc.method.transferlease.recv",
15061506
"rpc_method_truncatelog_recv": "rpc.method.truncatelog.recv",
15071507
"rpc_method_writebatch_recv": "rpc.method.writebatch.recv",
1508+
"rpc_server_request_duration_nanos": "rpc.server.request.duration.nanos",
1509+
"rpc_server_request_duration_nanos_bucket": "rpc.server.request.duration.nanos.bucket",
1510+
"rpc_server_request_duration_nanos_count": "rpc.server.request.duration.nanos.count",
1511+
"rpc_server_request_duration_nanos_sum": "rpc.server.request.duration.nanos.sum",
15081512
"rpc_streams_mux_rangefeed_active": "rpc.streams.mux_rangefeed.active",
15091513
"rpc_streams_mux_rangefeed_recv": "rpc.streams.mux_rangefeed.recv",
15101514
"rpc_streams_rangefeed_active": "rpc.streams.rangefeed.active",

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ go_library(
7474
"@com_github_golang_protobuf//proto",
7575
"@com_github_golang_snappy//:snappy",
7676
"@com_github_montanaflynn_stats//:stats",
77+
"@com_github_prometheus_client_model//go",
7778
"@com_github_vividcortex_ewma//:ewma",
7879
"@io_opentelemetry_go_otel//attribute",
7980
"@io_storj_drpc//:drpc",

pkg/rpc/context.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ func NewServerEx(
143143
})
144144
})
145145

146+
// If the metrics interceptor is set, it should be registered second so
147+
// that all other interceptors are included in the response time durations.
148+
if o.metricsInterceptor != nil {
149+
unaryInterceptor = append(unaryInterceptor, grpc.UnaryServerInterceptor(o.metricsInterceptor))
150+
}
151+
146152
if !rpcCtx.ContextOptions.Insecure {
147153
a := kvAuth{
148154
sv: &rpcCtx.Settings.SV,

pkg/rpc/context_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"io"
1313
"net"
14+
"reflect"
1415
"strings"
1516
"sync"
1617
"sync/atomic"
@@ -2343,3 +2344,37 @@ func BenchmarkGRPCPing(b *testing.B) {
23432344
})
23442345
}
23452346
}
2347+
2348+
func TestMetricsInterceptor(t *testing.T) {
2349+
defer leaktest.AfterTest(t)()
2350+
ctx := context.Background()
2351+
2352+
stopper := stop.NewStopper()
2353+
defer stopper.Stop(context.Background())
2354+
stopper.SetTracer(tracing.NewTracer())
2355+
2356+
clock := timeutil.NewManualTime(timeutil.Unix(0, 1))
2357+
maxOffset := time.Duration(0)
2358+
2359+
serverCtx := newTestContext(uuid.MakeV4(), clock, maxOffset, stopper)
2360+
serverCtx.AdvertiseAddr = "127.0.0.1:8888"
2361+
serverCtx.NodeID.Set(context.Background(), 1)
2362+
2363+
var interceptor RequestMetricsInterceptor = func(
2364+
ctx context.Context,
2365+
req interface{},
2366+
info *grpc.UnaryServerInfo,
2367+
handler grpc.UnaryHandler,
2368+
) (interface{}, error) {
2369+
return nil, nil
2370+
}
2371+
2372+
_, _, serverInterceptors, err := NewServerEx(ctx, serverCtx, WithMetricsServerInterceptor(interceptor))
2373+
require.NoError(t, err)
2374+
require.GreaterOrEqual(t, len(serverInterceptors.UnaryInterceptors), 2)
2375+
// make sure that the RequestMetricsInterceptor is the second registered
2376+
// interceptor.
2377+
require.Equal(t,
2378+
reflect.ValueOf(interceptor).Pointer(),
2379+
reflect.ValueOf(serverInterceptors.UnaryInterceptors[1]).Pointer())
2380+
}

pkg/rpc/metrics.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@
66
package rpc
77

88
import (
9+
"context"
910
"strings"
1011

1112
"github.com/VividCortex/ewma"
1213
"github.com/cockroachdb/cockroach/pkg/roachpb"
1314
"github.com/cockroachdb/cockroach/pkg/util/metric"
1415
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
1516
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
17+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
18+
prometheusgo "github.com/prometheus/client_model/go"
19+
"google.golang.org/grpc"
20+
"google.golang.org/grpc/codes"
21+
"google.golang.org/grpc/status"
1622
)
1723

1824
var (
@@ -130,6 +136,13 @@ over this connection.
130136
Help: `Counter of TCP bytes received via gRPC on connections we initiated.`,
131137
Measurement: "Bytes",
132138
}
139+
metaRequestDuration = metric.Metadata{
140+
Name: "rpc.server.request.duration.nanos",
141+
Help: "Duration of an grpc request in nanoseconds.",
142+
Measurement: "Duration",
143+
Unit: metric.Unit_NANOSECONDS,
144+
MetricType: prometheusgo.MetricType_HISTOGRAM,
145+
}
133146
)
134147

135148
func (m *Metrics) makeLabels(k peerKey, remoteLocality roachpb.Locality) []string {
@@ -334,3 +347,59 @@ func (m *Metrics) acquire(k peerKey, l roachpb.Locality) (peerMetrics, localityM
334347
pm.ConnectionInactive.Inc(1)
335348
return pm, lm
336349
}
350+
351+
// RpcMethodLabel is the label name under which the request duration
// histogram stores the gRPC full method name.
const RpcMethodLabel = "methodName"

// RpcStatusCodeLabel is the label name under which the request duration
// histogram stores the string form of the gRPC status code.
const RpcStatusCodeLabel = "statusCode"
355+
356+
// RequestMetrics contains metrics for RPC requests.
357+
type RequestMetrics struct {
358+
Duration *metric.HistogramVec
359+
}
360+
361+
func NewRequestMetrics() *RequestMetrics {
362+
return &RequestMetrics{
363+
Duration: metric.NewExportedHistogramVec(
364+
metaRequestDuration,
365+
metric.ResponseTime30sBuckets,
366+
[]string{RpcMethodLabel, RpcStatusCodeLabel}),
367+
}
368+
}
369+
370+
// RequestMetricsInterceptor is a named type over grpc.UnaryServerInterceptor.
// Values of this type are produced by NewRequestMetricsInterceptor and are
// handed to the server through WithMetricsServerInterceptor.
type RequestMetricsInterceptor grpc.UnaryServerInterceptor
371+
372+
// NewRequestMetricsInterceptor creates a new gRPC server interceptor that records
373+
// the duration of each RPC. The metric is labeled by the method name and the
374+
// status code of the RPC. The interceptor will only record durations if
375+
// shouldRecord returns true. Otherwise, this interceptor will be a no-op.
376+
func NewRequestMetricsInterceptor(
377+
requestMetrics *RequestMetrics, shouldRecord func(fullMethodName string) bool,
378+
) RequestMetricsInterceptor {
379+
return func(
380+
ctx context.Context,
381+
req interface{},
382+
info *grpc.UnaryServerInfo,
383+
handler grpc.UnaryHandler,
384+
) (interface{}, error) {
385+
if !shouldRecord(info.FullMethod) {
386+
return handler(ctx, req)
387+
}
388+
389+
startTime := timeutil.Now()
390+
resp, err := handler(ctx, req)
391+
duration := timeutil.Since(startTime)
392+
var code codes.Code
393+
if err != nil {
394+
code = status.Code(err)
395+
} else {
396+
code = codes.OK
397+
}
398+
399+
requestMetrics.Duration.Observe(map[string]string{
400+
RpcMethodLabel: info.FullMethod,
401+
RpcStatusCodeLabel: code.String(),
402+
}, float64(duration.Nanoseconds()))
403+
return resp, err
404+
}
405+
}

pkg/rpc/metrics_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@
66
package rpc
77

88
import (
9+
"context"
10+
"fmt"
911
"reflect"
1012
"testing"
13+
"time"
1114

1215
"github.com/cockroachdb/cockroach/pkg/roachpb"
1316
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
17+
"github.com/cockroachdb/cockroach/pkg/util/log"
1418
io_prometheus_client "github.com/prometheus/client_model/go"
1519
"github.com/stretchr/testify/require"
20+
"google.golang.org/grpc"
21+
"google.golang.org/grpc/codes"
22+
"google.golang.org/grpc/status"
1623
)
1724

1825
// TestMetricsRelease verifies that peerMetrics.release() removes tracking for
@@ -82,3 +89,84 @@ func TestMetricsRelease(t *testing.T) {
8289
// We added one extra peer and one extra locality, verify counts.
8390
require.Equal(t, expectedCount, verifyAllFields(m, 2))
8491
}
92+
93+
func TestServerRequestInstrumentInterceptor(t *testing.T) {
94+
defer leaktest.AfterTest(t)()
95+
defer log.Scope(t).Close(t)
96+
97+
requestMetrics := NewRequestMetrics()
98+
99+
ctx := context.Background()
100+
req := struct{}{}
101+
102+
testcase := []struct {
103+
methodName string
104+
statusCode codes.Code
105+
shouldRecord bool
106+
}{
107+
{"rpc/test/method", codes.OK, true},
108+
{"rpc/test/method", codes.Internal, true},
109+
{"rpc/test/method", codes.Aborted, true},
110+
{"rpc/test/notRecorded", codes.OK, false},
111+
}
112+
113+
for _, tc := range testcase {
114+
t.Run(fmt.Sprintf("%s %s", tc.methodName, tc.statusCode), func(t *testing.T) {
115+
info := &grpc.UnaryServerInfo{FullMethod: tc.methodName}
116+
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
117+
if tc.statusCode == codes.OK {
118+
time.Sleep(time.Millisecond)
119+
return struct{}{}, nil
120+
}
121+
return nil, status.Error(tc.statusCode, tc.statusCode.String())
122+
}
123+
interceptor := NewRequestMetricsInterceptor(requestMetrics, func(fullMethodName string) bool {
124+
return tc.shouldRecord
125+
})
126+
_, err := interceptor(ctx, req, info, handler)
127+
if err != nil {
128+
require.Equal(t, tc.statusCode, status.Code(err))
129+
}
130+
var expectedCount uint64
131+
if tc.shouldRecord {
132+
expectedCount = 1
133+
}
134+
assertGrpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{
135+
fmt.Sprintf("%s %s", tc.methodName, tc.statusCode): expectedCount,
136+
})
137+
})
138+
}
139+
}
140+
141+
func assertGrpcMetrics(
142+
t *testing.T, metrics []*io_prometheus_client.Metric, expected map[string]uint64,
143+
) {
144+
t.Helper()
145+
actual := map[string]*io_prometheus_client.Histogram{}
146+
for _, m := range metrics {
147+
var method, statusCode string
148+
for _, l := range m.Label {
149+
switch *l.Name {
150+
case RpcMethodLabel:
151+
method = *l.Value
152+
case RpcStatusCodeLabel:
153+
statusCode = *l.Value
154+
}
155+
}
156+
histogram := m.Histogram
157+
require.NotNil(t, histogram, "expected histogram")
158+
key := fmt.Sprintf("%s %s", method, statusCode)
159+
actual[key] = histogram
160+
}
161+
162+
for key, val := range expected {
163+
histogram, ok := actual[key]
164+
if val == 0 {
165+
require.False(t, ok, "expected `%s` to not exist", key)
166+
} else {
167+
require.True(t, ok)
168+
require.Greater(t, *histogram.SampleSum, float64(0), "expected `%s` to have a SampleSum > 0", key)
169+
require.Equal(t, val, *histogram.SampleCount, "expected `%s` to have SampleCount of %d", key, val)
170+
}
171+
}
172+
}

pkg/rpc/settings.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ var sourceAddr = func() net.Addr {
119119
}()
120120

121121
type serverOpts struct {
122-
interceptor func(fullMethod string) error
122+
interceptor func(fullMethod string) error
123+
metricsInterceptor RequestMetricsInterceptor
123124
}
124125

125126
// ServerOption is a configuration option passed to NewServer.
@@ -142,3 +143,10 @@ func WithInterceptor(f func(fullMethod string) error) ServerOption {
142143
}
143144
}
144145
}
146+
147+
// WithMetricsServerInterceptor adds a RequestMetricsInterceptor to the grpc server.
148+
func WithMetricsServerInterceptor(interceptor RequestMetricsInterceptor) ServerOption {
149+
return func(opts *serverOpts) {
150+
opts.metricsInterceptor = interceptor
151+
}
152+
}

pkg/server/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ go_test(
451451
"get_local_files_test.go",
452452
"graphite_test.go",
453453
"grpc_gateway_test.go",
454+
"grpc_server_test.go",
454455
"helpers_test.go",
455456
"http_metrics_test.go",
456457
"index_usage_stats_test.go",

pkg/server/grpc_server.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ package server
77

88
import (
99
"context"
10+
"strings"
1011
"sync/atomic"
1112

1213
"github.com/cockroachdb/cockroach/pkg/rpc"
1314
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
15+
"github.com/cockroachdb/cockroach/pkg/settings"
16+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
17+
"github.com/cockroachdb/cockroach/pkg/util/metric"
1418
"github.com/cockroachdb/errors"
1519
"google.golang.org/grpc"
1620
"google.golang.org/grpc/codes"
@@ -27,13 +31,21 @@ type grpcServer struct {
2731
mode serveMode
2832
}
2933

30-
func newGRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*grpcServer, error) {
34+
func newGRPCServer(
35+
ctx context.Context, rpcCtx *rpc.Context, metricsRegistry *metric.Registry,
36+
) (*grpcServer, error) {
3137
s := &grpcServer{}
3238
s.mode.set(modeInitializing)
39+
requestMetrics := rpc.NewRequestMetrics()
40+
metricsRegistry.AddMetricStruct(requestMetrics)
3341
srv, dsrv, interceptorInfo, err := rpc.NewServerEx(
3442
ctx, rpcCtx, rpc.WithInterceptor(func(path string) error {
3543
return s.intercept(path)
36-
}))
44+
}), rpc.WithMetricsServerInterceptor(
45+
rpc.NewRequestMetricsInterceptor(requestMetrics, func(method string) bool {
46+
return shouldRecordRequestDuration(rpcCtx.Settings, method)
47+
},
48+
)))
3749
if err != nil {
3850
return nil, err
3951
}
@@ -116,3 +128,25 @@ func NewWaitingForInitError(methodName string) error {
116128
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
117129
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
118130
}
131+
132+
// serverPrefix is one of the two full-method-name prefixes for which request
// durations are recorded.
const serverPrefix = "/cockroach.server"

// tsdbPrefix is the other full-method-name prefix for which request durations
// are recorded.
const tsdbPrefix = "/cockroach.ts"
136+
137+
// serverGRPCRequestMetricsEnabled is a cluster setting that enables the
138+
// collection of gRPC request duration metrics. This uses export only
139+
// metrics so the metrics are only exported to external sources such as
140+
// /_status/vars and DataDog.
141+
var serverGRPCRequestMetricsEnabled = settings.RegisterBoolSetting(
142+
settings.ApplicationLevel,
143+
"server.grpc.request_metrics.enabled",
144+
"enables the collection of grpc metrics",
145+
false,
146+
)
147+
148+
func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool {
149+
return serverGRPCRequestMetricsEnabled.Get(&settings.SV) &&
150+
(strings.HasPrefix(method, serverPrefix) ||
151+
strings.HasPrefix(method, tsdbPrefix))
152+
}

0 commit comments

Comments
 (0)