Skip to content

Commit ab3dc5f

Browse files
committed
TUN-8701: Simplify flow registration logs for datagram v3
To help reduce the volume of logs during the happy path of flow registration, there will only be one log message reported when a flow is completed. There are additional fields added to all flow log messages: 1. `src`: local address 2. `dst`: origin address 3. `durationMS`: capturing the total duration of the flow in milliseconds Additional logs were added to capture when a flow was migrated or when cloudflared sent off a registration response retry. Closes TUN-8701
1 parent 1f3e304 commit ab3dc5f

File tree

6 files changed

+108
-55
lines changed

6 files changed

+108
-55
lines changed

ingress/origin_udp_proxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func DialUDPAddrPort(dest netip.AddrPort) (*net.UDPConn, error) {
3939
// address as context.
4040
udpConn, err := net.DialUDP("udp", nil, addr)
4141
if err != nil {
42-
return nil, fmt.Errorf("unable to create UDP proxy to origin (%v:%v): %w", dest.Addr(), dest.Port(), err)
42+
return nil, fmt.Errorf("unable to dial udp to origin %s: %w", dest, err)
4343
}
4444

4545
return udpConn, nil

quic/v3/manager.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"sync"
88

99
"github.com/rs/zerolog"
10-
11-
"github.com/cloudflare/cloudflared/ingress"
1210
)
1311

1412
var (
@@ -37,17 +35,19 @@ type SessionManager interface {
3735
type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error)
3836

3937
type sessionManager struct {
40-
sessions map[RequestID]Session
41-
mutex sync.RWMutex
42-
metrics Metrics
43-
log *zerolog.Logger
38+
sessions map[RequestID]Session
39+
mutex sync.RWMutex
40+
originDialer DialUDP
41+
metrics Metrics
42+
log *zerolog.Logger
4443
}
4544

4645
func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP) SessionManager {
4746
return &sessionManager{
48-
sessions: make(map[RequestID]Session),
49-
metrics: metrics,
50-
log: log,
47+
sessions: make(map[RequestID]Session),
48+
originDialer: originDialer,
49+
metrics: metrics,
50+
log: log,
5151
}
5252
}
5353

@@ -62,12 +62,20 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram
6262
return nil, ErrSessionBoundToOtherConn
6363
}
6464
// Attempt to bind the UDP socket for the new session
65-
origin, err := ingress.DialUDPAddrPort(request.Dest)
65+
origin, err := s.originDialer(request.Dest)
6666
if err != nil {
6767
return nil, err
6868
}
6969
// Create and insert the new session in the map
70-
session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.metrics, s.log)
70+
session := NewSession(
71+
request.RequestID,
72+
request.IdleDurationHint,
73+
origin,
74+
origin.RemoteAddr(),
75+
origin.LocalAddr(),
76+
conn,
77+
s.metrics,
78+
s.log)
7179
s.sessions[request.RequestID] = session
7280
return session, nil
7381
}

quic/v3/muxer.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package v3
33
import (
44
"context"
55
"errors"
6+
"time"
67

78
"github.com/rs/zerolog"
89
)
@@ -11,6 +12,10 @@ const (
1112
// Allocating a 16 channel buffer here allows for the writer to be slightly faster than the reader.
1213
// This has worked previously well for datagramv2, so we will start with this as well
1314
demuxChanCapacity = 16
15+
16+
logSrcKey = "src"
17+
logDstKey = "dst"
18+
logDurationKey = "durationMS"
1419
)
1520

1621
// DatagramConn is the bridge that multiplexes writes and reads of datagrams for UDP sessions and ICMP packets to
@@ -174,23 +179,28 @@ func (c *datagramConn) Serve(ctx context.Context) error {
174179

175180
// This method handles new registrations of a session and the serve loop for the session.
176181
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram, logger *zerolog.Logger) {
182+
log := logger.With().
183+
Str(logFlowID, datagram.RequestID.String()).
184+
Str(logDstKey, datagram.Dest.String()).
185+
Logger()
177186
session, err := c.sessionManager.RegisterSession(datagram, c)
178187
switch err {
179188
case nil:
180189
// Continue as normal
181190
case ErrSessionAlreadyRegistered:
182191
// Session is already registered and likely the response got lost
183-
c.handleSessionAlreadyRegistered(datagram.RequestID, logger)
192+
c.handleSessionAlreadyRegistered(datagram.RequestID, &log)
184193
return
185194
case ErrSessionBoundToOtherConn:
186195
// Session is already registered but to a different connection
187-
c.handleSessionMigration(datagram.RequestID, logger)
196+
c.handleSessionMigration(datagram.RequestID, &log)
188197
return
189198
default:
190-
logger.Err(err).Msgf("flow registration failure")
191-
c.handleSessionRegistrationFailure(datagram.RequestID, logger)
199+
log.Err(err).Msgf("flow registration failure")
200+
c.handleSessionRegistrationFailure(datagram.RequestID, &log)
192201
return
193202
}
203+
log = log.With().Str(logSrcKey, session.LocalAddr().String()).Logger()
194204
c.metrics.IncrementFlows()
195205
// Make sure to eventually remove the session from the session manager when the session is closed
196206
defer c.sessionManager.UnregisterSession(session.ID())
@@ -199,27 +209,30 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
199209
// Respond that we are able to process the new session
200210
err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
201211
if err != nil {
202-
logger.Err(err).Msgf("flow registration failure: unable to send session registration response")
212+
log.Err(err).Msgf("flow registration failure: unable to send session registration response")
203213
return
204214
}
205215

206216
// We bind the context of the session to the [quic.Connection] that initiated the session.
207217
// [Session.Serve] is blocking and will continue this go routine till the end of the session lifetime.
218+
start := time.Now()
208219
err = session.Serve(ctx)
220+
elapsedMS := time.Now().Sub(start).Milliseconds()
221+
log = log.With().Int64(logDurationKey, elapsedMS).Logger()
209222
if err == nil {
210223
// We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
211224
// expected error response.
212-
logger.Warn().Msg("flow was closed without explicit close or timeout")
225+
log.Warn().Msg("flow closed: no explicit close or timeout elapsed")
213226
return
214227
}
215228
// SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session.
216229
if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) {
217-
logger.Debug().Msg(err.Error())
230+
log.Debug().Msgf("flow closed: %s", err.Error())
218231
return
219232
}
220233

