Skip to content

Commit d2bc15e

Browse files
committed
TUN-6667: DatagramMuxerV2 provides a method to receive RawPacket
1 parent bad2e8e commit d2bc15e

File tree

3 files changed

+111
-43
lines changed

3 files changed

+111
-43
lines changed

connection/quic.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type QUICConnection struct {
4747
// sessionManager tracks active sessions. It receives datagrams from quic connection via datagramMuxer
4848
sessionManager datagramsession.Manager
4949
// datagramMuxer mux/demux datagrams from quic connection
50-
datagramMuxer *quicpogs.DatagramMuxer
50+
datagramMuxer quicpogs.BaseDatagramMuxer
5151
controlStreamHandler ControlStreamHandler
5252
connOptions *tunnelpogs.ConnectionOptions
5353
}
@@ -67,9 +67,9 @@ func NewQUICConnection(
6767
return nil, &EdgeQuicDialError{Cause: err}
6868
}
6969

70-
demuxChan := make(chan *packet.Session, demuxChanCapacity)
71-
datagramMuxer := quicpogs.NewDatagramMuxer(session, logger, demuxChan)
72-
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, demuxChan)
70+
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
71+
datagramMuxer := quicpogs.NewDatagramMuxer(session, logger, sessionDemuxChan)
72+
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
7373

