Skip to content

Commit eea3d11

Browse files
chungthuangsssilver
authored andcommitted
TUN-5301: Separate datagram multiplex and session management logic from quic connection logic
1 parent dd32dc1 commit eea3d11

File tree

10 files changed

+674
-162
lines changed

10 files changed

+674
-162
lines changed

connection/quic.go

Lines changed: 74 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/rs/zerolog"
1717
"golang.org/x/sync/errgroup"
1818

19+
"github.com/cloudflare/cloudflared/datagramsession"
1920
quicpogs "github.com/cloudflare/cloudflared/quic"
2021
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
2122
)
@@ -32,10 +33,11 @@ const (
3233

3334
// QUICConnection represents the type that facilitates Proxying via QUIC streams.
3435
type QUICConnection struct {
35-
session quic.Session
36-
logger *zerolog.Logger
37-
httpProxy OriginProxy
38-
udpSessions *udpSessions
36+
session quic.Session
37+
logger *zerolog.Logger
38+
httpProxy OriginProxy
39+
sessionManager datagramsession.Manager
40+
localIP net.IP
3941
}
4042

4143
// NewQUICConnection returns a new instance of QUICConnection.
@@ -49,12 +51,6 @@ func NewQUICConnection(
4951
controlStreamHandler ControlStreamHandler,
5052
observer *Observer,
5153
) (*QUICConnection, error) {
52-
localIP, err := GetLocalIP()
53-
if err != nil {
54-
return nil, err
55-
}
56-
observer.log.Info().Msgf("UDP proxy will use %s as packet source IP", localIP)
57-
udpSessions := newUDPSessions(localIP)
5854
session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
5955
if err != nil {
6056
return nil, fmt.Errorf("failed to dial to edge: %w", err)
@@ -71,23 +67,35 @@ func NewQUICConnection(
7167
return nil, err
7268
}
7369

70+
datagramMuxer, err := quicpogs.NewDatagramMuxer(session)
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
sessionManager := datagramsession.NewManager(datagramMuxer, observer.log)
76+
77+
localIP, err := getLocalIP()
78+
if err != nil {
79+
return nil, err
80+
}
81+
7482
return &QUICConnection{
75-
session: session,
76-
httpProxy: httpProxy,
77-
logger: observer.log,
78-
udpSessions: udpSessions,
83+
session: session,
84+
httpProxy: httpProxy,
85+
logger: observer.log,
86+
sessionManager: sessionManager,
87+
localIP: localIP,
7988
}, nil
8089
}
8190

8291
// Serve starts a QUIC session that begins accepting streams.
8392
func (q *QUICConnection) Serve(ctx context.Context) error {
8493
errGroup, ctx := errgroup.WithContext(ctx)
8594
errGroup.Go(func() error {
86-
return q.listenEdgeDatagram()
95+
return q.acceptStream(ctx)
8796
})
88-
8997
errGroup.Go(func() error {
90-
return q.acceptStream(ctx)
98+
return q.sessionManager.Serve(ctx)
9199
})
92100
return errGroup.Wait()
93101
}
@@ -111,26 +119,6 @@ func (q *QUICConnection) acceptStream(ctx context.Context) error {
111119
}
112120
}
113121

114-
// listenEdgeDatagram listens for datagram from edge, parse the session ID and find the UDPConn to send the payload
115-
func (q *QUICConnection) listenEdgeDatagram() error {
116-
for {
117-
msg, err := q.session.ReceiveMessage()
118-
if err != nil {
119-
return err
120-
}
121-
go func(msg []byte) {
122-
sessionID, msgWithoutID, err := quicpogs.ExtractSessionID(msg)
123-
if err != nil {
124-
q.logger.Err(err).Msg("Failed to parse session ID from datagram")
125-
return
126-
}
127-
if err := q.udpSessions.send(sessionID, msgWithoutID); err != nil {
128-
q.logger.Err(err).Msg("Failed to send UDP to origin")
129-
}
130-
}(msg)
131-
}
132-
}
133-
134122
// Close closes the session with no errors specified.
135123
func (q *QUICConnection) Close() {
136124
q.session.CloseWithError(0, "")
@@ -186,46 +174,29 @@ func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) er
186174
}
187175