221234
// All other errors should be reported as errors
222-
logger.Err(err).Msgf("flow was closed with an error")
235+
log.Err(err).Msgf("flow closed with an error")
223236
}
224237

225238
func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logger *zerolog.Logger) {
@@ -240,6 +253,7 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logge
240253
// packets have come down yet.
241254
session.ResetIdleTimer()
242255
c.metrics.RetryFlowResponse()
256+
logger.Debug().Msgf("flow registration response retry")
243257
}
244258

245259
func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerolog.Logger) {
@@ -252,14 +266,16 @@ func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerol
252266
}
253267

254268
// Migrate the session to use this edge connection instead of the currently running one.
255-
session.Migrate(c)
269+
// We also pass in this connection's logger to override the existing logger for the session.
270+
session.Migrate(c, c.logger)
256271

257272
// Send another registration response since the session is already active
258273
err = c.SendUDPSessionResponse(requestID, ResponseOk)
259274
if err != nil {
260275
logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
261276
return
262277
}
278+
logger.Debug().Msgf("flow registration migration")
263279
}
264280

265281
func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, logger *zerolog.Logger) {

quic/v3/muxer_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -619,16 +619,12 @@ func newMockSession() mockSession {
619619
}
620620
}
621621

622-
func (m *mockSession) ID() v3.RequestID {
623-
return testRequestID
624-
}
625-
626-
func (m *mockSession) ConnectionID() uint8 {
627-
return 0
628-
}
629-
630-
func (m *mockSession) Migrate(conn v3.DatagramConn) { m.migrated <- conn.ID() }
631-
func (m *mockSession) ResetIdleTimer() {}
622+
func (m *mockSession) ID() v3.RequestID { return testRequestID }
623+
func (m *mockSession) RemoteAddr() net.Addr { return testOriginAddr }
624+
func (m *mockSession) LocalAddr() net.Addr { return testLocalAddr }
625+
func (m *mockSession) ConnectionID() uint8 { return 0 }
626+
func (m *mockSession) Migrate(conn v3.DatagramConn, log *zerolog.Logger) { m.migrated <- conn.ID() }
627+
func (m *mockSession) ResetIdleTimer() {}
632628

