Skip to content

Commit 8445b88

Browse files
committed
TUN-5593: Read full packet from UDP connection, even if it exceeds MTU of the transport. When packet length is greater than the MTU of the transport, we will silently drop packets (for now).
1 parent 7a55208 commit 8445b88

File tree

6 files changed

+28
-19
lines changed

6 files changed

+28
-19
lines changed

datagramsession/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (m *manager) RegisterSession(ctx context.Context, sessionID uuid.UUID, orig
9595
}
9696

9797
func (m *manager) registerSession(ctx context.Context, registration *registerSessionEvent) {
98-
session := newSession(registration.sessionID, m.transport, registration.originProxy)
98+
session := newSession(registration.sessionID, m.transport, registration.originProxy, m.log)
9999
m.sessions[registration.sessionID] = session
100100
registration.resultChan <- session
101101
}

datagramsession/session.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/google/uuid"
10+
"github.com/rs/zerolog"
1011
)
1112

1213
const (
@@ -17,7 +18,7 @@ func SessionIdleErr(timeout time.Duration) error {
1718
return fmt.Errorf("session idle for %v", timeout)
1819
}
1920

20-
// Each Session is a bidirectional pipe of datagrams between transport and dstConn
21+
// Session is a bidirectional pipe of datagrams between transport and dstConn
2122
// Currently the only implementation of transport is quic DatagramMuxer
2223
// Destination can be a connection with origin or with eyeball
2324
// When the destination is origin:
@@ -35,9 +36,10 @@ type Session struct {
3536
// activeAtChan is used to communicate the last read/write time
3637
activeAtChan chan time.Time
3738
closeChan chan error
39+
log *zerolog.Logger
3840
}
3941

40-
func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser) *Session {
42+
func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser, log *zerolog.Logger) *Session {
4143
return &Session{
4244
ID: id,
4345
transport: transport,
@@ -47,14 +49,16 @@ func newSession(id uuid.UUID, transport transport, dstConn io.ReadWriteCloser) *
4749
activeAtChan: make(chan time.Time, 2),
4850
// capacity is 2 because close() and dstToTransport routine in Serve() can write to this channel
4951
closeChan: make(chan error, 2),
52+
log: log,
5053
}
5154
}
5255

5356
func (s *Session) Serve(ctx context.Context, closeAfterIdle time.Duration) (closedByRemote bool, err error) {
5457
go func() {
5558
// QUIC implementation copies data to another buffer before returning https://github.com/lucas-clemente/quic-go/blob/v0.24.0/session.go#L1967-L1975
5659
// This makes it safe to share readBuffer between iterations
57-
readBuffer := make([]byte, s.transport.ReceiveMTU())
60+
const maxPacketSize = 1500
61+
readBuffer := make([]byte, maxPacketSize)
5862
for {
5963
if err := s.dstToTransport(readBuffer); err != nil {
6064
s.closeChan <- err
@@ -103,8 +107,15 @@ func (s *Session) dstToTransport(buffer []byte) error {
103107
n, err := s.dstConn.Read(buffer)
104108
s.markActive()
105109
if n > 0 {
106-
if err := s.transport.SendTo(s.ID, buffer[:n]); err != nil {
107-
return err
110+
if n <= int(s.transport.MTU()) {
111+
err = s.transport.SendTo(s.ID, buffer[:n])
112+
} else {
113+
// drop packet for now, eventually reply with ICMP for PMTUD
114+
s.log.Debug().
115+
Str("session", s.ID.String()).
116+
Int("len", n).
117+
Uint("mtu", s.transport.MTU()).
118+
Msg("dropped packet exceeding MTU")
108119
}
109120
}
110121
return err

datagramsession/session_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/google/uuid"
14+
"github.com/rs/zerolog"
1415
"github.com/stretchr/testify/require"
1516
"golang.org/x/sync/errgroup"
1617
)
@@ -44,7 +45,8 @@ func testSessionReturns(t *testing.T, closeBy closeMethod, closeAfterIdle time.D
4445
reqChan: newDatagramChannel(1),
4546
respChan: newDatagramChannel(1),
4647
}
47-
session := newSession(sessionID, transport, cfdConn)
48+
log := zerolog.Nop()
49+
session := newSession(sessionID, transport, cfdConn, &log)
4850

4951
ctx, cancel := context.WithCancel(context.Background())
5052
sessionDone := make(chan struct{})
@@ -119,7 +121,8 @@ func testActiveSessionNotClosed(t *testing.T, readFromDst bool, writeToDst bool)
119121
reqChan: newDatagramChannel(100),
120122
respChan: newDatagramChannel(100),
121123
}
122-
session := newSession(sessionID, transport, cfdConn)
124+
log := zerolog.Nop()
125+
session := newSession(sessionID, transport, cfdConn, &log)
123126

124127
startTime := time.Now()
125128
activeUntil := startTime.Add(activeTime)
@@ -181,7 +184,7 @@ func testActiveSessionNotClosed(t *testing.T, readFromDst bool, writeToDst bool)
181184

182185
func TestMarkActiveNotBlocking(t *testing.T) {
183186
const concurrentCalls = 50
184-
session := newSession(uuid.New(), nil, nil)
187+
session := newSession(uuid.New(), nil, nil, nil)
185188
var wg sync.WaitGroup
186189
wg.Add(concurrentCalls)
187190
for i := 0; i < concurrentCalls; i++ {

datagramsession/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ type transport interface {
99
// ReceiveFrom reads the next datagram from the transport
1010
ReceiveFrom() (uuid.UUID, []byte, error)
1111
// Max transmission unit to receive from the transport
12-
ReceiveMTU() uint
12+
MTU() uint
1313
}

datagramsession/transport_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (mt *mockQUICTransport) ReceiveFrom() (uuid.UUID, []byte, error) {
2222
return mt.reqChan.Receive(context.Background())
2323
}
2424

25-
func (mt *mockQUICTransport) ReceiveMTU() uint {
25+
func (mt *mockQUICTransport) MTU() uint {
2626
return 1217
2727
}
2828

quic/datagram.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func NewDatagramMuxer(quicSession quic.Session) (*DatagramMuxer, error) {
3636
func (dm *DatagramMuxer) SendTo(sessionID uuid.UUID, payload []byte) error {
3737
if len(payload) > MaxDatagramFrameSize-sessionIDLen {
3838
// TODO: TUN-5302 return ICMP packet too big message
39-
return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(payload), dm.SendMTU())
39+
return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(payload), dm.MTU())
4040
}
4141
msgWithID, err := SuffixSessionID(sessionID, payload)
4242
if err != nil {
@@ -59,16 +59,11 @@ func (dm *DatagramMuxer) ReceiveFrom() (uuid.UUID, []byte, error) {
5959
return ExtractSessionID(msg)
6060
}
6161

62-
// Maximum application payload to send through QUIC datagram frame
63-
func (dm *DatagramMuxer) SendMTU() uint {
62+
// Maximum application payload to send to / receive from QUIC datagram frame
63+
func (dm *DatagramMuxer) MTU() uint {
6464
return uint(MaxDatagramFrameSize - sessionIDLen)
6565
}
6666

67-
// Maximum expected bytes to read from QUIC datagram frame
68-
func (dm *DatagramMuxer) ReceiveMTU() uint {
69-
return MaxDatagramFrameSize
70-
}
71-
7267
// Each QUIC datagram should be suffixed with session ID.
7368
// ExtractSessionID extracts the session ID and a slice with only the payload
7469
func ExtractSessionID(b []byte) (uuid.UUID, []byte, error) {

0 commit comments

Comments
 (0)