Skip to content

Commit 215af81

Browse files
Add OpenMetrics(Prometheus) in the provider module (#379)
* Added a prometheus provider Signed-off-by: Yash Sharma <[email protected]> * Moved the constants to interceptor package Signed-off-by: Yash Sharma <[email protected]> * renamed prometheus to open metrics Signed-off-by: Yash Sharma <[email protected]> * changed the interface of StartTimeCall to previous interface, and use reporter to utilise the timer call function of open metrics Signed-off-by: Yash Sharma <[email protected]> * Added options for open metrics Signed-off-by: Yash Sharma <[email protected]> * changed prom client to openmetrics for nomenclature Signed-off-by: Yash Sharma <[email protected]> * modified go sum Signed-off-by: Yash Sharma <[email protected]> * changed register to mustregister, so that code panics if registry is incorrect, also added some nits Signed-off-by: Yash Sharma <[email protected]> * Added tests for openmetrics provider Signed-off-by: Yash Sharma <[email protected]> * Added buf suggested proto structuring of proto files for testing in openmetrics Signed-off-by: Yash Sharma <[email protected]> * nitpickings due to buf suggesting changes Signed-off-by: Yash Sharma <[email protected]> * modified openmetrics gomod Signed-off-by: Yash Sharma <[email protected]> * Add custom registry support for Prometheus Signed-off-by: Yash Sharma <[email protected]> * linting changes Signed-off-by: Yash Sharma <[email protected]> * added a function for custom registered metrics object Signed-off-by: Yash Sharma <[email protected]> * Added a function to remove the global metrics object, and return a server/client metrics object with some default metrics initialisation Signed-off-by: Yash Sharma <[email protected]>
1 parent 20b2825 commit 215af81

16 files changed

+2538
-1
lines changed

interceptors/reporter.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ type ServerReportable interface {
6969

7070
type Reporter interface {
7171
PostCall(err error, rpcDuration time.Duration)
72-
7372
PostMsgSend(reqProto interface{}, err error, sendDuration time.Duration)
7473
PostMsgReceive(replyProto interface{}, err error, recvDuration time.Duration)
7574
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package metrics
2+
3+
import (
4+
openmetrics "github.com/prometheus/client_golang/prometheus"
5+
"google.golang.org/grpc"
6+
7+
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
8+
)
9+
10+
// RegisterClientMetrics returns a custom ClientMetrics object registered
11+
// with the user's registry, and registers some common metrics associated
12+
// with every instance.
13+
func RegisterClientMetrics(registry openmetrics.Registerer) *ClientMetrics {
14+
customClientMetrics := NewClientMetrics(registry)
15+
customClientMetrics.MustRegister(customClientMetrics.clientStartedCounter)
16+
customClientMetrics.MustRegister(customClientMetrics.clientHandledCounter)
17+
customClientMetrics.MustRegister(customClientMetrics.clientStreamMsgReceived)
18+
customClientMetrics.MustRegister(customClientMetrics.clientStreamMsgSent)
19+
20+
return customClientMetrics
21+
}
22+
23+
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
24+
func UnaryClientInterceptor(clientRegister openmetrics.Registerer) grpc.UnaryClientInterceptor {
25+
return interceptors.UnaryClientInterceptor(&reportable{
26+
registry: clientRegister,
27+
})
28+
}
29+
30+
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
31+
func StreamClientInterceptor(clientRegister openmetrics.Registerer) grpc.StreamClientInterceptor {
32+
return interceptors.StreamClientInterceptor(&reportable{
33+
registry: clientRegister,
34+
})
35+
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
openmetrics "github.com/prometheus/client_golang/prometheus"
6+
)
7+
8+
// ClientMetrics represents a collection of metrics to be registered on a
9+
// Prometheus metrics registry for a gRPC client.
10+
type ClientMetrics struct {
11+
clientRegister openmetrics.Registerer
12+
13+
clientStartedCounter *openmetrics.CounterVec
14+
clientHandledCounter *openmetrics.CounterVec
15+
clientStreamMsgReceived *openmetrics.CounterVec
16+
clientStreamMsgSent *openmetrics.CounterVec
17+
18+
clientHandledHistogramEnabled bool
19+
clientHandledHistogramOpts openmetrics.HistogramOpts
20+
clientHandledHistogram *openmetrics.HistogramVec
21+
22+
clientStreamRecvHistogramEnabled bool
23+
clientStreamRecvHistogramOpts openmetrics.HistogramOpts
24+
clientStreamRecvHistogram *openmetrics.HistogramVec
25+
26+
clientStreamSendHistogramEnabled bool
27+
clientStreamSendHistogramOpts openmetrics.HistogramOpts
28+
clientStreamSendHistogram *openmetrics.HistogramVec
29+
}
30+
31+
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
32+
// ClientMetrics when not using the default Prometheus metrics registry, for
33+
// example when wanting to control which metrics are added to a registry as
34+
// opposed to automatically adding metrics via init functions.
35+
func NewClientMetrics(clientRegistry prometheus.Registerer, counterOpts ...CounterOption) *ClientMetrics {
36+
opts := counterOptions(counterOpts)
37+
return &ClientMetrics{
38+
clientRegister: clientRegistry,
39+
clientStartedCounter: openmetrics.NewCounterVec(
40+
opts.apply(openmetrics.CounterOpts{
41+
Name: "grpc_client_started_total",
42+
Help: "Total number of RPCs started on the client.",
43+
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
44+
45+
clientHandledCounter: openmetrics.NewCounterVec(
46+
opts.apply(openmetrics.CounterOpts{
47+
Name: "grpc_client_handled_total",
48+
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
49+
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
50+
51+
clientStreamMsgReceived: openmetrics.NewCounterVec(
52+
opts.apply(openmetrics.CounterOpts{
53+
Name: "grpc_client_msg_received_total",
54+
Help: "Total number of RPC stream messages received by the client.",
55+
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
56+
57+
clientStreamMsgSent: openmetrics.NewCounterVec(
58+
opts.apply(openmetrics.CounterOpts{
59+
Name: "grpc_client_msg_sent_total",
60+
Help: "Total number of gRPC stream messages sent by the client.",
61+
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
62+
63+
clientHandledHistogramEnabled: false,
64+
clientHandledHistogramOpts: openmetrics.HistogramOpts{
65+
Name: "grpc_client_handling_seconds",
66+
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
67+
Buckets: openmetrics.DefBuckets,
68+
},
69+
clientHandledHistogram: nil,
70+
clientStreamRecvHistogramEnabled: false,
71+
clientStreamRecvHistogramOpts: openmetrics.HistogramOpts{
72+
Name: "grpc_client_msg_recv_handling_seconds",
73+
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
74+
Buckets: openmetrics.DefBuckets,
75+
},
76+
clientStreamRecvHistogram: nil,
77+
clientStreamSendHistogramEnabled: false,
78+
clientStreamSendHistogramOpts: openmetrics.HistogramOpts{
79+
Name: "grpc_client_msg_send_handling_seconds",
80+
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
81+
Buckets: openmetrics.DefBuckets,
82+
},
83+
clientStreamSendHistogram: nil,
84+
}
85+
}
86+
87+
// Register registers the provided Collector with the custom register.
88+
// returns error much like DefaultRegisterer of Prometheus.
89+
func (m *ClientMetrics) Register(c openmetrics.Collector) error {
90+
return m.clientRegister.Register(c)
91+
}
92+
93+
// MustRegister registers the provided Collectors with the custom Registerer
94+
// and panics if any error occurs much like DefaultRegisterer of Prometheus.
95+
func (m *ClientMetrics) MustRegister(c openmetrics.Collector) {
96+
m.clientRegister.MustRegister(c)
97+
}
98+
99+
// Describe sends the super-set of all possible descriptors of metrics
100+
// collected by this Collector to the provided channel and returns once
101+
// the last descriptor has been sent.
102+
func (m *ClientMetrics) Describe(ch chan<- *openmetrics.Desc) {
103+
m.clientStartedCounter.Describe(ch)
104+
m.clientHandledCounter.Describe(ch)
105+
m.clientStreamMsgReceived.Describe(ch)
106+
m.clientStreamMsgSent.Describe(ch)
107+
if m.clientHandledHistogramEnabled {
108+
m.clientHandledHistogram.Describe(ch)
109+
}
110+
if m.clientStreamRecvHistogramEnabled {
111+
m.clientStreamRecvHistogram.Describe(ch)
112+
}
113+
if m.clientStreamSendHistogramEnabled {
114+
m.clientStreamSendHistogram.Describe(ch)
115+
}
116+
}
117+
118+
// Collect is called by the Prometheus registry when collecting
119+
// metrics. The implementation sends each collected metric via the
120+
// provided channel and returns once the last metric has been sent.
121+
func (m *ClientMetrics) Collect(ch chan<- openmetrics.Metric) {
122+
m.clientStartedCounter.Collect(ch)
123+
m.clientHandledCounter.Collect(ch)
124+
m.clientStreamMsgReceived.Collect(ch)
125+
m.clientStreamMsgSent.Collect(ch)
126+
if m.clientHandledHistogramEnabled {
127+
m.clientHandledHistogram.Collect(ch)
128+
}
129+
if m.clientStreamRecvHistogramEnabled {
130+
m.clientStreamRecvHistogram.Collect(ch)
131+
}
132+
if m.clientStreamSendHistogramEnabled {
133+
m.clientStreamSendHistogram.Collect(ch)
134+
}
135+
}
136+
137+
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
138+
// Histogram metrics can be very expensive for Prometheus to retain and query.
139+
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) error {
140+
for _, o := range opts {
141+
o(&m.clientHandledHistogramOpts)
142+
}
143+
if !m.clientHandledHistogramEnabled {
144+
m.clientHandledHistogram = openmetrics.NewHistogramVec(
145+
m.clientHandledHistogramOpts,
146+
[]string{"grpc_type", "grpc_service", "grpc_method"},
147+
)
148+
}
149+
m.clientHandledHistogramEnabled = true
150+
return m.clientRegister.Register(m.clientHandledHistogram)
151+
}
152+
153+
// EnableClientStreamReceiveTimeHistogram turns on recording of single message receive time of streaming RPCs.
154+
// Histogram metrics can be very expensive for Prometheus to retain and query.
155+
func (m *ClientMetrics) EnableClientStreamReceiveTimeHistogram(opts ...HistogramOption) error {
156+
for _, o := range opts {
157+
o(&m.clientStreamRecvHistogramOpts)
158+
}
159+
160+
if !m.clientStreamRecvHistogramEnabled {
161+
m.clientStreamRecvHistogram = openmetrics.NewHistogramVec(
162+
m.clientStreamRecvHistogramOpts,
163+
[]string{"grpc_type", "grpc_service", "grpc_method"},
164+
)
165+
}
166+
167+
m.clientStreamRecvHistogramEnabled = true
168+
return m.clientRegister.Register(m.clientStreamRecvHistogram)
169+
}
170+
171+
// EnableClientStreamSendTimeHistogram turns on recording of single message send time of streaming RPCs.
172+
// Histogram metrics can be very expensive for Prometheus to retain and query.
173+
func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOption) error {
174+
for _, o := range opts {
175+
o(&m.clientStreamSendHistogramOpts)
176+
}
177+
178+
if !m.clientStreamSendHistogramEnabled {
179+
m.clientStreamSendHistogram = openmetrics.NewHistogramVec(
180+
m.clientStreamSendHistogramOpts,
181+
[]string{"grpc_type", "grpc_service", "grpc_method"},
182+
)
183+
}
184+
185+
m.clientStreamSendHistogramEnabled = true
186+
return m.clientRegister.Register(m.clientStreamSendHistogram)
187+
}

providers/openmetrics/client_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"io"
6+
"net"
7+
"testing"
8+
"time"
9+
10+
pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2/testproto/v1"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/stretchr/testify/require"
13+
"github.com/stretchr/testify/suite"
14+
"google.golang.org/grpc"
15+
"google.golang.org/grpc/codes"
16+
"google.golang.org/grpc/status"
17+
)
18+
19+
var (
20+
DefaultClientMetrics *ClientMetrics = NewClientMetrics(prometheus.DefaultRegisterer)
21+
)
22+
23+
func TestClientInterceptorSuite(t *testing.T) {
24+
suite.Run(t, &ClientInterceptorTestSuite{})
25+
}
26+
27+
type ClientInterceptorTestSuite struct {
28+
suite.Suite
29+
30+
serverListener net.Listener
31+
server *grpc.Server
32+
clientConn *grpc.ClientConn
33+
testClient pb_testproto.TestServiceClient
34+
ctx context.Context
35+
cancel context.CancelFunc
36+
}
37+
38+
func (s *ClientInterceptorTestSuite) SetupSuite() {
39+
var err error
40+
41+
DefaultClientMetrics.EnableClientHandlingTimeHistogram()
42+
43+
s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
44+
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
45+
46+
// This is the point where we hook up the interceptor
47+
s.server = grpc.NewServer()
48+
pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})
49+
50+
go func() {
51+
s.server.Serve(s.serverListener)
52+
}()
53+
54+
s.clientConn, err = grpc.Dial(
55+
s.serverListener.Addr().String(),
56+
grpc.WithInsecure(),
57+
grpc.WithBlock(),
58+
grpc.WithUnaryInterceptor(UnaryClientInterceptor(prometheus.DefaultRegisterer)),
59+
grpc.WithStreamInterceptor(StreamClientInterceptor(prometheus.DefaultRegisterer)),
60+
grpc.WithTimeout(2*time.Second))
61+
require.NoError(s.T(), err, "must not error on client Dial")
62+
s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
63+
}
64+
65+
func (s *ClientInterceptorTestSuite) SetupTest() {
66+
// Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests.
67+
s.ctx, s.cancel = context.WithTimeout(context.TODO(), 2*time.Second)
68+
69+
// Make sure every test starts with same fresh, intialized metric state.
70+
DefaultClientMetrics.clientStartedCounter.Reset()
71+
DefaultClientMetrics.clientHandledCounter.Reset()
72+
DefaultClientMetrics.clientHandledHistogram.Reset()
73+
DefaultClientMetrics.clientStreamMsgReceived.Reset()
74+
DefaultClientMetrics.clientStreamMsgSent.Reset()
75+
}
76+
77+
func (s *ClientInterceptorTestSuite) TearDownSuite() {
78+
if s.serverListener != nil {
79+
s.server.Stop()
80+
s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String())
81+
s.serverListener.Close()
82+
83+
}
84+
if s.clientConn != nil {
85+
s.clientConn.Close()
86+
}
87+
}
88+
89+
func (s *ClientInterceptorTestSuite) TearDownTest() {
90+
s.cancel()
91+
}
92+
93+
func (s *ClientInterceptorTestSuite) TestUnaryIncrementsMetrics() {
94+
_, err := s.testClient.PingEmpty(s.ctx, &pb_testproto.PingEmptyRequest{}) // should return with code=OK
95+
require.NoError(s.T(), err)
96+
requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty"))
97+
requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty", "OK"))
98+
requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty"))
99+
100+
_, err = s.testClient.PingError(s.ctx, &pb_testproto.PingErrorRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
101+
require.Error(s.T(), err)
102+
requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError"))
103+
requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError", "FailedPrecondition"))
104+
requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError"))
105+
}
106+
107+
func (s *ClientInterceptorTestSuite) TestStartedStreamingIncrementsStarted() {
108+
_, err := s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{})
109+
require.NoError(s.T(), err)
110+
requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
111+
112+
_, err = s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
113+
require.NoError(s.T(), err, "PingList must not fail immediately")
114+
requireValue(s.T(), 2, DefaultClientMetrics.clientStartedCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
115+
}
116+
117+
func (s *ClientInterceptorTestSuite) TestStreamingIncrementsMetrics() {
118+
ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{}) // should return with code=OK
119+
// Do a read, just for kicks.
120+
count := 0
121+
for {
122+
_, err := ss.Recv()
123+
if err == io.EOF {
124+
break
125+
}
126+
require.NoError(s.T(), err, "reading pingList shouldn't fail")
127+
count++
128+
}
129+
require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
130+
131+
requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
132+
requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList", "OK"))
133+
requireValue(s.T(), countListResponses, DefaultClientMetrics.clientStreamMsgReceived.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
134+
requireValue(s.T(), 1, DefaultClientMetrics.clientStreamMsgSent.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
135+
requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
136+
137+
ss, err := s.testClient.PingList(s.ctx, &pb_testproto.PingListRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
138+
require.NoError(s.T(), err, "PingList must not fail immediately")
139+
140+
// Do a read, just to progate errors.
141+
_, err = ss.Recv()
142+
st, _ := status.FromError(err)
143+
require.Equal(s.T(), codes.FailedPrecondition, st.Code(), "Recv must return FailedPrecondition, otherwise the test is wrong")
144+
145+
requireValue(s.T(), 2, DefaultClientMetrics.clientStartedCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
146+
requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition"))
147+
requireValueHistCount(s.T(), 2, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
148+
}

0 commit comments

Comments
 (0)