Skip to content

Commit 59f5b0d

Browse files
committed
TUN-6530: Implement ICMPv4 proxy
This proxy uses unprivileged datagram-oriented endpoint and is shared by all quic connections
1 parent f6bd4aa commit 59f5b0d

File tree

10 files changed

+440
-126
lines changed

10 files changed

+440
-126
lines changed

connection/quic.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type QUICConnection struct {
4848
sessionManager datagramsession.Manager
4949
// datagramMuxer mux/demux datagrams from quic connection
5050
datagramMuxer quicpogs.BaseDatagramMuxer
51+
packetRouter *packetRouter
5152
controlStreamHandler ControlStreamHandler
5253
connOptions *tunnelpogs.ConnectionOptions
5354
}
@@ -61,14 +62,28 @@ func NewQUICConnection(
6162
connOptions *tunnelpogs.ConnectionOptions,
6263
controlStreamHandler ControlStreamHandler,
6364
logger *zerolog.Logger,
65+
icmpProxy ingress.ICMPProxy,
6466
) (*QUICConnection, error) {
6567
session, err := quic.DialAddr(edgeAddr.String(), tlsConfig, quicConfig)
6668
if err != nil {
6769
return nil, &EdgeQuicDialError{Cause: err}
6870
}
6971

7072
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
71-
datagramMuxer := quicpogs.NewDatagramMuxer(session, logger, sessionDemuxChan)
73+
var (
74+
datagramMuxer quicpogs.BaseDatagramMuxer
75+
pr *packetRouter
76+
)
77+
if icmpProxy != nil {
78+
pr = &packetRouter{
79+
muxer: quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan),
80+
icmpProxy: icmpProxy,
81+
logger: logger,
82+
}
83+
datagramMuxer = pr.muxer
84+
} else {
85+
datagramMuxer = quicpogs.NewDatagramMuxer(session, logger, sessionDemuxChan)
86+
}
7287
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
7388

7489
return &QUICConnection{
@@ -77,6 +92,7 @@ func NewQUICConnection(
7792
logger: logger,
7893
sessionManager: sessionManager,
7994
datagramMuxer: datagramMuxer,
95+
packetRouter: pr,
8096
controlStreamHandler: controlStreamHandler,
8197
connOptions: connOptions,
8298
}, nil
@@ -117,6 +133,12 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
117133
defer cancel()
118134
return q.datagramMuxer.ServeReceive(ctx)
119135
})
136+
if q.packetRouter != nil {
137+
errGroup.Go(func() error {
138+
defer cancel()
139+
return q.packetRouter.serve(ctx)
140+
})
141+
}
120142

121143
return errGroup.Wait()
122144
}
@@ -305,6 +327,32 @@ func (q *QUICConnection) UpdateConfiguration(ctx context.Context, version int32,
305327
return q.orchestrator.UpdateConfig(version, config)
306328
}
307329

330+
type packetRouter struct {
331+
muxer *quicpogs.DatagramMuxerV2
332+
icmpProxy ingress.ICMPProxy
333+
logger *zerolog.Logger
334+
}
335+
336+
func (pr *packetRouter) serve(ctx context.Context) error {
337+
icmpDecoder := packet.NewICMPDecoder()
338+
for {
339+
pk, err := pr.muxer.ReceivePacket(ctx)
340+
if err != nil {
341+
return err
342+
}
343+
icmpPacket, err := icmpDecoder.Decode(pk)
344+
if err != nil {
345+
pr.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")
346+
continue
347+
}
348+
349+
if err := pr.icmpProxy.Request(icmpPacket, pr.muxer); err != nil {
350+
pr.logger.Err(err).Str("src", icmpPacket.Src.String()).Str("dst", icmpPacket.Dst.String()).Msg("Failed to send ICMP packet")
351+
continue
352+
}
353+
}
354+
}
355+
308356
// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
309357
// the client.
310358
type streamReadWriteAcker struct {

connection/quic_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,7 @@ func testQUICConnection(udpListenerAddr net.Addr, t *testing.T) *QUICConnection
682682
&tunnelpogs.ConnectionOptions{},
683683
fakeControlStream{},
684684
&log,
685+
nil,
685686
)
686687
require.NoError(t, err)
687688
return qc

