Skip to content

Commit 5891c0d

Browse files
committed
TUN-8700: Add datagram v3 muxer
The datagram muxer will wrap a QUIC Connection datagram read-writer operations to unmarshal datagrams from the connection to the origin with the session manager. Incoming datagram session registration operations will create new UDP sockets for sessions to proxy UDP packets between the edge and the origin. The muxer is also responsible for marshalling UDP packets and operations into datagrams for communication over the QUIC connection towards the edge. Closes TUN-8700
1 parent d29017f commit 5891c0d

File tree

4 files changed

+670
-2
lines changed

4 files changed

+670
-2
lines changed

quic/v3/datagram.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,8 @@ const (
284284
ResponseDestinationUnreachable SessionRegistrationResp = 0x01
285285
// Session registration was unable to bind to a local UDP socket.
286286
ResponseUnableToBindSocket SessionRegistrationResp = 0x02
287+
// Session registration is already bound to another connection.
288+
ResponseSessionAlreadyConnected SessionRegistrationResp = 0x03
287289
// Session registration failed with an unexpected error but provided a message.
288290
ResponseErrorWithMsg SessionRegistrationResp = 0xff
289291
)

quic/v3/datagram_errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
var (
99
ErrInvalidDatagramType error = errors.New("invalid datagram type expected")
10-
ErrDatagramHeaderTooSmall error = fmt.Errorf("datagram should have at least %d bytes", datagramTypeLen)
10+
ErrDatagramHeaderTooSmall error = fmt.Errorf("datagram should have at least %d byte", datagramTypeLen)
1111
ErrDatagramPayloadTooLarge error = errors.New("payload length is too large to be bundled in datagram")
1212
ErrDatagramPayloadHeaderTooSmall error = errors.New("payload length is too small to fit the datagram header")
1313
ErrDatagramPayloadInvalidSize error = errors.New("datagram provided is an invalid size")

quic/v3/muxer.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,227 @@
11
package v3
22

3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/rs/zerolog"
8+
)
9+
10+
const (
11+
// Allocating a 16 channel buffer here allows for the writer to be slightly faster than the reader.
12+
// This has worked previously well for datagramv2, so we will start with this as well
13+
demuxChanCapacity = 16
14+
)
15+
16+
// DatagramConn is the bridge that multiplexes writes and reads of datagrams for UDP sessions and ICMP packets to
17+
// a connection.
18+
type DatagramConn interface {
19+
DatagramWriter
20+
// Serve provides a server interface to process and handle incoming QUIC datagrams and demux their datagram v3 payloads.
21+
Serve(context.Context) error
22+
}
23+
324
// DatagramWriter provides the Muxer interface to create proper Datagrams when sending over a connection.
425
type DatagramWriter interface {
526
SendUDPSessionDatagram(datagram []byte) error
627
SendUDPSessionResponse(id RequestID, resp SessionRegistrationResp) error
728
//SendICMPPacket(packet packet.IP) error
829
}
30+
31+
// QuicConnection provides an interface that matches [quic.Connection] for only the datagram operations.
32+
//
33+
// We currently rely on the mutex for the [quic.Connection.SendDatagram] and [quic.Connection.ReceiveDatagram] and
34+
// do not have any locking for them. If the implementation in quic-go were to ever change, we would need to make
35+
// sure that we lock properly on these operations.
36+
type QuicConnection interface {
37+
Context() context.Context
38+
SendDatagram(payload []byte) error
39+
ReceiveDatagram(context.Context) ([]byte, error)
40+
}
41+
42+
type datagramConn struct {
43+
conn QuicConnection
44+
sessionManager SessionManager
45+
logger *zerolog.Logger
46+
47+
datagrams chan []byte
48+
readErrors chan error
49+
}
50+
51+
func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, logger *zerolog.Logger) DatagramConn {
52+
log := logger.With().Uint8("datagramVersion", 3).Logger()
53+
return &datagramConn{
54+
conn: conn,
55+
sessionManager: sessionManager,
56+
logger: &log,
57+
datagrams: make(chan []byte, demuxChanCapacity),
58+
readErrors: make(chan error, 2),
59+
}
60+
}
61+
62+
func (c *datagramConn) SendUDPSessionDatagram(datagram []byte) error {
63+
return c.conn.SendDatagram(datagram)
64+
}
65+
66+
func (c *datagramConn) SendUDPSessionResponse(id RequestID, resp SessionRegistrationResp) error {
67+
datagram := UDPSessionRegistrationResponseDatagram{
68+
RequestID: id,
69+
ResponseType: resp,
70+
}
71+
data, err := datagram.MarshalBinary()
72+
if err != nil {
73+
return err
74+
}
75+
return c.conn.SendDatagram(data)
76+
}
77+
78+
var errReadTimeout error = errors.New("receive datagram timeout")
79+
80+
// pollDatagrams will read datagrams from the underlying connection until the provided context is done.
81+
func (c *datagramConn) pollDatagrams(ctx context.Context) {
82+
for ctx.Err() == nil {
83+
datagram, err := c.conn.ReceiveDatagram(ctx)
84+
// If the read returns an error, we want to return the failure to the channel.
85+
if err != nil {
86+
c.readErrors <- err
87+
return
88+
}
89+
c.datagrams <- datagram
90+
}
91+
if ctx.Err() != nil {
92+
c.readErrors <- ctx.Err()
93+
}
94+
}
95+
96+
// Serve will begin the process of receiving datagrams from the [quic.Connection] and demuxing them to their destination.
97+
// The [DatagramConn] when serving, will be responsible for the sessions it accepts.
98+
func (c *datagramConn) Serve(ctx context.Context) error {
99+
connCtx := c.conn.Context()
100+
// We want to make sure that we cancel the reader context if the Serve method returns. This could also mean that the
101+
// underlying connection is also closing, but that is handled outside of the context of the datagram muxer.
102+
readCtx, cancel := context.WithCancel(connCtx)
103+
defer cancel()
104+
go c.pollDatagrams(readCtx)
105+
for {
106+
// We make sure to monitor the context of cloudflared and the underlying connection to return if any errors occur.
107+
var datagram []byte
108+
select {
109+
// Monitor the context of cloudflared
110+
case <-ctx.Done():
111+
return ctx.Err()
112+
// Monitor the context of the underlying connection
113+
case <-connCtx.Done():
114+
return connCtx.Err()
115+
// Monitor for any hard errors from reading the connection
116+
case err := <-c.readErrors:
117+
return err
118+
// Otherwise, wait and dequeue datagrams as they come in
119+
case d := <-c.datagrams:
120+
datagram = d
121+
}
122+
123+
// Each incoming datagram will be processed in a new go routine to handle the demuxing and action associated.
124+
go func() {
125+
typ, err := parseDatagramType(datagram)
126+
if err != nil {
127+
c.logger.Err(err).Msgf("unable to parse datagram type: %d", typ)
128+
return
129+
}
130+
switch typ {
131+
case UDPSessionRegistrationType:
132+
reg := &UDPSessionRegistrationDatagram{}
133+
err := reg.UnmarshalBinary(datagram)
134+
if err != nil {
135+
c.logger.Err(err).Msgf("unable to unmarshal session registration datagram")
136+
return
137+
}
138+
// We bind the new session to the quic connection context instead of cloudflared context to allow for the
139+
// quic connection to close and close only the sessions bound to it. Closing of cloudflared will also
140+
// initiate the close of the quic connection, so we don't have to worry about the application context
141+
// in the scope of a session.
142+
c.handleSessionRegistrationDatagram(connCtx, reg)
143+
case UDPSessionPayloadType:
144+
payload := &UDPSessionPayloadDatagram{}
145+
err := payload.UnmarshalBinary(datagram)
146+
if err != nil {
147+
c.logger.Err(err).Msgf("unable to unmarshal session payload datagram")
148+
return
149+
}
150+
c.handleSessionPayloadDatagram(payload)
151+
case UDPSessionRegistrationResponseType:
152+
// cloudflared should never expect to receive UDP session responses as it will not initiate new
153+
// sessions towards the edge.
154+
c.logger.Error().Msgf("unexpected datagram type received: %d", UDPSessionRegistrationResponseType)
155+
return
156+
default:
157+
c.logger.Error().Msgf("unknown datagram type received: %d", typ)
158+
}
159+
}()
160+
}
161+
}
162+
163+
// This method handles new registrations of a session and the serve loop for the session.
164+
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram) {
165+
session, err := c.sessionManager.RegisterSession(datagram, c)
166+
if err != nil {
167+
c.logger.Err(err).Msgf("session registration failure")
168+
c.handleSessionRegistrationFailure(datagram.RequestID, err)
169+
return
170+
}
171+
// Make sure to eventually remove the session from the session manager when the session is closed
172+
defer c.sessionManager.UnregisterSession(session.ID())
173+
174+
// Respond that we are able to process the new session
175+
err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
176+
if err != nil {
177+
c.logger.Err(err).Msgf("session registration failure: unable to send session registration response")
178+
return
179+
}
180+
181+
// We bind the context of the session to the [quic.Connection] that initiated the session.
182+
// [Session.Serve] is blocking and will continue this go routine till the end of the session lifetime.
183+
err = session.Serve(ctx)
184+
if err == nil {
185+
// We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
186+
// expected error response.
187+
c.logger.Warn().Msg("session was closed without explicit close or timeout")
188+
return
189+
}
190+
// SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session.
191+
if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) {
192+
c.logger.Debug().Msg(err.Error())
193+
return
194+
}
195+
196+
// All other errors should be reported as errors
197+
c.logger.Err(err).Msgf("session was closed with an error")
198+
}
199+
200+
func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, regErr error) {
201+
var errResp SessionRegistrationResp
202+
switch regErr {
203+
case ErrSessionBoundToOtherConn:
204+
errResp = ResponseSessionAlreadyConnected
205+
default:
206+
errResp = ResponseUnableToBindSocket
207+
}
208+
err := c.SendUDPSessionResponse(requestID, errResp)
209+
if err != nil {
210+
c.logger.Err(err).Msgf("unable to send session registration error response (%d)", errResp)
211+
}
212+
}
213+
214+
// Handles incoming datagrams that need to be sent to a registered session.
215+
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram) {
216+
s, err := c.sessionManager.GetSession(datagram.RequestID)
217+
if err != nil {
218+
c.logger.Err(err).Msgf("unable to find session")
219+
return
220+
}
221+
// We ignore the bytes written to the socket because any partial write must return an error.
222+
_, err = s.Write(datagram.Payload)
223+
if err != nil {
224+
c.logger.Err(err).Msgf("unable to write payload for unavailable session")
225+
return
226+
}
227+
}

0 commit comments

Comments
 (0)