Skip to content

Commit 1fb4669

Browse files
committed
TUN-9882: Add buffers for UDP and ICMP datagrams in datagram v3
Instead of creating a goroutine to process each incoming datagram from the tunnel, a single consumer (the demuxer) will process the datagrams serially. Registration datagrams will still be spun out into separate goroutines, since they are responsible for managing the lifetime of the session once started via the `Serve` method. UDP payload datagrams will be handled in separate channels to allow for parallel writing inside the scope of a session via a new write loop. Each of these channels will have a small buffer to help unblock the demuxer so that it can continue dequeuing other datagrams. ICMP datagrams will be funneled into a single channel across all possible origins, with a single consumer writing them to their respective destinations. These changes prevent datagram reordering from occurring when dequeuing from the tunnel connection. By establishing a single demuxer that serializes the writes per session, each session will be able to write sequentially, while separate sessions still write in parallel to their respective origins. Closes TUN-9882
1 parent fff1fc7 commit 1fb4669

File tree

7 files changed

+428
-255
lines changed

7 files changed

+428
-255
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ ifdef PACKAGE_MANAGER
3636
VERSION_FLAGS := $(VERSION_FLAGS) -X "github.com/cloudflare/cloudflared/cmd/cloudflared/updater.BuiltForPackageManager=$(PACKAGE_MANAGER)"
3737
endif
3838

39-
ifdef CONTAINER_BUILD
39+
ifdef CONTAINER_BUILD
4040
VERSION_FLAGS := $(VERSION_FLAGS) -X "github.com/cloudflare/cloudflared/metrics.Runtime=virtual"
4141
endif
4242

@@ -119,7 +119,7 @@ ifneq ($(TARGET_ARM), )
119119
ARM_COMMAND := GOARM=$(TARGET_ARM)
120120
endif
121121

122-
ifeq ($(TARGET_ARM), 7)
122+
ifeq ($(TARGET_ARM), 7)
123123
PACKAGE_ARCH := armhf
124124
else
125125
PACKAGE_ARCH := $(TARGET_ARCH)
@@ -182,7 +182,7 @@ fuzz:
182182
@go test -fuzz=FuzzIPDecoder -fuzztime=600s ./packet
183183
@go test -fuzz=FuzzICMPDecoder -fuzztime=600s ./packet
184184
@go test -fuzz=FuzzSessionWrite -fuzztime=600s ./quic/v3
185-
@go test -fuzz=FuzzSessionServe -fuzztime=600s ./quic/v3
185+
@go test -fuzz=FuzzSessionRead -fuzztime=600s ./quic/v3
186186
@go test -fuzz=FuzzRegistrationDatagram -fuzztime=600s ./quic/v3
187187
@go test -fuzz=FuzzPayloadDatagram -fuzztime=600s ./quic/v3
188188
@go test -fuzz=FuzzRegistrationResponseDatagram -fuzztime=600s ./quic/v3

quic/v3/datagram_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package v3_test
22

