Skip to content

Commit 30197e7

Browse files
committed
TUN-8422: Add metrics for capnp method calls
Adds a new suite of metrics to capture the following for capnp RPC operations: method calls, method call failures, and method call latencies. Each metric is labeled by the handler that serves the method and by the method invoked. Additionally, each metric is split by whether the operation was requested as a client or served.
1 parent 654a326 commit 30197e7

File tree

8 files changed

+250
-23
lines changed

8 files changed

+250
-23
lines changed

connection/metrics.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type localConfigMetrics struct {
4343
}
4444

4545
type tunnelMetrics struct {
46-
timerRetries prometheus.Gauge
4746
serverLocations *prometheus.GaugeVec
4847
// locationLock is a mutex for oldServerLocations
4948
locationLock sync.Mutex
@@ -351,15 +350,6 @@ func initTunnelMetrics() *tunnelMetrics {
351350
)
352351
prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
353352

354-
timerRetries := prometheus.NewGauge(
355-
prometheus.GaugeOpts{
356-
Namespace: MetricsNamespace,
357-
Subsystem: TunnelSubsystem,
358-
Name: "timer_retries",
359-
Help: "Unacknowledged heart beats count",
360-
})
361-
prometheus.MustRegister(timerRetries)
362-
363353
serverLocations := prometheus.NewGaugeVec(
364354
prometheus.GaugeOpts{
365355
Namespace: MetricsNamespace,
@@ -416,7 +406,6 @@ func initTunnelMetrics() *tunnelMetrics {
416406
prometheus.MustRegister(registerSuccess)
417407

418408
return &tunnelMetrics{
419-
timerRetries: timerRetries,
420409
serverLocations: serverLocations,
421410
oldServerLocations: make(map[string]string),
422411
muxerMetrics: newMuxerMetrics(),

tunnelrpc/metrics/metrics.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
)
6+
7+
const (
8+
metricsNamespace = "cloudflared"
9+
rpcSubsystem = "rpc"
10+
)
11+
12+
// CloudflaredServer operation labels
13+
// CloudflaredServer is an extension of SessionManager with additional methods, but it's helpful
14+
// to visualize it separately in the metrics since they are technically different client/servers.
15+
const (
16+
Cloudflared = "cloudflared"
17+
)
18+
19+
// ConfigurationManager operation labels
20+
const (
21+
ConfigurationManager = "config"
22+
23+
OperationUpdateConfiguration = "update_configuration"
24+
)
25+
26+
// SessionManager operation labels
27+
const (
28+
SessionManager = "session"
29+
30+
OperationRegisterUdpSession = "register_udp_session"
31+
OperationUnregisterUdpSession = "unregister_udp_session"
32+
)
33+
34+
// RegistrationServer operation labels
35+
const (
36+
Registration = "registration"
37+
38+
OperationRegisterConnection = "register_connection"
39+
OperationUnregisterConnection = "unregister_connection"
40+
OperationUpdateLocalConfiguration = "update_local_configuration"
41+
)
42+
43+
type rpcMetrics struct {
44+
serverOperations *prometheus.CounterVec
45+
serverFailures *prometheus.CounterVec
46+
serverOperationsLatency *prometheus.HistogramVec
47+
48+
ClientOperations *prometheus.CounterVec
49+
ClientFailures *prometheus.CounterVec
50+
ClientOperationsLatency *prometheus.HistogramVec
51+
}
52+
53+
var CapnpMetrics *rpcMetrics = &rpcMetrics{
54+
serverOperations: prometheus.NewCounterVec(
55+
prometheus.CounterOpts{
56+
Namespace: metricsNamespace,
57+
Subsystem: rpcSubsystem,
58+
Name: "server_operations",
59+
Help: "Number of rpc methods by handler served",
60+
},
61+
[]string{"handler", "method"},
62+
),
63+
serverFailures: prometheus.NewCounterVec(
64+
prometheus.CounterOpts{
65+
Namespace: metricsNamespace,
66+
Subsystem: rpcSubsystem,
67+
Name: "server_failures",
68+
Help: "Number of rpc methods failures by handler served",
69+
},
70+
[]string{"handler", "method"},
71+
),
72+
serverOperationsLatency: prometheus.NewHistogramVec(
73+
prometheus.HistogramOpts{
74+
Namespace: metricsNamespace,
75+
Subsystem: rpcSubsystem,
76+
Name: "server_latency_secs",
77+
Help: "Latency of rpc methods by handler served",
78+
// Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds:
79+
// 50ms, 150ms, 450ms, 1350ms, 4050ms
80+
Buckets: prometheus.ExponentialBuckets(0.05, 3, 5),
81+
},
82+
[]string{"handler", "method"},
83+
),
84+
ClientOperations: prometheus.NewCounterVec(
85+
prometheus.CounterOpts{
86+
Namespace: metricsNamespace,
87+
Subsystem: rpcSubsystem,
88+
Name: "client_operations",
89+
Help: "Number of rpc methods by handler requested",
90+
},
91+
[]string{"handler", "method"},
92+
),
93+
ClientFailures: prometheus.NewCounterVec(
94+
prometheus.CounterOpts{
95+
Namespace: metricsNamespace,
96+
Subsystem: rpcSubsystem,
97+
Name: "client_failures",
98+
Help: "Number of rpc method failures by handler requested",
99+
},
100+
[]string{"handler", "method"},
101+
),
102+
ClientOperationsLatency: prometheus.NewHistogramVec(
103+
prometheus.HistogramOpts{
104+
Namespace: metricsNamespace,
105+
Subsystem: rpcSubsystem,
106+
Name: "client_latency_secs",
107+
Help: "Latency of rpc methods by handler requested",
108+
// Bucket starts at 50ms, each bucket grows by a factor of 3, up to 5 buckets and is expressed as seconds:
109+
// 50ms, 150ms, 450ms, 1350ms, 4050ms
110+
Buckets: prometheus.ExponentialBuckets(0.05, 3, 5),
111+
},
112+
[]string{"handler", "method"},
113+
),
114+
}
115+
116+
func ObserveServerHandler(inner func() error, handler, method string) error {
117+
defer CapnpMetrics.serverOperations.WithLabelValues(handler, method).Inc()
118+
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) {
119+
CapnpMetrics.serverOperationsLatency.WithLabelValues(handler, method).Observe(s)
120+
}))
121+
defer timer.ObserveDuration()
122+
123+
err := inner()
124+
if err != nil {
125+
CapnpMetrics.serverFailures.WithLabelValues(handler, method).Inc()
126+
}
127+
return err
128+
}
129+
130+
func NewClientOperationLatencyObserver(server string, method string) *prometheus.Timer {
131+
return prometheus.NewTimer(prometheus.ObserverFunc(func(s float64) {
132+
CapnpMetrics.ClientOperationsLatency.WithLabelValues(server, method).Observe(s)
133+
}))
134+
}
135+
136+
func init() {
137+
prometheus.MustRegister(CapnpMetrics.serverOperations)
138+
prometheus.MustRegister(CapnpMetrics.serverFailures)
139+
prometheus.MustRegister(CapnpMetrics.serverOperationsLatency)
140+
prometheus.MustRegister(CapnpMetrics.ClientOperations)
141+
prometheus.MustRegister(CapnpMetrics.ClientFailures)
142+
prometheus.MustRegister(CapnpMetrics.ClientOperationsLatency)
143+
}

tunnelrpc/pogs/configuration_manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ import (
88
"zombiezen.com/go/capnproto2/rpc"
99
"zombiezen.com/go/capnproto2/server"
1010

11+
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
1112
"github.com/cloudflare/cloudflared/tunnelrpc/proto"
1213
)
1314

1415
type ConfigurationManager interface {
16+
// UpdateConfiguration is the call provided to cloudflared to load the latest remote configuration.
1517
UpdateConfiguration(ctx context.Context, version int32, config []byte) *UpdateConfigurationResponse
1618
}
1719

@@ -24,6 +26,10 @@ func ConfigurationManager_ServerToClient(c ConfigurationManager) proto.Configura
2426
}
2527

