Skip to content

Commit 1cc15c6

Browse files
committed
TUN-9882: Improve metrics for datagram v3
Adds new metrics for:
- Dropped UDP datagrams on the read and write paths
- Dropped ICMP packets on the write path
- Failures that preemptively close UDP flows

Closes TUN-9882
1 parent 51c5ef7 commit 1cc15c6

File tree

4 files changed

+94
-26
lines changed

4 files changed

+94
-26
lines changed

quic/v3/metrics.go

Lines changed: 71 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,59 @@ import (
99
)
1010

1111
const (
12-
namespace = "cloudflared"
13-
subsystem = "udp"
12+
namespace = "cloudflared"
13+
subsystem_udp = "udp"
14+
subsystem_icmp = "icmp"
1415

1516
commandMetricLabel = "command"
17+
reasonMetricLabel = "reason"
1618
)
1719

20+
// DroppedReason identifies why a datagram or packet was dropped instead of
// being proxied. Its String form is used as a metric label value.
type DroppedReason int

const (
	// DroppedWriteFailed is a drop caused by a failed write.
	DroppedWriteFailed DroppedReason = iota
	// DroppedWriteDeadlineExceeded is a drop caused by a write exceeding its deadline.
	DroppedWriteDeadlineExceeded
	// DroppedWriteFull is a drop caused by a full write queue.
	DroppedWriteFull
	// DroppedWriteFlowUnknown is a drop caused by a write targeting a flow that could not be found.
	DroppedWriteFlowUnknown
	// DroppedReadFailed is a drop caused by a failed read.
	DroppedReadFailed
	// DroppedReadTooLarge is a drop caused by origin payloads that are too large to proxy.
	DroppedReadTooLarge
)

// droppedReason maps every known DroppedReason to its metric label value.
var droppedReason = map[DroppedReason]string{
	DroppedWriteFailed:           "write_failed",
	DroppedWriteDeadlineExceeded: "write_deadline_exceeded",
	DroppedWriteFull:             "write_full",
	DroppedWriteFlowUnknown:      "write_flow_unknown",
	DroppedReadFailed:            "read_failed",
	DroppedReadTooLarge:          "read_too_large",
}

// String returns the label value for dr. Values that have no entry in
// droppedReason yield "unknown" rather than an empty string, so that an
// unmapped reason never produces an empty label value.
func (dr DroppedReason) String() string {
	if name, ok := droppedReason[dr]; ok {
		return name
	}
	return "unknown"
}
44+
1845
type Metrics interface {
1946
IncrementFlows(connIndex uint8)
2047
DecrementFlows(connIndex uint8)
21-
PayloadTooLarge(connIndex uint8)
48+
FailedFlow(connIndex uint8)
2249
RetryFlowResponse(connIndex uint8)
2350
MigrateFlow(connIndex uint8)
2451
UnsupportedRemoteCommand(connIndex uint8, command string)
52+
DroppedUDPDatagram(connIndex uint8, reason DroppedReason)
53+
DroppedICMPPackets(connIndex uint8, reason DroppedReason)
2554
}
2655

2756
type metrics struct {
2857
activeUDPFlows *prometheus.GaugeVec
2958
totalUDPFlows *prometheus.CounterVec
30-
payloadTooLarge *prometheus.CounterVec
3159
retryFlowResponses *prometheus.CounterVec
3260
migratedFlows *prometheus.CounterVec
3361
unsupportedRemoteCommands *prometheus.CounterVec
62+
droppedUDPDatagrams *prometheus.CounterVec
63+
droppedICMPPackets *prometheus.CounterVec
64+
failedFlows *prometheus.CounterVec
3465
}
3566