ingress/origin_icmp_proxy.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package ingress
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"strconv"
8+
9+
"github.com/google/gopacket/layers"
10+
"github.com/pkg/errors"
11+
"github.com/rs/zerolog"
12+
"golang.org/x/net/icmp"
13+
14+
"github.com/cloudflare/cloudflared/packet"
15+
)
16+
17+
// ICMPProxy sends ICMP messages and listens for their responses
18+
type ICMPProxy interface {
19+
// Request sends an ICMP message
20+
Request(pk *packet.ICMP, responder packet.FlowResponder) error
21+
// ListenResponse listens for responses to the requests until context is done
22+
ListenResponse(ctx context.Context) error
23+
}
24+
25+
// TODO: TUN-6654 Extend support to IPv6
26+
type icmpProxy struct {
27+
srcFlowTracker *packet.FlowTracker
28+
conn *icmp.PacketConn
29+
logger *zerolog.Logger
30+
encoder *packet.Encoder
31+
}
32+
33+
// TODO: TUN-6586: Use echo ID as FlowID
34+
type seqNumFlowID int
35+
36+
func (snf seqNumFlowID) ID() string {
37+
return strconv.FormatInt(int64(snf), 10)
38+
}
39+
40+
func NewICMPProxy(network string, listenIP net.IP, logger *zerolog.Logger) (*icmpProxy, error) {
41+
conn, err := icmp.ListenPacket(network, listenIP.String())
42+
if err != nil {
43+
return nil, err
44+
}
45+
return &icmpProxy{
46+
srcFlowTracker: packet.NewFlowTracker(),
47+
conn: conn,
48+
logger: logger,
49+
encoder: packet.NewEncoder(),
50+
}, nil
51+
}
52+
53+
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
54+
switch body := pk.Message.Body.(type) {
55+
case *icmp.Echo:
56+
return ip.sendICMPEchoRequest(pk, body, responder)
57+
default:
58+
return fmt.Errorf("sending ICMP %s is not implemented", pk.Type)
59+
}
60+
}
61+
62+
func (ip *icmpProxy) ListenResponse(ctx context.Context) error {
63+
go func() {
64+
<-ctx.Done()
65+
ip.conn.Close()
66+
}()
67+
buf := make([]byte, 1500)
68+
for {
69+
n, src, err := ip.conn.ReadFrom(buf)
70+
if err != nil {
71+
return err
72+
}
73+
// TODO: TUN-6654 Check for IPv6
74+
msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), buf[:n])
75+
if err != nil {
76+
ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to parse ICMP message")
77+
continue
78+
}
79+
switch body := msg.Body.(type) {
80+
case *icmp.Echo:
81+
if err := ip.handleEchoResponse(msg, body); err != nil {
82+
ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to handle ICMP response")
83+
continue
84+
}
85+
default:
86+
ip.logger.Warn().
87+
Str("icmpType", fmt.Sprintf("%s", msg.Type)).
88+
Msgf("Responding to this type of ICMP is not implemented")
89+
continue
90+
}
91+
}
92+
}
93+
94+
func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error {
95+
flow := packet.Flow{
96+
Src: pk.Src,
97+
Dst: pk.Dst,
98+
Responder: responder,
99+
}
100+
// TODO: TUN-6586 rewrite ICMP echo request identifier and use it to track flows
101+
flowID := seqNumFlowID(echo.Seq)
102+
// TODO: TUN-6588 clean up flows
103+
if replaced := ip.srcFlowTracker.Register(flowID, &flow, true); replaced {
104+
ip.logger.Info().Str("src", flow.Src.String()).Str("dst", flow.Dst.String()).Msg("Replaced flow")
105+
}
106+
var pseudoHeader []byte = nil
107+
serializedMsg, err := pk.Marshal(pseudoHeader)
108+
if err != nil {
109+
return errors.Wrap(err, "Failed to encode ICMP message")
110+
}
111+
// The address needs to be of type UDPAddr when conn is created without priviledge
112+
_, err = ip.conn.WriteTo(serializedMsg, &net.UDPAddr{
113+
IP: pk.Dst.AsSlice(),
114+
})
115+
return err
116+
}
117+
118+
func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error {
119+
flow, ok := ip.srcFlowTracker.Get(seqNumFlowID(echo.Seq))
120+
if !ok {
121+
return fmt.Errorf("flow not found")
122+
}
123+
icmpPacket := packet.ICMP{
124+
IP: &packet.IP{
125+
Src: flow.Dst,
126+
Dst: flow.Src,
127+
Protocol: layers.IPProtocol(msg.Type.Protocol()),
128+
},
129+
Message: msg,
130+
}
131+
serializedPacket, err := ip.encoder.Encode(&icmpPacket)
132+
if err != nil {
133+
return errors.Wrap(err, "Failed to encode ICMP message")
134+
}
135+
if err := flow.Responder.SendPacket(serializedPacket); err != nil {
136+
return errors.Wrap(err, "Failed to send packet to the edge")
137+
}
138+
return nil
139+
}