2628
func (i ConfigurationManager_PogsImpl) UpdateConfiguration(p proto.ConfigurationManager_updateConfiguration) error {
29+
return metrics.ObserveServerHandler(func() error { return i.updateConfiguration(p) }, metrics.ConfigurationManager, metrics.OperationUpdateConfiguration)
30+
}
31+
32+
func (i ConfigurationManager_PogsImpl) updateConfiguration(p proto.ConfigurationManager_updateConfiguration) error {
2733
server.Ack(p.Options)
2834

2935
version := p.Params.Version()

tunnelrpc/pogs/registration_server.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ import (
1212
"zombiezen.com/go/capnproto2/rpc"
1313
"zombiezen.com/go/capnproto2/server"
1414

15+
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
1516
"github.com/cloudflare/cloudflared/tunnelrpc/proto"
1617
)
1718

1819
type RegistrationServer interface {
20+
// RegisterConnection is the call typically handled by the edge to initiate and authenticate a new connection
21+
// for cloudflared.
1922
RegisterConnection(ctx context.Context, auth TunnelAuth, tunnelID uuid.UUID, connIndex byte, options *ConnectionOptions) (*ConnectionDetails, error)
23+
// UnregisterConnection is the call typically handled by the edge to close an existing connection for cloudflared.
2024
UnregisterConnection(ctx context.Context)
25+
// UpdateLocalConfiguration is the call typically handled by the edge for cloudflared to provide the current
26+
// configuration it is operating with.
2127
UpdateLocalConfiguration(ctx context.Context, config []byte) error
2228
}
2329

@@ -30,6 +36,10 @@ func RegistrationServer_ServerToClient(s RegistrationServer) proto.RegistrationS
3036
}
3137

