Skip to content

Commit 6a6c890

Browse files
committed
TUN-8667: Add datagram v3 session manager
New session manager leverages similar functionality that was previously provided with datagram v2, with the distinct difference that the sessions are registered via QUIC Datagrams and unregistered via timeouts only; the sessions will no longer attempt to unregister sessions remotely with the edge service. The Session Manager is shared across all QUIC connections that cloudflared uses to connect to the edge (typically 4). This will help cloudflared be able to monitor all sessions across the connections and help correlate in the future if sessions migrate across connections. The UDP payload size is still limited to 1280 bytes across all OS's. Any UDP packet that provides a payload size of greater than 1280 will cause cloudflared to report (as it currently does) a log error and drop the packet. Closes TUN-8667
1 parent 599ba52 commit 6a6c890

File tree

11 files changed

+743
-7
lines changed

11 files changed

+743
-7
lines changed

ingress/origin_udp_proxy.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"io"
66
"net"
7+
"net/netip"
78
)
89

910
type UDPProxy interface {
@@ -30,3 +31,16 @@ func DialUDP(dstIP net.IP, dstPort uint16) (UDPProxy, error) {
3031

3132
return &udpProxy{udpConn}, nil
3233
}
34+
35+
func DialUDPAddrPort(dest netip.AddrPort) (*net.UDPConn, error) {
36+
addr := net.UDPAddrFromAddrPort(dest)
37+
38+
// We use nil as local addr to force runtime to find the best suitable local address IP given the destination
39+
// address as context.
40+
udpConn, err := net.DialUDP("udp", nil, addr)
41+
if err != nil {
42+
return nil, fmt.Errorf("unable to create UDP proxy to origin (%v:%v): %w", dest.Addr(), dest.Port(), err)
43+
}
44+
45+
return udpConn, nil
46+
}

quic/v3/datagram.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const (
2424
datagramTypeLen = 1
2525

2626
// 1280 is the default datagram packet length used before MTU discovery: https://github.com/quic-go/quic-go/blob/v0.45.0/internal/protocol/params.go#L12
27-
maxDatagramLen = 1280
27+
maxDatagramPayloadLen = 1280
2828
)
2929

3030
func parseDatagramType(data []byte) (DatagramType, error) {
@@ -100,10 +100,10 @@ func (s *UDPSessionRegistrationDatagram) MarshalBinary() (data []byte, err error
100100
}
101101
var maxPayloadLen int
102102
if ipv6 {
103-
maxPayloadLen = maxDatagramLen - sessionRegistrationIPv6DatagramHeaderLen
103+
maxPayloadLen = maxDatagramPayloadLen + sessionRegistrationIPv6DatagramHeaderLen
104104
flags |= sessionRegistrationFlagsIPMask
105105
} else {
106-
maxPayloadLen = maxDatagramLen - sessionRegistrationIPv4DatagramHeaderLen
106+
maxPayloadLen = maxDatagramPayloadLen + sessionRegistrationIPv4DatagramHeaderLen
107107
}
108108
// Make sure that the payload being bundled can actually fit in the payload destination
109109
if len(s.Payload) > maxPayloadLen {
@@ -195,7 +195,7 @@ const (
195195
datagramPayloadHeaderLen = datagramTypeLen + datagramRequestIdLen
196196

197197
// The maximum size that a proxied UDP payload can be in a [UDPSessionPayloadDatagram]
198-
maxPayloadPlusHeaderLen = maxDatagramLen - datagramPayloadHeaderLen
198+
maxPayloadPlusHeaderLen = maxDatagramPayloadLen + datagramPayloadHeaderLen
199199
)
200200

201201
// The datagram structure for UDPSessionPayloadDatagram is:
@@ -270,7 +270,7 @@ const (
270270
datagramSessionRegistrationResponseLen = datagramTypeLen + datagramRespTypeLen + datagramRequestIdLen + datagramRespErrMsgLen
271271

272272
// The maximum size that an error message can be in a [UDPSessionRegistrationResponseDatagram].
273-
maxResponseErrorMessageLen = maxDatagramLen - datagramSessionRegistrationResponseLen
273+
maxResponseErrorMessageLen = maxDatagramPayloadLen - datagramSessionRegistrationResponseLen
274274
)
275275

276276
// SessionRegistrationResp represents all of the responses that a UDP session registration response

quic/v3/datagram_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func makePayload(size int) []byte {
2121
}
2222

2323
func TestSessionRegistration_MarshalUnmarshal(t *testing.T) {
24-
payload := makePayload(1254)
24+
payload := makePayload(1280)
2525
tests := []*v3.UDPSessionRegistrationDatagram{
2626
// Default (IPv4)
2727
{
@@ -236,7 +236,7 @@ func TestSessionPayload(t *testing.T) {
236236
})
237237

238238
t.Run("payload size too large", func(t *testing.T) {
239-
datagram := makePayload(17 + 1264) // 1263 is the largest payload size allowed
239+
datagram := makePayload(17 + 1281) // 1280 is the largest payload size allowed
240240
err := v3.MarshalPayloadHeaderTo(testRequestID, datagram)
241241
if err != nil {
242242
t.Error(err)

quic/v3/manager.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package v3
2+
3+
import (
4+
"errors"
5+
"net"
6+
"net/netip"
7+
"sync"
8+
9+
"github.com/rs/zerolog"
10+
11+
"github.com/cloudflare/cloudflared/ingress"
12+
)
13+
14+
var (
15+
ErrSessionNotFound = errors.New("session not found")
16+
ErrSessionBoundToOtherConn = errors.New("session is in use by another connection")
17+
)
18+
19+
type SessionManager interface {
20+
// RegisterSession will register a new session if it does not already exist for the request ID.
21+
// During new session creation, the session will also bind the UDP socket for the origin.
22+
// If the session exists for a different connection, it will return [ErrSessionBoundToOtherConn].
23+
RegisterSession(request *UDPSessionRegistrationDatagram, conn DatagramWriter) (Session, error)
24+
// GetSession returns an active session if available for the provided connection.
25+
// If the session does not exist, it will return [ErrSessionNotFound]. If the session exists for a different
26+
// connection, it will return [ErrSessionBoundToOtherConn].
27+
GetSession(requestID RequestID) (Session, error)
28+
// UnregisterSession will remove a session from the current session manager. It will attempt to close the session
29+
// before removal.
30+
UnregisterSession(requestID RequestID)
31+
}
32+
33+
type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error)
34+
35+
type sessionManager struct {
36+
sessions map[RequestID]Session
37+
mutex sync.RWMutex
38+
log *zerolog.Logger
39+
}
40+
41+
func NewSessionManager(log *zerolog.Logger, originDialer DialUDP) SessionManager {
42+
return &sessionManager{
43+
sessions: make(map[RequestID]Session),
44+
log: log,
45+
}
46+
}
47+
48+
func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram, conn DatagramWriter) (Session, error) {
49+
s.mutex.Lock()
50+
defer s.mutex.Unlock()
51+
// Check to make sure session doesn't already exist for requestID
52+
_, exists := s.sessions[request.RequestID]
53+
if exists {
54+
return nil, ErrSessionBoundToOtherConn
55+
}
56+
// Attempt to bind the UDP socket for the new session
57+
origin, err := ingress.DialUDPAddrPort(request.Dest)
58+
if err != nil {
59+
return nil, err
60+
}
61+
// Create and insert the new session in the map
62+
session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.log)
63+
s.sessions[request.RequestID] = session
64+
return session, nil
65+
}
66+
67+
func (s *sessionManager) GetSession(requestID RequestID) (Session, error) {
68+
s.mutex.RLock()
69+
defer s.mutex.RUnlock()
70+
session, exists := s.sessions[requestID]
71+
if exists {
72+
return session, nil
73+
}
74+
return nil, ErrSessionNotFound
75+
}
76+
77+
func (s *sessionManager) UnregisterSession(requestID RequestID) {
78+
s.mutex.Lock()
79+
defer s.mutex.Unlock()
80+
// Get the session and make sure to close it if it isn't already closed
81+
session, exists := s.sessions[requestID]
82+
if exists {
83+
// We ignore any errors when attempting to close the session
84+
_ = session.Close()
85+
}
86+
delete(s.sessions, requestID)
87+
}

quic/v3/manager_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package v3_test
2+
3+
import (
4+
"errors"
5+
"net/netip"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/rs/zerolog"
11+
12+
"github.com/cloudflare/cloudflared/ingress"
13+
v3 "github.com/cloudflare/cloudflared/quic/v3"
14+
)
15+
16+
func TestRegisterSession(t *testing.T) {
17+
log := zerolog.Nop()
18+
manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
19+
20+
request := v3.UDPSessionRegistrationDatagram{
21+
RequestID: testRequestID,
22+
Dest: netip.MustParseAddrPort("127.0.0.1:5000"),
23+
Traced: false,
24+
IdleDurationHint: 5 * time.Second,
25+
Payload: nil,
26+
}
27+
session, err := manager.RegisterSession(&request, &noopEyeball{})
28+
if err != nil {
29+
t.Fatalf("register session should've succeeded: %v", err)
30+
}
31+
if request.RequestID != session.ID() {
32+
t.Fatalf("session id doesn't match: %v != %v", request.RequestID, session.ID())
33+
}
34+
35+
// We shouldn't be able to register another session with the same request id
36+
_, err = manager.RegisterSession(&request, &noopEyeball{})
37+
if !errors.Is(err, v3.ErrSessionBoundToOtherConn) {
38+
t.Fatalf("session should not be able to be registered again: %v", err)
39+
}
40+
41+
// Get session
42+
sessionGet, err := manager.GetSession(request.RequestID)
43+
if err != nil {
44+
t.Fatalf("get session failed: %v", err)
45+
}
46+
if session.ID() != sessionGet.ID() {
47+
t.Fatalf("session's do not match: %v != %v", session.ID(), sessionGet.ID())
48+
}
49+
50+
// Remove the session
51+
manager.UnregisterSession(request.RequestID)
52+
53+
// Get session should fail
54+
_, err = manager.GetSession(request.RequestID)
55+
if !errors.Is(err, v3.ErrSessionNotFound) {
56+
t.Fatalf("get session failed: %v", err)
57+
}
58+
59+
// Closing the original session should return that the socket is already closed (by the session unregistration)
60+
err = session.Close()
61+
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
62+
t.Fatalf("session should've closed without issue: %v", err)
63+
}
64+
}
65+
66+
func TestGetSession_Empty(t *testing.T) {
67+
log := zerolog.Nop()
68+
manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
69+
70+
_, err := manager.GetSession(testRequestID)
71+
if !errors.Is(err, v3.ErrSessionNotFound) {
72+
t.Fatalf("get session find no session: %v", err)
73+
}
74+
}

quic/v3/muxer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package v3
2+
3+
// DatagramWriter provides the Muxer interface to create proper Datagrams when sending over a connection.
4+
type DatagramWriter interface {
5+
SendUDPSessionDatagram(datagram []byte) error
6+
SendUDPSessionResponse(id RequestID, resp SessionRegistrationResp) error
7+
//SendICMPPacket(packet packet.IP) error
8+
}

quic/v3/muxer_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package v3_test
2+
3+
import v3 "github.com/cloudflare/cloudflared/quic/v3"
4+
5+
type noopEyeball struct{}
6+
7+
func (noopEyeball) SendUDPSessionDatagram(datagram []byte) error {
8+
return nil
9+
}
10+
11+
func (noopEyeball) SendUDPSessionResponse(id v3.RequestID, resp v3.SessionRegistrationResp) error {
12+
return nil
13+
}
14+
15+
type mockEyeball struct {
16+
// datagram sent via SendUDPSessionDatagram
17+
recvData chan []byte
18+
// responses sent via SendUDPSessionResponse
19+
recvResp chan struct {
20+
id v3.RequestID
21+
resp v3.SessionRegistrationResp
22+
}
23+
}
24+
25+
func newMockEyeball() mockEyeball {
26+
return mockEyeball{
27+
recvData: make(chan []byte, 1),
28+
recvResp: make(chan struct {
29+
id v3.RequestID
30+
resp v3.SessionRegistrationResp
31+
}, 1),
32+
}
33+
}
34+
35+
func (m *mockEyeball) SendUDPSessionDatagram(datagram []byte) error {
36+
b := make([]byte, len(datagram))
37+
copy(b, datagram)
38+
m.recvData <- b
39+
return nil
40+
}
41+
42+
func (m *mockEyeball) SendUDPSessionResponse(id v3.RequestID, resp v3.SessionRegistrationResp) error {
43+
m.recvResp <- struct {
44+
id v3.RequestID
45+
resp v3.SessionRegistrationResp
46+
}{
47+
id, resp,
48+
}
49+
return nil
50+
}

quic/v3/request.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package v3
33
import (
44
"encoding/binary"
55
"errors"
6+
"fmt"
67
)
78

89
const (
@@ -37,6 +38,10 @@ func RequestIDFromSlice(data []byte) (RequestID, error) {
3738
}, nil
3839
}
3940

41+
func (id RequestID) String() string {
42+
return fmt.Sprintf("%016x%016x", id.hi, id.lo)
43+
}
44+
4045
// Compare returns an integer comparing two IPs.
4146
// The result will be 0 if id == id2, -1 if id < id2, and +1 if id > id2.
4247
// The definition of "less than" is the same as the [RequestID.Less] method.

0 commit comments

Comments
 (0)