33
import (
4+
"crypto/rand"
45
"encoding/binary"
56
"errors"
67
"net/netip"
@@ -14,12 +15,18 @@ import (
1415

1516
func makePayload(size int) []byte {
1617
payload := make([]byte, size)
17-
for i := range len(payload) {
18-
payload[i] = 0xfc
19-
}
18+
_, _ = rand.Read(payload)
2019
return payload
2120
}
2221

22+
func makePayloads(size int, count int) [][]byte {
23+
payloads := make([][]byte, count)
24+
for i := range payloads {
25+
payloads[i] = makePayload(size)
26+
}
27+
return payloads
28+
}
29+
2330
func TestSessionRegistration_MarshalUnmarshal(t *testing.T) {
2431
payload := makePayload(1280)
2532
tests := []*v3.UDPSessionRegistrationDatagram{

quic/v3/muxer.go

Lines changed: 112 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ const (
1717
// Allocating a 16 channel buffer here allows for the writer to be slightly faster than the reader.
1818
// This has previously worked well for datagramv2, so we will start with this as well.
1919
demuxChanCapacity = 16
20+
// This provides a small buffer for the PacketRouter to poll ICMP packets from the QUIC connection
21+
// before writing them to the origin.
22+
icmpDatagramChanCapacity = 128
2023

2124
logSrcKey = "src"
2225
logDstKey = "dst"
@@ -59,14 +62,15 @@ type QuicConnection interface {
5962
}
6063

6164
type datagramConn struct {
62-
conn QuicConnection
63-
index uint8
64-
sessionManager SessionManager
65-
icmpRouter ingress.ICMPRouter
66-
metrics Metrics
67-
logger *zerolog.Logger
68-
datagrams chan []byte
69-
readErrors chan error
65+
conn QuicConnection
66+
index uint8
67+
sessionManager SessionManager
68+
icmpRouter ingress.ICMPRouter
69+
metrics Metrics
70+
logger *zerolog.Logger
71+
datagrams chan []byte
72+
icmpDatagramChan chan *ICMPDatagram
73+
readErrors chan error
7074

7175
icmpEncoderPool sync.Pool // a pool of *packet.Encoder
7276
icmpDecoderPool sync.Pool
@@ -75,14 +79,15 @@ type datagramConn struct {
7579
func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, icmpRouter ingress.ICMPRouter, index uint8, metrics Metrics, logger *zerolog.Logger) DatagramConn {
7680
log := logger.With().Uint8("datagramVersion", 3).Logger()
7781
return &datagramConn{
78-
conn: conn,
79-
index: index,
80-
sessionManager: sessionManager,
81-
icmpRouter: icmpRouter,
82-
metrics: metrics,
83-
logger: &log,
84-
datagrams: make(chan []byte, demuxChanCapacity),
85-
readErrors: make(chan error, 2),
82+
conn: conn,
83+
index: index,
84+
sessionManager: sessionManager,
85+
icmpRouter: icmpRouter,
86+
metrics: metrics,
87+
logger: &log,
88+
datagrams: make(chan []byte, demuxChanCapacity),
89+
icmpDatagramChan: make(chan *ICMPDatagram, icmpDatagramChanCapacity),
90+
readErrors: make(chan error, 2),
8691
icmpEncoderPool: sync.Pool{
8792
New: func() any {
8893
return packet.NewEncoder()
@@ -168,6 +173,9 @@ func (c *datagramConn) Serve(ctx context.Context) error {
168173
readCtx, cancel := context.WithCancel(connCtx)
169174
defer cancel()
170175
go c.pollDatagrams(readCtx)
176+
// Processing ICMP datagrams also monitors the reader context since the ICMP datagrams from the reader are the input
177+
// for the routine.
178+
go c.processICMPDatagrams(readCtx)
171179
for {
172180
// We make sure to monitor the context of cloudflared and the underlying connection to return if any errors occur.
173181
var datagram []byte
@@ -181,58 +189,59 @@ func (c *datagramConn) Serve(ctx context.Context) error {
181189
// Monitor for any hard errors from reading the connection
182190
case err := <-c.readErrors:
183191
return err
184-
// Otherwise, wait and dequeue datagrams as they come in
192+
// Wait and dequeue datagrams as they come in
185193
case d := <-c.datagrams:
186194
datagram = d
187195
}
188196

189197
// Each incoming datagram is demuxed serially in this loop to preserve ordering; only session registrations are handed off to a separate goroutine.
190-
go func() {
191-
typ, err := ParseDatagramType(datagram)
198+
typ, err := ParseDatagramType(datagram)
199+
if err != nil {
200+
c.logger.Err(err).Msgf("unable to parse datagram type: %d", typ)
201+
continue
202+
}
203+
switch typ {
204+
case UDPSessionRegistrationType:
205+
reg := &UDPSessionRegistrationDatagram{}
206+
err := reg.UnmarshalBinary(datagram)
207+
if err != nil {
208+
c.logger.Err(err).Msgf("unable to unmarshal session registration datagram")
209+
continue
210+
}
211+
logger := c.logger.With().Str(logFlowID, reg.RequestID.String()).Logger()
212+
// We bind the new session to the quic connection context instead of cloudflared context to allow for the
213+
// quic connection to close and close only the sessions bound to it. Closing of cloudflared will also
214+
// initiate the close of the quic connection, so we don't have to worry about the application context
215+
// in the scope of a session.
216+
//
217+
// Additionally, we spin out the registration into a separate go routine to handle the Serve'ing of the
218+
// session in a separate routine from the demuxer.
219+
go c.handleSessionRegistrationDatagram(connCtx, reg, &logger)
220+
case UDPSessionPayloadType:
221+
payload := &UDPSessionPayloadDatagram{}
222+
err := payload.UnmarshalBinary(datagram)
192223
if err != nil {
193-
c.logger.Err(err).Msgf("unable to parse datagram type: %d", typ)
194-
return
224+
c.logger.Err(err).Msgf("unable to unmarshal session payload datagram")
225+
continue
195226
}
196-
switch typ {
197-
case UDPSessionRegistrationType:
198-
reg := &UDPSessionRegistrationDatagram{}
199-
err := reg.UnmarshalBinary(datagram)
200-
if err != nil {
201-
c.logger.Err(err).Msgf("unable to unmarshal session registration datagram")
202-
return
203-
}
204-
logger := c.logger.With().Str(logFlowID, reg.RequestID.String()).Logger()
205-
// We bind the new session to the quic connection context instead of cloudflared context to allow for the
206-
// quic connection to close and close only the sessions bound to it. Closing of cloudflared will also
207-
// initiate the close of the quic connection, so we don't have to worry about the application context
208-
// in the scope of a session.
209-
c.handleSessionRegistrationDatagram(connCtx, reg, &logger)
210-
case UDPSessionPayloadType:
211-
payload := &UDPSessionPayloadDatagram{}
212-
err := payload.UnmarshalBinary(datagram)
213-
if err != nil {
214-
c.logger.Err(err).Msgf("unable to unmarshal session payload datagram")
215-
return
216-
}
217-
logger := c.logger.With().Str(logFlowID, payload.RequestID.String()).Logger()
218-
c.handleSessionPayloadDatagram(payload, &logger)
219-
case ICMPType:
220-
packet := &ICMPDatagram{}
221-
err := packet.UnmarshalBinary(datagram)
222-
if err != nil {
223-
c.logger.Err(err).Msgf("unable to unmarshal icmp datagram")
224-
return
225-
}
226-
c.handleICMPPacket(packet)
227-
case UDPSessionRegistrationResponseType:
228-
// cloudflared should never expect to receive UDP session responses as it will not initiate new
229-
// sessions towards the edge.
230-
c.logger.Error().Msgf("unexpected datagram type received: %d", UDPSessionRegistrationResponseType)
231-
return
232-
default:
233-
c.logger.Error().Msgf("unknown datagram type received: %d", typ)
227+
logger := c.logger.With().Str(logFlowID, payload.RequestID.String()).Logger()
228+
c.handleSessionPayloadDatagram(payload, &logger)
229+
case ICMPType:
230+
packet := &ICMPDatagram{}
231+
err := packet.UnmarshalBinary(datagram)
232+
if err != nil {
233+
c.logger.Err(err).Msgf("unable to unmarshal icmp datagram")
234+
continue
234235
}
235-
}()
236+
c.handleICMPPacket(packet)
237+
case UDPSessionRegistrationResponseType:
238+
// cloudflared should never expect to receive UDP session responses as it will not initiate new
239+
// sessions towards the edge.
240+
c.logger.Error().Msgf("unexpected datagram type received: %d", UDPSessionRegistrationResponseType)
241+
continue
242+
default:
243+
c.logger.Error().Msgf("unknown datagram type received: %d", typ)
244+
}
236245
}
237246
}
238247

@@ -243,24 +252,21 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
243252
Str(logDstKey, datagram.Dest.String()).
244253
Logger()
245254
session, err := c.sessionManager.RegisterSession(datagram, c)
246-
switch err {
247-
case nil:
248-
// Continue as normal
249-
case ErrSessionAlreadyRegistered:
250-
// Session is already registered and likely the response got lost
251-
c.handleSessionAlreadyRegistered(datagram.RequestID, &log)
252-
return
253-
case ErrSessionBoundToOtherConn:
254-
// Session is already registered but to a different connection
255-
c.handleSessionMigration(datagram.RequestID, &log)
256-
return
257-
case ErrSessionRegistrationRateLimited:
258-
// There are too many concurrent sessions so we return an error to force a retry later
259-
c.handleSessionRegistrationRateLimited(datagram, &log)
260-
return
261-
default:
262-
log.Err(err).Msg("flow registration failure")
263-
c.handleSessionRegistrationFailure(datagram.RequestID, &log)
255+
if err != nil {
256+
switch err {
257+
case ErrSessionAlreadyRegistered:
258+
// Session is already registered and likely the response got lost
259+
c.handleSessionAlreadyRegistered(datagram.RequestID, &log)
260+
case ErrSessionBoundToOtherConn:
261+
// Session is already registered but to a different connection
262+
c.handleSessionMigration(datagram.RequestID, &log)
263+
case ErrSessionRegistrationRateLimited:
264+
// There are too many concurrent sessions so we return an error to force a retry later
265+
c.handleSessionRegistrationRateLimited(datagram, &log)
266+
default:
267+
log.Err(err).Msg("flow registration failure")
268+
c.handleSessionRegistrationFailure(datagram.RequestID, &log)
269+
}
264270
return
265271
}
266272
log = log.With().Str(logSrcKey, session.LocalAddr().String()).Logger()
@@ -365,21 +371,42 @@ func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadD
365371
logger.Err(err).Msgf("unable to find flow")
366372
return
367373
}
368-
// We ignore the bytes written to the socket because any partial write must return an error.
369-
_, err = s.Write(datagram.Payload)
370-
if err != nil {
371-
logger.Err(err).Msgf("unable to write payload for the flow")
372-
return
373-
}
374+
s.Write(datagram.Payload)
374375
}
375376

376-
// Handles incoming ICMP datagrams.
377+
// Enqueues incoming ICMP datagrams onto a buffered channel to be handled by a single consumer; datagrams are dropped when the channel is full.
377378
func (c *datagramConn) handleICMPPacket(datagram *ICMPDatagram) {
378379
if c.icmpRouter == nil {
379380
// ICMPRouter is disabled so we drop the current packet and ignore all incoming ICMP packets
380381
return
381382
}
383+
select {
384+
case c.icmpDatagramChan <- datagram:
385+
default:
386+
// If the ICMP datagram channel is full, drop any additional incoming datagrams.
387+
c.logger.Warn().Msg("failed to write icmp packet to origin: dropped")
388+
}
389+
}
390+
391+
// Consumes from the ICMP datagram channel to write out the ICMP requests to an origin.
392+
func (c *datagramConn) processICMPDatagrams(ctx context.Context) {
393+
if c.icmpRouter == nil {
394+
// ICMPRouter is disabled so we ignore all incoming ICMP packets
395+
return
396+
}
397+
398+
for {
399+
select {
400+
// If the provided context is closed we want to exit the write loop
401+
case <-ctx.Done():
402+
return
403+
case datagram := <-c.icmpDatagramChan:
404+
c.writeICMPPacket(datagram)
405+
}
406+
}
407+
}
382408

409+
func (c *datagramConn) writeICMPPacket(datagram *ICMPDatagram) {
383410
// Decode the provided ICMPDatagram as an ICMP packet
384411
rawPacket := packet.RawPacket{Data: datagram.Payload}
385412
cachedDecoder := c.icmpDecoderPool.Get()

0 commit comments

Comments
 (0)