633629
func (m *mockSession) Serve(ctx context.Context) error {
634630
close(m.served)

quic/v3/session.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ const (
2222
// this value (maxDatagramPayloadLen).
2323
maxOriginUDPPacketSize = 1500
2424

25-
logFlowID = "flowID"
25+
logFlowID = "flowID"
26+
logPacketSizeKey = "packetSize"
2627
)
2728

2829
// SessionCloseErr indicates that the session's Close method was called.
29-
var SessionCloseErr error = errors.New("flow was closed")
30+
var SessionCloseErr error = errors.New("flow was closed directly")
3031

3132
// SessionIdleErr is returned when the session was closed because there was no communication
3233
// in either direction over the session for the timeout period.
@@ -35,7 +36,7 @@ type SessionIdleErr struct {
3536
}
3637

3738
func (e SessionIdleErr) Error() string {
38-
return fmt.Sprintf("flow idle for %v", e.timeout)
39+
return fmt.Sprintf("flow was idle for %v", e.timeout)
3940
}
4041

4142
func (e SessionIdleErr) Is(target error) bool {
@@ -51,8 +52,10 @@ type Session interface {
5152
io.WriteCloser
5253
ID() RequestID
5354
ConnectionID() uint8
55+
RemoteAddr() net.Addr
56+
LocalAddr() net.Addr
5457
ResetIdleTimer()
55-
Migrate(eyeball DatagramConn)
58+
Migrate(eyeball DatagramConn, logger *zerolog.Logger)
5659
// Serve starts the event loop for processing UDP packets
5760
Serve(ctx context.Context) error
5861
}
@@ -61,6 +64,8 @@ type session struct {
6164
id RequestID
6265
closeAfterIdle time.Duration
6366
origin io.ReadWriteCloser
67+
originAddr net.Addr
68+
localAddr net.Addr
6469
eyeball atomic.Pointer[DatagramConn]
6570
// activeAtChan is used to communicate the last read/write time
6671
activeAtChan chan time.Time
@@ -69,12 +74,23 @@ type session struct {
6974
log *zerolog.Logger
7075
}
7176

72-
func NewSession(id RequestID, closeAfterIdle time.Duration, origin io.ReadWriteCloser, eyeball DatagramConn, metrics Metrics, log *zerolog.Logger) Session {
77+
func NewSession(
78+
id RequestID,
79+
closeAfterIdle time.Duration,
80+
origin io.ReadWriteCloser,
81+
originAddr net.Addr,
82+
localAddr net.Addr,
83+
eyeball DatagramConn,
84+
metrics Metrics,
85+
log *zerolog.Logger,
86+
) Session {
7387
logger := log.With().Str(logFlowID, id.String()).Logger()
7488
session := &session{
7589
id: id,
7690
closeAfterIdle: closeAfterIdle,
7791
origin: origin,
92+
originAddr: originAddr,
93+
localAddr: localAddr,
7894
eyeball: atomic.Pointer[DatagramConn]{},
7995
// activeAtChan has low capacity. It can be full when there are many concurrent read/write. markActive() will
8096
// drop instead of blocking because last active time only needs to be an approximation
@@ -91,16 +107,26 @@ func (s *session) ID() RequestID {
91107
return s.id
92108
}
93109

110+
func (s *session) RemoteAddr() net.Addr {
111+
return s.originAddr
112+
}
113+
114+
func (s *session) LocalAddr() net.Addr {
115+
return s.localAddr
116+
}
117+
94118
func (s *session) ConnectionID() uint8 {
95119
eyeball := *(s.eyeball.Load())
96120
return eyeball.ID()
97121
}
98122

99-
func (s *session) Migrate(eyeball DatagramConn) {
123+
func (s *session) Migrate(eyeball DatagramConn, logger *zerolog.Logger) {
100124
current := *(s.eyeball.Load())
101125
// Only migrate if the connection ids are different.
102126
if current.ID() != eyeball.ID() {
103127
s.eyeball.Store(&eyeball)
128+
log := logger.With().Str(logFlowID, s.id.String()).Logger()
129+
s.log = &log
104130
}
105131
// The session is already running so we want to restart the idle timeout since no proxied packets have come down yet.
106132
s.markActive()
@@ -119,20 +145,21 @@ func (s *session) Serve(ctx context.Context) error {
119145
for {
120146
// Read from the origin UDP socket
121147
n, err := s.origin.Read(readBuffer[DatagramPayloadHeaderLen:])
122-
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
123-
s.log.Debug().Msg("Flow (origin) connection closed")
124-
}
125148
if err != nil {
149+
if errors.Is(err, io.EOF) ||
150+
errors.Is(err, io.ErrUnexpectedEOF) {
151+
s.log.Debug().Msgf("flow (origin) connection closed: %v", err)
152+
}
126153
s.closeChan <- err
127154
return
128155
}
129156
if n < 0 {
130-
s.log.Warn().Int("packetSize", n).Msg("Flow (origin) packet read was negative and was dropped")
157+
s.log.Warn().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was negative and was dropped")
131158
continue
132159
}
133160
if n > maxDatagramPayloadLen {
134161
s.metrics.PayloadTooLarge()
135-
s.log.Error().Int("packetSize", n).Msg("Flow (origin) packet read was too large and was dropped")
162+
s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped")
136163
continue
137164
}
138165
// We need to synchronize on the eyeball in-case that the connection was migrated. This should be rarely a point
@@ -155,12 +182,12 @@ func (s *session) Serve(ctx context.Context) error {
155182
func (s *session) Write(payload []byte) (n int, err error) {
156183
n, err = s.origin.Write(payload)
157184
if err != nil {
158-
s.log.Err(err).Msg("Failed to write payload to flow (remote)")
185+
s.log.Err(err).Msg("failed to write payload to flow (remote)")
159186
return n, err
160187
}
161188
// Write must return a non-nil error if it returns n < len(p). https://pkg.go.dev/io#Writer
162189
if n < len(payload) {
163-
s.log.Err(io.ErrShortWrite).Msg("Failed to write the full payload to flow (remote)")
190+
s.log.Err(io.ErrShortWrite).Msg("failed to write the full payload to flow (remote)")
164191
return n, io.ErrShortWrite
165192
}
166193
// Mark the session as active since we proxied a packet to the origin.

0 commit comments

Comments
 (0)