3238
func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServer_registerConnection) error {
39+
return metrics.ObserveServerHandler(func() error { return i.registerConnection(p) }, metrics.Registration, metrics.OperationRegisterConnection)
40+
}
41+
42+
func (i RegistrationServer_PogsImpl) registerConnection(p proto.RegistrationServer_registerConnection) error {
3343
server.Ack(p.Options)
3444

3545
auth, err := p.Params.Auth()
@@ -83,13 +93,18 @@ func (i RegistrationServer_PogsImpl) RegisterConnection(p proto.RegistrationServ
8393
}
8494

8595
func (i RegistrationServer_PogsImpl) UnregisterConnection(p proto.RegistrationServer_unregisterConnection) error {
86-
server.Ack(p.Options)
96+
return metrics.ObserveServerHandler(func() error {
97+
server.Ack(p.Options)
98+
i.impl.UnregisterConnection(p.Ctx)
99+
return nil // No metrics will be reported for failure as this method has no return value
100+
}, metrics.Registration, metrics.OperationUnregisterConnection)
101+
}
87102

88-
i.impl.UnregisterConnection(p.Ctx)
89-
return nil
103+
func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(p proto.RegistrationServer_updateLocalConfiguration) error {
104+
return metrics.ObserveServerHandler(func() error { return i.updateLocalConfiguration(p) }, metrics.Registration, metrics.OperationUpdateLocalConfiguration)
90105
}
91106

92-
func (i RegistrationServer_PogsImpl) UpdateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error {
107+
func (i RegistrationServer_PogsImpl) updateLocalConfiguration(c proto.RegistrationServer_updateLocalConfiguration) error {
93108
server.Ack(c.Options)
94109

95110
configBytes, err := c.Params.Config()

tunnelrpc/pogs/session_manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"zombiezen.com/go/capnproto2/rpc"
1212
"zombiezen.com/go/capnproto2/server"
1313

14+
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
1415
"github.com/cloudflare/cloudflared/tunnelrpc/proto"
1516
)
1617

@@ -32,6 +33,10 @@ func SessionManager_ServerToClient(s SessionManager) proto.SessionManager {
3233
}
3334

3435
func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_registerUdpSession) error {
36+
return metrics.ObserveServerHandler(func() error { return i.registerUdpSession(p) }, metrics.SessionManager, metrics.OperationRegisterUdpSession)
37+
}
38+
39+
func (i SessionManager_PogsImpl) registerUdpSession(p proto.SessionManager_registerUdpSession) error {
3540
server.Ack(p.Options)
3641

3742
sessionIDRaw, err := p.Params.SessionId()
@@ -78,6 +83,10 @@ func (i SessionManager_PogsImpl) RegisterUdpSession(p proto.SessionManager_regis
7883
}
7984

8085
func (i SessionManager_PogsImpl) UnregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error {
86+
return metrics.ObserveServerHandler(func() error { return i.unregisterUdpSession(p) }, metrics.SessionManager, metrics.OperationUnregisterUdpSession)
87+
}
88+
89+
func (i SessionManager_PogsImpl) unregisterUdpSession(p proto.SessionManager_unregisterUdpSession) error {
8190
server.Ack(p.Options)
8291

8392
sessionIDRaw, err := p.Params.SessionId()

tunnelrpc/quic/cloudflared_client.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/google/uuid"
1313

1414
"github.com/cloudflare/cloudflared/tunnelrpc"
15+
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
1516
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
1617
)
1718

@@ -43,19 +44,43 @@ func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, reques
4344
func (c *CloudflaredClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
4445
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
4546
defer cancel()
46-
return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
47+
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc()
48+
timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationRegisterUdpSession)
49+
defer timer.ObserveDuration()
50+
51+
resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
52+
if err != nil {
53+
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc()
54+
}
55+
return resp, err
4756
}
4857

4958
func (c *CloudflaredClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
5059
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
5160
defer cancel()
52-
return c.client.UnregisterUdpSession(ctx, sessionID, message)
61+
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc()
62+
timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUnregisterUdpSession)
63+
defer timer.ObserveDuration()
64+
65+
err := c.client.UnregisterUdpSession(ctx, sessionID, message)
66+
if err != nil {
67+
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc()
68+
}
69+
return err
5370
}
5471