3667
func (m *metrics) IncrementFlows(connIndex uint8) {
@@ -42,8 +73,8 @@ func (m *metrics) DecrementFlows(connIndex uint8) {
4273
m.activeUDPFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Dec()
4374
}
4475

45-
func (m *metrics) PayloadTooLarge(connIndex uint8) {
46-
m.payloadTooLarge.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
76+
// FailedFlow increments the failedFlows counter for the given connection
// index; the index is rendered as a decimal string for the label value.
func (m *metrics) FailedFlow(connIndex uint8) {
77+
m.failedFlows.WithLabelValues(fmt.Sprintf("%d", connIndex)).Inc()
4778
}
4879

4980
func (m *metrics) RetryFlowResponse(connIndex uint8) {
@@ -58,52 +89,74 @@ func (m *metrics) UnsupportedRemoteCommand(connIndex uint8, command string) {
5889
m.unsupportedRemoteCommands.WithLabelValues(fmt.Sprintf("%d", connIndex), command).Inc()
5990
}
6091

92+
// DroppedUDPDatagram increments the droppedUDPDatagrams counter, labelled with
// the connection index (as a decimal string) and the string form of reason.
func (m *metrics) DroppedUDPDatagram(connIndex uint8, reason DroppedReason) {
93+
m.droppedUDPDatagrams.WithLabelValues(fmt.Sprintf("%d", connIndex), reason.String()).Inc()
94+
}
95+
96+
// DroppedICMPPackets increments the droppedICMPPackets counter, labelled with
// the connection index (as a decimal string) and the string form of reason.
func (m *metrics) DroppedICMPPackets(connIndex uint8, reason DroppedReason) {
97+
m.droppedICMPPackets.WithLabelValues(fmt.Sprintf("%d", connIndex), reason.String()).Inc()
98+
}
99+
61100
func NewMetrics(registerer prometheus.Registerer) Metrics {
62101
m := &metrics{
63102
activeUDPFlows: prometheus.NewGaugeVec(prometheus.GaugeOpts{
64103
Namespace: namespace,
65-
Subsystem: subsystem,
104+
Subsystem: subsystem_udp,
66105
Name: "active_flows",
67106
Help: "Concurrent count of UDP flows that are being proxied to any origin",
68107
}, []string{quic.ConnectionIndexMetricLabel}),
69108
totalUDPFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
70109
Namespace: namespace,
71-
Subsystem: subsystem,
110+
Subsystem: subsystem_udp,
72111
Name: "total_flows",
73112
Help: "Total count of UDP flows that have been proxied to any origin",
74113
}, []string{quic.ConnectionIndexMetricLabel}),
75-
payloadTooLarge: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
114+
failedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
76115
Namespace: namespace,
77-
Subsystem: subsystem,
78-
Name: "payload_too_large",
79-
Help: "Total count of UDP flows that have had origin payloads that are too large to proxy",
116+
Subsystem: subsystem_udp,
117+
Name: "failed_flows",
118+
Help: "Total count of flows that errored and closed",
80119
}, []string{quic.ConnectionIndexMetricLabel}),
81120
retryFlowResponses: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
82121
Namespace: namespace,
83-
Subsystem: subsystem,
122+
Subsystem: subsystem_udp,
84123
Name: "retry_flow_responses",
85124
Help: "Total count of UDP flows that have had to send their registration response more than once",
86125
}, []string{quic.ConnectionIndexMetricLabel}),
87126
migratedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
88127
Namespace: namespace,
89-
Subsystem: subsystem,
128+
Subsystem: subsystem_udp,
90129
Name: "migrated_flows",
91130
Help: "Total count of UDP flows have been migrated across local connections",
92131
}, []string{quic.ConnectionIndexMetricLabel}),
93-
unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{
132+
unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
94133
Namespace: namespace,
95-
Subsystem: subsystem,
134+
Subsystem: subsystem_udp,
96135
Name: "unsupported_remote_command_total",
97-
Help: "Total count of unsupported remote RPC commands for the ",
136+
Help: "Total count of unsupported remote RPC commands called",
98137
}, []string{quic.ConnectionIndexMetricLabel, commandMetricLabel}),
138+
droppedUDPDatagrams: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
139+
Namespace: namespace,
140+
Subsystem: subsystem_udp,
141+
Name: "dropped_datagrams",
142+
Help: "Total count of UDP dropped datagrams",
143+
}, []string{quic.ConnectionIndexMetricLabel, reasonMetricLabel}),
144+
droppedICMPPackets: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter
145+
Namespace: namespace,
146+
Subsystem: subsystem_icmp,
147+
Name: "dropped_packets",
148+
Help: "Total count of ICMP dropped datagrams",
149+
}, []string{quic.ConnectionIndexMetricLabel, reasonMetricLabel}),
99150
}
100151
registerer.MustRegister(
101152
m.activeUDPFlows,
102153
m.totalUDPFlows,
103-
m.payloadTooLarge,
154+
m.failedFlows,
104155
m.retryFlowResponses,
105156
m.migratedFlows,
106157
m.unsupportedRemoteCommands,
158+
m.droppedUDPDatagrams,
159+
m.droppedICMPPackets,
107160
)
108161
return m
109162
}

quic/v3/metrics_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package v3_test
22

3+
import v3 "github.com/cloudflare/cloudflared/quic/v3"
4+
35
type noopMetrics struct{}
46

5-
func (noopMetrics) IncrementFlows(connIndex uint8) {}
6-
func (noopMetrics) DecrementFlows(connIndex uint8) {}
7-
func (noopMetrics) PayloadTooLarge(connIndex uint8) {}
8-
func (noopMetrics) RetryFlowResponse(connIndex uint8) {}
9-
func (noopMetrics) MigrateFlow(connIndex uint8) {}
10-
func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {}
7+
func (noopMetrics) IncrementFlows(connIndex uint8) {}
8+
func (noopMetrics) DecrementFlows(connIndex uint8) {}
9+
func (noopMetrics) FailedFlow(connIndex uint8) {}
10+
func (noopMetrics) PayloadTooLarge(connIndex uint8) {}
11+
func (noopMetrics) RetryFlowResponse(connIndex uint8) {}
12+
func (noopMetrics) MigrateFlow(connIndex uint8) {}
13+
func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {}
14+
func (noopMetrics) DroppedUDPDatagram(connIndex uint8, reason v3.DroppedReason) {}
15+
func (noopMetrics) DroppedICMPPackets(connIndex uint8, reason v3.DroppedReason) {}