7474
return &QUICConnection{
7575
session: session,

quic/datagram_test.go

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,17 @@ import (
99
"encoding/pem"
1010
"fmt"
1111
"math/big"
12+
"net/netip"
1213
"testing"
1314
"time"
1415

16+
"github.com/google/gopacket/layers"
1517
"github.com/google/uuid"
1618
"github.com/lucas-clemente/quic-go"
1719
"github.com/rs/zerolog"
1820
"github.com/stretchr/testify/require"
21+
"golang.org/x/net/icmp"
22+
"golang.org/x/net/ipv4"
1923
"golang.org/x/sync/errgroup"
2024

2125
"github.com/cloudflare/cloudflared/packet"
@@ -68,15 +72,45 @@ func TestDatagram(t *testing.T) {
6872
Payload: maxPayload,
6973
},
7074
}
71-
flowPayloads := [][]byte{
72-
maxPayload,
75+
76+
packets := []packet.ICMP{
77+
{
78+
IP: &packet.IP{
79+
Src: netip.MustParseAddr("172.16.0.1"),
80+
Dst: netip.MustParseAddr("192.168.0.1"),
81+
Protocol: layers.IPProtocolICMPv4,
82+
},
83+
Message: &icmp.Message{
84+
Type: ipv4.ICMPTypeTimeExceeded,
85+
Code: 0,
86+
Body: &icmp.TimeExceeded{
87+
Data: []byte("original packet"),
88+
},
89+
},
90+
},
91+
{
92+
IP: &packet.IP{
93+
Src: netip.MustParseAddr("172.16.0.2"),
94+
Dst: netip.MustParseAddr("192.168.0.2"),
95+
Protocol: layers.IPProtocolICMPv4,
96+
},
97+
Message: &icmp.Message{
98+
Type: ipv4.ICMPTypeEcho,
99+
Code: 0,
100+
Body: &icmp.Echo{
101+
ID: 6182,
102+
Seq: 9151,
103+
Data: []byte("Test ICMP echo"),
104+
},
105+
},
106+
},
73107
}
74108

75109
testDatagram(t, 1, sessionToPayload, nil)
76-
testDatagram(t, 2, sessionToPayload, flowPayloads)
110+
testDatagram(t, 2, sessionToPayload, packets)
77111
}
78112

79-
func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Session, packetPayloads [][]byte) {
113+
func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Session, packets []packet.ICMP) {
80114
quicConfig := &quic.Config{
81115
KeepAlivePeriod: 5 * time.Millisecond,
82116
EnableDatagrams: true,
@@ -103,12 +137,20 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Sessi
103137
muxer := NewDatagramMuxer(quicSession, &logger, sessionDemuxChan)
104138
muxer.ServeReceive(ctx)
105139
case 2:
106-
packetDemuxChan := make(chan []byte, len(packetPayloads))
107-
muxer := NewDatagramMuxerV2(quicSession, &logger, sessionDemuxChan, packetDemuxChan)
140+
muxer := NewDatagramMuxerV2(quicSession, &logger, sessionDemuxChan)
108141
muxer.ServeReceive(ctx)
109142

110-
for _, expectedPayload := range packetPayloads {
111-
require.Equal(t, expectedPayload, <-packetDemuxChan)
143+
icmpDecoder := packet.NewICMPDecoder()
144+
for _, pk := range packets {
145+
received, err := muxer.ReceivePacket(ctx)
146+
require.NoError(t, err)
147+
148+
receivedICMP, err := icmpDecoder.Decode(received.Data)
149+
require.NoError(t, err)
150+
require.Equal(t, pk.IP, receivedICMP.IP)
151+
require.Equal(t, pk.Type, receivedICMP.Type)
152+
require.Equal(t, pk.Code, receivedICMP.Code)
153+
require.Equal(t, pk.Body, receivedICMP.Body)
112154
}
113155
default:
114156
return fmt.Errorf("unknown datagram version %d", version)
@@ -141,12 +183,17 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Sessi
141183
case 1:
142184
muxer = NewDatagramMuxer(quicSession, &logger, nil)
143185
case 2:
144-
muxerV2 := NewDatagramMuxerV2(quicSession, &logger, nil, nil)
145-
for _, payload := range packetPayloads {
146-
require.NoError(t, muxerV2.MuxPacket(payload))
186+
muxerV2 := NewDatagramMuxerV2(quicSession, &logger, nil)
187+
encoder := packet.NewEncoder()
188+
for _, pk := range packets {
189+
encodedPacket, err := encoder.Encode(&pk)
190+
require.NoError(t, err)
191+
require.NoError(t, muxerV2.SendPacket(encodedPacket))
147192
}
148193
// Payload larger than transport MTU, should not be sent
149-
require.Error(t, muxerV2.MuxPacket(largePayload))
194+
require.Error(t, muxerV2.SendPacket(packet.RawPacket{
195+
Data: largePayload,
196+
}))
150197
muxer = muxerV2
151198
default:
152199
return fmt.Errorf("unknown datagram version %d", version)

quic/datagramv2.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type datagramV2Type byte
1616
const (
1717
udp datagramV2Type = iota
1818
ip
19+
// Same as sessionDemuxChan capacity
20+
packetChanCapacity = 16
1921
)
2022

2123
func suffixType(b []byte, datagramType datagramV2Type) ([]byte, error) {
@@ -35,24 +37,24 @@ type DatagramMuxerV2 struct {
3537
session quic.Connection
3638
logger *zerolog.Logger
3739
sessionDemuxChan chan<- *packet.Session
38-
packetDemuxChan chan<- []byte
40+
packetDemuxChan chan packet.RawPacket
3941
}
4042

4143
func NewDatagramMuxerV2(
4244
quicSession quic.Connection,
4345
log *zerolog.Logger,
4446
sessionDemuxChan chan<- *packet.Session,
45-
packetDemuxChan chan<- []byte) *DatagramMuxerV2 {
47+
) *DatagramMuxerV2 {
4648
logger := log.With().Uint8("datagramVersion", 2).Logger()
4749
return &DatagramMuxerV2{
4850
session: quicSession,
4951
logger: &logger,
5052
sessionDemuxChan: sessionDemuxChan,
51-
packetDemuxChan: packetDemuxChan,
53+
packetDemuxChan: make(chan packet.RawPacket, packetChanCapacity),
5254
}
5355
}
5456

55-
// MuxSession suffix the session ID and datagram version to the payload so the other end of the QUIC connection can
57+
// SendToSession suffix the session ID and datagram version to the payload so the other end of the QUIC connection can
5658
// demultiplex the payload from multiple datagram sessions
5759
func (dm *DatagramMuxerV2) SendToSession(session *packet.Session) error {
5860
if len(session.Payload) > dm.mtu() {
@@ -73,10 +75,10 @@ func (dm *DatagramMuxerV2) SendToSession(session *packet.Session) error {
7375
return nil
7476
}
7577

76-
// MuxPacket suffix the datagram type to the packet. The other end of the QUIC connection can demultiplex by parsing
78+
// SendPacket suffix the datagram type to the packet. The other end of the QUIC connection can demultiplex by parsing
7779
// the payload as IP and look at the source and destination.
78-
func (dm *DatagramMuxerV2) MuxPacket(packet []byte) error {
79-
payloadWithVersion, err := suffixType(packet, ip)
80+
func (dm *DatagramMuxerV2) SendPacket(pk packet.RawPacket) error {
81+
payloadWithVersion, err := suffixType(pk.Data, ip)
8082
if err != nil {
8183
return errors.Wrap(err, "Failed to suffix datagram type, it will be dropped")
8284
}
@@ -102,6 +104,15 @@ func (dm *DatagramMuxerV2) ServeReceive(ctx context.Context) error {
102104
}
103105
}
104106

107+
func (dm *DatagramMuxerV2) ReceivePacket(ctx context.Context) (packet.RawPacket, error) {
108+
select {
109+
case <-ctx.Done():
110+
return packet.RawPacket{}, ctx.Err()
111+
case pk := <-dm.packetDemuxChan:
112+
return pk, nil
113+
}
114+
}
115+
105116
func (dm *DatagramMuxerV2) demux(ctx context.Context, msgWithType []byte) error {
106117
if len(msgWithType) < 1 {
107118
return fmt.Errorf("QUIC datagram should have at least 1 byte")
@@ -110,28 +121,38 @@ func (dm *DatagramMuxerV2) demux(ctx context.Context, msgWithType []byte) error
110121
msg := msgWithType[0 : len(msgWithType)-1]
111122
switch msgType {
112123
case udp:
113-
sessionID, payload, err := extractSessionID(msg)
114-
if err != nil {
115-
return err
116-
}
117-
sessionDatagram := packet.Session{
118-
ID: sessionID,
119-
Payload: payload,
120-
}
121-
select {
122-
case dm.sessionDemuxChan <- &sessionDatagram:
123-
return nil
124-
case <-ctx.Done():
125-
return ctx.Err()
126-
}
124+
return dm.handleSession(ctx, msg)
127125
case ip:
128-
select {
129-
case dm.packetDemuxChan <- msg:
130-
return nil
131-
case <-ctx.Done():
132-
return ctx.Err()
133-
}
126+
return dm.handlePacket(ctx, msg)
134127
default:
135128
return fmt.Errorf("Unexpected datagram type %d", msgType)
136129
}
137130
}
131+
132+
func (dm *DatagramMuxerV2) handleSession(ctx context.Context, session []byte) error {
133+
sessionID, payload, err := extractSessionID(session)
134+
if err != nil {
135+
return err
136+
}
137+
sessionDatagram := packet.Session{
138+
ID: sessionID,
139+
Payload: payload,
140+
}
141+
select {
142+
case dm.sessionDemuxChan <- &sessionDatagram:
143+
return nil
144+
case <-ctx.Done():
145+
return ctx.Err()
146+
}
147+
}
148+
149+
func (dm *DatagramMuxerV2) handlePacket(ctx context.Context, pk []byte) error {
150+
select {
151+
case <-ctx.Done():
152+
return ctx.Err()
153+
case dm.packetDemuxChan <- packet.RawPacket{
154+
Data: pk,
155+
}:
156+
return nil
157+
}
158+
}

0 commit comments

Comments
 (0)