Skip to content

Commit f60016a

Browse files
authored
Refactor metrics interceptor and fix tests (#413)
1 parent 0c9d258 commit f60016a

14 files changed

+355
-310
lines changed

interceptors/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package interceptors
77

88
import (
99
"context"
10+
"io"
1011
"time"
1112

1213
"google.golang.org/grpc"
@@ -74,7 +75,10 @@ func (s *monitoredClientStream) RecvMsg(m interface{}) error {
7475
if err == nil {
7576
return nil
7677
}
77-
78-
s.reporter.PostCall(err, time.Since(s.startTime))
78+
var postErr error
79+
if err != io.EOF {
80+
postErr = err
81+
}
82+
s.reporter.PostCall(postErr, time.Since(s.startTime))
7983
return err
8084
}

interceptors/client_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (m *mockReportable) Equal(t *testing.T, expected []*mockReport) {
5050
require.NoError(t, err)
5151
continue
5252
}
53-
require.Equal(t, expected[i].postCalls[k].Error(), err.Error(), "%v %v", i, k)
53+
require.EqualError(t, err, expected[i].postCalls[k].Error(), "%v %v", i, k)
5454
}
5555
require.Len(t, expected[i].postMsgSends, len(e.postMsgSends), "%v", i)
5656
for k, err := range e.postMsgSends {
@@ -276,7 +276,7 @@ func (s *ClientInterceptorTestSuite) TestListReporting() {
276276
typ: ServerStream,
277277
svcName: testpb.TestServiceFullName,
278278
methodName: "PingList",
279-
postCalls: []error{io.EOF},
279+
postCalls: []error{nil},
280280
postMsgReceives: append(make([]error, testpb.ListResponseCount), io.EOF),
281281
postMsgSends: []error{nil},
282282
}})
@@ -329,7 +329,9 @@ func (s *ClientInterceptorTestSuite) TestBiStreamingReporting() {
329329
if err == io.EOF {
330330
break
331331
}
332-
require.NoError(s.T(), err, "reading pingStream shouldn't fail")
332+
if !s.Assert().NoError(err, "reading pingStream shouldn't fail") {
333+
break
334+
}
333335
count++
334336
}
335337
}()
@@ -340,12 +342,12 @@ func (s *ClientInterceptorTestSuite) TestBiStreamingReporting() {
340342
require.NoError(s.T(), ss.CloseSend())
341343
wg.Wait()
342344

343-
require.EqualValues(s.T(), count, 100, "Number of received msg on the wire must match")
345+
require.EqualValues(s.T(), 100, count, "Number of received msg on the wire must match")
344346
s.mock.Equal(s.T(), []*mockReport{{
345347
typ: BidiStream,
346348
svcName: testpb.TestServiceFullName,
347349
methodName: "PingStream",
348-
postCalls: []error{io.EOF},
350+
postCalls: []error{nil},
349351
postMsgReceives: append(make([]error, 100), io.EOF),
350352
postMsgSends: make([]error, 100),
351353
}})
Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,21 @@
11
package metrics
22

33
import (
4-
openmetrics "github.com/prometheus/client_golang/prometheus"
54
"google.golang.org/grpc"
65

76
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
87
)
98

10-
// RegisterClientMetrics returns a custom ClientMetrics object registered
11-
// with the user's registry, and registers some common metrics associated
12-
// with every instance.
13-
func RegisterClientMetrics(registry openmetrics.Registerer) *ClientMetrics {
14-
customClientMetrics := NewClientMetrics(registry)
15-
customClientMetrics.MustRegister(customClientMetrics.clientStartedCounter)
16-
customClientMetrics.MustRegister(customClientMetrics.clientHandledCounter)
17-
customClientMetrics.MustRegister(customClientMetrics.clientStreamMsgReceived)
18-
customClientMetrics.MustRegister(customClientMetrics.clientStreamMsgSent)
19-
20-
return customClientMetrics
21-
}
22-
239
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
24-
func UnaryClientInterceptor(clientRegister openmetrics.Registerer) grpc.UnaryClientInterceptor {
10+
func UnaryClientInterceptor(clientMetrics *ClientMetrics) grpc.UnaryClientInterceptor {
2511
return interceptors.UnaryClientInterceptor(&reportable{
26-
registry: clientRegister,
12+
clientMetrics: clientMetrics,
2713
})
2814
}
2915

3016
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
31-
func StreamClientInterceptor(clientRegister openmetrics.Registerer) grpc.StreamClientInterceptor {
17+
func StreamClientInterceptor(clientMetrics *ClientMetrics) grpc.StreamClientInterceptor {
3218
return interceptors.StreamClientInterceptor(&reportable{
33-
registry: clientRegister,
19+
clientMetrics: clientMetrics,
3420
})
3521
}
Lines changed: 62 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,103 @@
11
package metrics
22

33
import (
4-
"github.com/prometheus/client_golang/prometheus"
54
openmetrics "github.com/prometheus/client_golang/prometheus"
65
)
76

87
// ClientMetrics represents a collection of metrics to be registered on a
98
// Prometheus metrics registry for a gRPC client.
109
type ClientMetrics struct {
11-
clientRegister openmetrics.Registerer
12-
1310
clientStartedCounter *openmetrics.CounterVec
1411
clientHandledCounter *openmetrics.CounterVec
1512
clientStreamMsgReceived *openmetrics.CounterVec
1613
clientStreamMsgSent *openmetrics.CounterVec
1714

18-
clientHandledHistogramEnabled bool
19-
clientHandledHistogramOpts openmetrics.HistogramOpts
20-
clientHandledHistogram *openmetrics.HistogramVec
21-
22-
clientStreamRecvHistogramEnabled bool
23-
clientStreamRecvHistogramOpts openmetrics.HistogramOpts
24-
clientStreamRecvHistogram *openmetrics.HistogramVec
25-
26-
clientStreamSendHistogramEnabled bool
27-
clientStreamSendHistogramOpts openmetrics.HistogramOpts
28-
clientStreamSendHistogram *openmetrics.HistogramVec
15+
// clientHandledHistogram can be nil
16+
clientHandledHistogram *openmetrics.HistogramVec
17+
// clientStreamRecvHistogram can be nil
18+
clientStreamRecvHistogram *openmetrics.HistogramVec
19+
// clientStreamSendHistogram can be nil
20+
clientStreamSendHistogram *openmetrics.HistogramVec
2921
}
3022

31-
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
32-
// ClientMetrics when not using the default Prometheus metrics registry, for
33-
// example when wanting to control which metrics are added to a registry as
34-
// opposed to automatically adding metrics via init functions.
35-
func NewClientMetrics(clientRegistry prometheus.Registerer, counterOpts ...CounterOption) *ClientMetrics {
36-
opts := counterOptions(counterOpts)
23+
// NewClientMetrics returns a new ClientMetrics object.
24+
func NewClientMetrics(opts ...ClientMetricsOption) *ClientMetrics {
25+
var config clientMetricsConfig
26+
config.apply(opts)
3727
return &ClientMetrics{
38-
clientRegister: clientRegistry,
3928
clientStartedCounter: openmetrics.NewCounterVec(
40-
opts.apply(openmetrics.CounterOpts{
29+
config.counterOpts.apply(openmetrics.CounterOpts{
4130
Name: "grpc_client_started_total",
4231
Help: "Total number of RPCs started on the client.",
4332
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
4433

4534
clientHandledCounter: openmetrics.NewCounterVec(
46-
opts.apply(openmetrics.CounterOpts{
35+
config.counterOpts.apply(openmetrics.CounterOpts{
4736
Name: "grpc_client_handled_total",
4837
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
4938
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
5039

5140
clientStreamMsgReceived: openmetrics.NewCounterVec(
52-
opts.apply(openmetrics.CounterOpts{
41+
config.counterOpts.apply(openmetrics.CounterOpts{
5342
Name: "grpc_client_msg_received_total",
5443
Help: "Total number of RPC stream messages received by the client.",
5544
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
5645

5746
clientStreamMsgSent: openmetrics.NewCounterVec(
58-
opts.apply(openmetrics.CounterOpts{
47+
config.counterOpts.apply(openmetrics.CounterOpts{
5948
Name: "grpc_client_msg_sent_total",
6049
Help: "Total number of gRPC stream messages sent by the client.",
6150
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
6251

63-
clientHandledHistogramEnabled: false,
64-
clientHandledHistogramOpts: openmetrics.HistogramOpts{
65-
Name: "grpc_client_handling_seconds",
66-
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
67-
Buckets: openmetrics.DefBuckets,
68-
},
69-
clientHandledHistogram: nil,
70-
clientStreamRecvHistogramEnabled: false,
71-
clientStreamRecvHistogramOpts: openmetrics.HistogramOpts{
72-
Name: "grpc_client_msg_recv_handling_seconds",
73-
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
74-
Buckets: openmetrics.DefBuckets,
75-
},
76-
clientStreamRecvHistogram: nil,
77-
clientStreamSendHistogramEnabled: false,
78-
clientStreamSendHistogramOpts: openmetrics.HistogramOpts{
79-
Name: "grpc_client_msg_send_handling_seconds",
80-
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
81-
Buckets: openmetrics.DefBuckets,
82-
},
83-
clientStreamSendHistogram: nil,
52+
clientHandledHistogram: config.clientHandledHistogram,
53+
clientStreamRecvHistogram: config.clientStreamRecvHistogram,
54+
clientStreamSendHistogram: config.clientStreamSendHistogram,
8455
}
8556
}
8657

87-
// Register registers the provided Collector with the custom register.
58+
// NewRegisteredClientMetrics returns a custom ClientMetrics object registered
59+
// with the user's registry, and registers some common metrics associated
60+
// with every instance.
61+
func NewRegisteredClientMetrics(registry openmetrics.Registerer, opts ...ClientMetricsOption) *ClientMetrics {
62+
customClientMetrics := NewClientMetrics(opts...)
63+
customClientMetrics.MustRegister(registry)
64+
return customClientMetrics
65+
}
66+
67+
// Register registers the metrics with the registry.
8868
// returns error much like DefaultRegisterer of Prometheus.
89-
func (m *ClientMetrics) Register(c openmetrics.Collector) error {
90-
return m.clientRegister.Register(c)
69+
func (m *ClientMetrics) Register(registry openmetrics.Registerer) error {
70+
for _, collector := range m.toRegister() {
71+
if err := registry.Register(collector); err != nil {
72+
return err
73+
}
74+
}
75+
return nil
9176
}
9277

93-
// MustRegister registers the provided Collectors with the custom Registerer
78+
// MustRegister registers the metrics with the registry
9479
// and panics if any error occurs much like DefaultRegisterer of Prometheus.
95-
func (m *ClientMetrics) MustRegister(c openmetrics.Collector) {
96-
m.clientRegister.MustRegister(c)
80+
func (m *ClientMetrics) MustRegister(registry openmetrics.Registerer) {
81+
registry.MustRegister(m.toRegister()...)
82+
}
83+
84+
func (m *ClientMetrics) toRegister() []openmetrics.Collector {
85+
res := []openmetrics.Collector{
86+
m.clientStartedCounter,
87+
m.clientHandledCounter,
88+
m.clientStreamMsgReceived,
89+
m.clientStreamMsgSent,
90+
}
91+
if m.clientHandledHistogram != nil {
92+
res = append(res, m.clientHandledHistogram)
93+
}
94+
if m.clientStreamRecvHistogram != nil {
95+
res = append(res, m.clientStreamRecvHistogram)
96+
}
97+
if m.clientStreamSendHistogram != nil {
98+
res = append(res, m.clientStreamSendHistogram)
99+
}
100+
return res
97101
}
98102

99103
// Describe sends the super-set of all possible descriptors of metrics
@@ -104,13 +108,13 @@ func (m *ClientMetrics) Describe(ch chan<- *openmetrics.Desc) {
104108
m.clientHandledCounter.Describe(ch)
105109
m.clientStreamMsgReceived.Describe(ch)
106110
m.clientStreamMsgSent.Describe(ch)
107-
if m.clientHandledHistogramEnabled {
111+
if m.clientHandledHistogram != nil {
108112
m.clientHandledHistogram.Describe(ch)
109113
}
110-
if m.clientStreamRecvHistogramEnabled {
114+
if m.clientStreamRecvHistogram != nil {
111115
m.clientStreamRecvHistogram.Describe(ch)
112116
}
113-
if m.clientStreamSendHistogramEnabled {
117+
if m.clientStreamSendHistogram != nil {
114118
m.clientStreamSendHistogram.Describe(ch)
115119
}
116120
}
@@ -123,65 +127,13 @@ func (m *ClientMetrics) Collect(ch chan<- openmetrics.Metric) {
123127
m.clientHandledCounter.Collect(ch)
124128
m.clientStreamMsgReceived.Collect(ch)
125129
m.clientStreamMsgSent.Collect(ch)
126-
if m.clientHandledHistogramEnabled {
130+
if m.clientHandledHistogram != nil {
127131
m.clientHandledHistogram.Collect(ch)
128132
}
129-
if m.clientStreamRecvHistogramEnabled {
133+
if m.clientStreamRecvHistogram != nil {
130134
m.clientStreamRecvHistogram.Collect(ch)
131135
}
132-
if m.clientStreamSendHistogramEnabled {
136+
if m.clientStreamSendHistogram != nil {
133137
m.clientStreamSendHistogram.Collect(ch)
134138
}
135139
}
136-
137-
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
138-
// Histogram metrics can be very expensive for Prometheus to retain and query.
139-
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) error {
140-
for _, o := range opts {
141-
o(&m.clientHandledHistogramOpts)
142-
}
143-
if !m.clientHandledHistogramEnabled {
144-
m.clientHandledHistogram = openmetrics.NewHistogramVec(
145-
m.clientHandledHistogramOpts,
146-
[]string{"grpc_type", "grpc_service", "grpc_method"},
147-
)
148-
}
149-
m.clientHandledHistogramEnabled = true
150-
return m.clientRegister.Register(m.clientHandledHistogram)
151-
}
152-
153-
// EnableClientStreamReceiveTimeHistogram turns on recording of single message receive time of streaming RPCs.
154-
// Histogram metrics can be very expensive for Prometheus to retain and query.
155-
func (m *ClientMetrics) EnableClientStreamReceiveTimeHistogram(opts ...HistogramOption) error {
156-
for _, o := range opts {
157-
o(&m.clientStreamRecvHistogramOpts)
158-
}
159-
160-
if !m.clientStreamRecvHistogramEnabled {
161-
m.clientStreamRecvHistogram = openmetrics.NewHistogramVec(
162-
m.clientStreamRecvHistogramOpts,
163-
[]string{"grpc_type", "grpc_service", "grpc_method"},
164-
)
165-
}
166-
167-
m.clientStreamRecvHistogramEnabled = true
168-
return m.clientRegister.Register(m.clientStreamRecvHistogram)
169-
}
170-
171-
// EnableClientStreamSendTimeHistogram turns on recording of single message send time of streaming RPCs.
172-
// Histogram metrics can be very expensive for Prometheus to retain and query.
173-
func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOption) error {
174-
for _, o := range opts {
175-
o(&m.clientStreamSendHistogramOpts)
176-
}
177-
178-
if !m.clientStreamSendHistogramEnabled {
179-
m.clientStreamSendHistogram = openmetrics.NewHistogramVec(
180-
m.clientStreamSendHistogramOpts,
181-
[]string{"grpc_type", "grpc_service", "grpc_method"},
182-
)
183-
}
184-
185-
m.clientStreamSendHistogramEnabled = true
186-
return m.clientRegister.Register(m.clientStreamSendHistogram)
187-
}

0 commit comments

Comments
 (0)