quic/v3/muxer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ func (c *datagramConn) handleSessionRegistrationRateLimited(datagram *UDPSession
368368
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) {
369369
s, err := c.sessionManager.GetSession(datagram.RequestID)
370370
if err != nil {
371+
c.metrics.DroppedUDPDatagram(c.index, DroppedWriteFlowUnknown)
371372
logger.Err(err).Msgf("unable to find flow")
372373
return
373374
}
@@ -384,6 +385,7 @@ func (c *datagramConn) handleICMPPacket(datagram *ICMPDatagram) {
384385
case c.icmpDatagramChan <- datagram:
385386
default:
386387
// If the ICMP datagram channel is full, drop any additional incoming datagrams.
388+
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFull)
387389
c.logger.Warn().Msg("failed to write icmp packet to origin: dropped")
388390
}
389391
}
@@ -413,20 +415,23 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
413415
defer c.icmpDecoderPool.Put(cachedDecoder)
414416
decoder, ok := cachedDecoder.(*packet.ICMPDecoder)
415417
if !ok {
418+
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
416419
c.logger.Error().Msg("Could not get ICMPDecoder from the pool. Dropping packet")
417420
return
418421
}
419422

420423
icmp, err := decoder.Decode(rawPacket)
421424

422425
if err != nil {
426+
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
423427
c.logger.Err(err).Msgf("unable to marshal icmp packet")
424428
return
425429
}
426430

427431
// If the ICMP packet's TTL is expired, we won't send it to the origin and immediately return a TTL Exceeded Message
428432
if icmp.TTL <= 1 {
429433
if err := c.SendICMPTTLExceed(icmp, rawPacket); err != nil {
434+
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
430435
c.logger.Err(err).Msg("failed to return ICMP TTL exceed error")
431436
}
432437
return
@@ -438,6 +443,7 @@ func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
438443
// connection context which will have no tracing information available.
439444
err = c.icmpRouter.Request(c.conn.Context(), icmp, newPacketResponder(c, c.index))
440445
if err != nil {
446+
c.metrics.DroppedICMPPackets(c.index, DroppedWriteFailed)
441447
c.logger.Err(err).
442448
Str(logSrcKey, icmp.Src.String()).
443449
Str(logDstKey, icmp.Dst.String()).

quic/v3/session.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,12 @@ func (s *session) readLoop() {
199199
return
200200
}
201201
if n < 0 {
202+
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedReadFailed)
202203
s.log.Warn().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was negative and was dropped")
203204
continue
204205
}
205206
if n > maxDatagramPayloadLen {
206-
connectionIndex := s.ConnectionID()
207-
s.metrics.PayloadTooLarge(connectionIndex)
207+
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedReadTooLarge)
208208
s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped")
209209
continue
210210
}
@@ -227,6 +227,7 @@ func (s *session) Write(payload []byte) {
227227
select {
228228
case s.writeChan <- payload:
229229
default:
230+
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteFull)
230231
s.log.Error().Msg("failed to write flow payload to origin: dropped")
231232
}
232233
}
@@ -244,6 +245,7 @@ func (s *session) writeLoop() {
244245
if err != nil {
245246
// Check if this is a write deadline exceeded to the connection
246247
if errors.Is(err, os.ErrDeadlineExceeded) {
248+
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteDeadlineExceeded)
247249
s.log.Warn().Err(err).Msg("flow (write) deadline exceeded: dropping packet")
248250
continue
249251
}
@@ -257,6 +259,7 @@ func (s *session) writeLoop() {
257259
}
258260
// Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer
259261
if n < len(payload) {
262+
s.metrics.DroppedUDPDatagram(s.ConnectionID(), DroppedWriteFailed)
260263
s.log.Err(io.ErrShortWrite).Msg("failed to write the full flow payload to origin")
261264
continue
262265
}
@@ -330,6 +333,7 @@ func (s *session) waitForCloseCondition(ctx context.Context, closeAfterIdle time
330333
case reason := <-s.errChan:
331334
// Any error returned here is from the read or write loops indicating that it can no longer process datagrams
332335
// and as such the session needs to close.
336+
s.metrics.FailedFlow(s.ConnectionID())
333337
return reason
334338
case <-checkIdleTimer.C:
335339
// The check idle timer will only return after an idle period since the last active

0 commit comments

Comments
 (0)