ingress/origin_icmp_proxy_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package ingress
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/netip"
7+
"runtime"
8+
"testing"
9+
10+
"github.com/google/gopacket/layers"
11+
"github.com/rs/zerolog"
12+
"github.com/stretchr/testify/require"
13+
"golang.org/x/net/icmp"
14+
"golang.org/x/net/ipv4"
15+
16+
"github.com/cloudflare/cloudflared/packet"
17+
)
18+
19+
var (
20+
noopLogger = zerolog.Nop()
21+
localhostIP = netip.MustParseAddr("127.0.0.1")
22+
)
23+
24+
// TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the
25+
// ListenResponse method
26+
func TestICMPProxyEcho(t *testing.T) {
27+
skipWindows(t)
28+
const (
29+
echoID = 36571
30+
endSeq = 100
31+
)
32+
proxy, err := NewICMPProxy("udp4", localhostIP.AsSlice(), &noopLogger)
33+
require.NoError(t, err)
34+
35+
proxyDone := make(chan struct{})
36+
ctx, cancel := context.WithCancel(context.Background())
37+
go func() {
38+
proxy.ListenResponse(ctx)
39+
close(proxyDone)
40+
}()
41+
42+
responder := echoFlowResponder{
43+
decoder: packet.NewICMPDecoder(),
44+
respChan: make(chan []byte),
45+
}
46+
47+
ip := packet.IP{
48+
Src: localhostIP,
49+
Dst: localhostIP,
50+
Protocol: layers.IPProtocolICMPv4,
51+
}
52+
for i := 0; i < endSeq; i++ {
53+
pk := packet.ICMP{
54+
IP: &ip,
55+
Message: &icmp.Message{
56+
Type: ipv4.ICMPTypeEcho,
57+
Code: 0,
58+
Body: &icmp.Echo{
59+
ID: echoID,
60+
Seq: i,
61+
Data: []byte(fmt.Sprintf("icmp echo seq %d", i)),
62+
},
63+
},
64+
}
65+
require.NoError(t, proxy.Request(&pk, &responder))
66+
responder.validate(t, &pk)
67+
}
68+
cancel()
69+
<-proxyDone
70+
}
71+
72+
// TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo
73+
func TestICMPProxyRejectNotEcho(t *testing.T) {
74+
skipWindows(t)
75+
msgs := []icmp.Message{
76+
{
77+
Type: ipv4.ICMPTypeDestinationUnreachable,
78+
Code: 1,
79+
Body: &icmp.DstUnreach{
80+
Data: []byte("original packet"),
81+
},
82+
},
83+
{
84+
Type: ipv4.ICMPTypeTimeExceeded,
85+
Code: 1,
86+
Body: &icmp.TimeExceeded{
87+
Data: []byte("original packet"),
88+
},
89+
},
90+
{
91+
Type: ipv4.ICMPType(2),
92+
Code: 0,
93+
Body: &icmp.PacketTooBig{
94+
MTU: 1280,
95+
Data: []byte("original packet"),
96+
},
97+
},
98+
}
99+
proxy, err := NewICMPProxy("udp4", localhostIP.AsSlice(), &noopLogger)
100+
require.NoError(t, err)
101+
102+
responder := echoFlowResponder{
103+
decoder: packet.NewICMPDecoder(),
104+
respChan: make(chan []byte),
105+
}
106+
for _, m := range msgs {
107+
pk := packet.ICMP{
108+
IP: &packet.IP{
109+
Src: localhostIP,
110+
Dst: localhostIP,
111+
Protocol: layers.IPProtocolICMPv4,
112+
},
113+
Message: &m,
114+
}
115+
require.Error(t, proxy.Request(&pk, &responder))
116+
}
117+
}
118+
119+
func skipWindows(t *testing.T) {
120+
if runtime.GOOS == "windows" {
121+
t.Skip("Cannot create non-privileged datagram-oriented ICMP endpoint on Windows")
122+
}
123+
}
124+
125+
type echoFlowResponder struct {
126+
decoder *packet.ICMPDecoder
127+
respChan chan []byte
128+
}
129+
130+
func (efr *echoFlowResponder) SendPacket(pk packet.RawPacket) error {
131+
copiedPacket := make([]byte, len(pk.Data))
132+
copy(copiedPacket, pk.Data)
133+
efr.respChan <- copiedPacket
134+
return nil
135+
}
136+
137+
func (efr *echoFlowResponder) validate(t *testing.T, echoReq *packet.ICMP) {
138+
pk := <-efr.respChan
139+
decoded, err := efr.decoder.Decode(packet.RawPacket{Data: pk})
140+
require.NoError(t, err)
141+
require.Equal(t, decoded.Src, echoReq.Dst)
142+
require.Equal(t, decoded.Dst, echoReq.Src)
143+
require.Equal(t, echoReq.Protocol, decoded.Protocol)
144+
145+
require.Equal(t, ipv4.ICMPTypeEchoReply, decoded.Type)
146+
require.Equal(t, 0, decoded.Code)
147+
require.NotZero(t, decoded.Checksum)
148+
// TODO: TUN-6586: Enable this validation when ICMP echo ID matches on Linux
149+
//require.Equal(t, echoReq.Body, decoded.Body)
150+
}

0 commit comments

Comments
 (0)