5572
func (c *CloudflaredClient) UpdateConfiguration(ctx context.Context, version int32, config []byte) (*pogs.UpdateConfigurationResponse, error) {
5673
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
5774
defer cancel()
58-
return c.client.UpdateConfiguration(ctx, version, config)
75+
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc()
76+
timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUpdateConfiguration)
77+
defer timer.ObserveDuration()
78+
79+
resp, err := c.client.UpdateConfiguration(ctx, version, config)
80+
if err != nil {
81+
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc()
82+
}
83+
return resp, err
5984
}
6085

6186
func (c *CloudflaredClient) Close() {

tunnelrpc/quic/session_client.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"zombiezen.com/go/capnproto2/rpc"
1212

1313
"github.com/cloudflare/cloudflared/tunnelrpc"
14+
"github.com/cloudflare/cloudflared/tunnelrpc/metrics"
1415
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
1516
)
1617

@@ -41,13 +42,29 @@ func NewSessionClient(ctx context.Context, stream io.ReadWriteCloser, requestTim
4142
func (c *SessionClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
4243
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
4344
defer cancel()
44-
return c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
45+
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc()
46+
timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationRegisterUdpSession)
47+
defer timer.ObserveDuration()
48+
49+
resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
50+
if err != nil {
51+
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationRegisterUdpSession).Inc()
52+
}
53+
return resp, err
4554
}
4655

4756
func (c *SessionClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
4857
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
4958
defer cancel()
50-
return c.client.UnregisterUdpSession(ctx, sessionID, message)
59+
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc()
60+
timer := metrics.NewClientOperationLatencyObserver(metrics.SessionManager, metrics.OperationUnregisterUdpSession)
61+
defer timer.ObserveDuration()
62+
63+
err := c.client.UnregisterUdpSession(ctx, sessionID, message)
64+
if err != nil {
65+
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.SessionManager, metrics.OperationUnregisterUdpSession).Inc()
66+
}
67+
return err
5168
}
5269

5370
func (c *SessionClient) Close() {

0 commit comments

Comments
 (0)