188176
func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16) error {
189-
udpConn, err := q.udpSessions.register(sessionID, dstIP, dstPort)
177+
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
178+
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
179+
originProxy, err := q.newUDPProxy(dstIP, dstPort)
180+
if err != nil {
181+
q.logger.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
182+
return err
183+
}
184+
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
190185
if err != nil {
186+
q.logger.Err(err).Msgf("Failed to register udp session %s", sessionID)
191187
return err
192188
}
193-
q.logger.Debug().Msgf("Register session %v, %v, %v", sessionID, dstIP, dstPort)
194-
go q.listenOriginUDP(sessionID, udpConn)
189+
go func() {
190+
defer q.sessionManager.UnregisterSession(q.session.Context(), sessionID)
191+
if err := session.Serve(q.session.Context()); err != nil {
192+
q.logger.Debug().Err(err).Str("sessionID", sessionID.String()).Msg("session terminated")
193+
}
194+
}()
195+
q.logger.Debug().Msgf("Registered session %v, %v, %v", sessionID, dstIP, dstPort)
195196
return nil
196197
}
197198

198-
// listenOriginUDP reads UDP from origin in a loop, and returns when it cannot write to edge or cannot read from origin
199-
func (q *QUICConnection) listenOriginUDP(sessionID uuid.UUID, conn *net.UDPConn) {
200-
defer func() {
201-
q.udpSessions.unregister(sessionID)
202-
conn.Close()
203-
}()
204-
readBuffer := make([]byte, MaxDatagramFrameSize)
205-
for {
206-
n, err := conn.Read(readBuffer)
207-
if n > 0 {
208-
if n > MaxDatagramFrameSize-sessionIDLen {
209-
// TODO: TUN-5302 return ICMP packet too big message
210-
q.logger.Error().Msgf("Origin UDP payload has %d bytes, which exceeds transport MTU %d", n, MaxDatagramFrameSize-sessionIDLen)
211-
continue
212-
}
213-
msgWithID, err := quicpogs.SuffixSessionID(sessionID, readBuffer[:n])
214-
if err != nil {
215-
q.logger.Err(err).Msg("Failed to suffix session ID to datagram, it will be dropped")
216-
continue
217-
}
218-
if err := q.session.SendMessage(msgWithID); err != nil {
219-
q.logger.Err(err).Msg("Failed to send datagram back to edge")
220-
return
221-
}
222-
}
223-
if err != nil {
224-
q.logger.Err(err).Msg("Failed to read UDP from origin")
225-
return
226-
}
227-
}
228-
}
199+
// TODO: TUN-5422 Implement UnregisterUdpSession RPC
229200

230201
// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
231202
// the client.
@@ -320,3 +291,35 @@ func isTransferEncodingChunked(req *http.Request) bool {
320291
// separated value as well.
321292
return strings.Contains(strings.ToLower(transferEncodingVal), "chunked")
322293
}
294+
295+
// TODO: TUN-5303: Define an UDPProxy in ingress package
296+
func (q *QUICConnection) newUDPProxy(dstIP net.IP, dstPort uint16) (*net.UDPConn, error) {
297+
dstAddr := &net.UDPAddr{
298+
IP: dstIP,
299+
Port: int(dstPort),
300+
}
301+
return net.DialUDP("udp", nil, dstAddr)
302+
}
303+
304+
// TODO: TUN-5303: Find the local IP once in ingress package
305+
// TODO: TUN-5421 allow user to specify which IP to bind to
306+
func getLocalIP() (net.IP, error) {
307+
addrs, err := net.InterfaceAddrs()
308+
if err != nil {
309+
return nil, err
310+
}
311+
for _, addr := range addrs {
312+
// Find the IP that is not loop back
313+
var ip net.IP
314+
switch v := addr.(type) {
315+
case *net.IPNet:
316+
ip = v.IP
317+
case *net.IPAddr:
318+
ip = v.IP
319+
}
320+
if !ip.IsLoopback() {
321+
return ip, nil
322+
}
323+
}
324+
return nil, fmt.Errorf("cannot determine IP to bind to")
325+
}

connection/udp_session.go

Lines changed: 0 additions & 90 deletions
This file was deleted.

datagramsession/event.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package datagramsession
2+
3+
import (
4+
"io"
5+
6+
"github.com/google/uuid"
7+
)
8+
9+
// registerSessionEvent is an event to start tracking a new session
10+
type registerSessionEvent struct {
11+
sessionID uuid.UUID
12+
originProxy io.ReadWriteCloser
13+
resultChan chan *Session
14+
}
15+
16+
func newRegisterSessionEvent(sessionID uuid.UUID, originProxy io.ReadWriteCloser) *registerSessionEvent {
17+
return &registerSessionEvent{
18+
sessionID: sessionID,
19+
originProxy: originProxy,
20+
resultChan: make(chan *Session, 1),
21+
}
22+
}
23+
24+
// unregisterSessionEvent is an event to stop tracking and terminate the session.
25+
type unregisterSessionEvent struct {
26+
sessionID uuid.UUID
27+
}
28+
29+
// newDatagram is an event when transport receives new datagram
30+
type newDatagram struct {
31+
sessionID uuid.UUID
32+
payload []byte
33+
}

0 commit comments

Comments
 (0)