Skip to content

Commit fc20a22

Browse files
committed
TUN-6695: Implement ICMP proxy for linux
1 parent faa86ff commit fc20a22

File tree

7 files changed

+374
-33
lines changed

7 files changed

+374
-33
lines changed

ingress/icmp_darwin.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ type icmpProxy struct {
2727
echoIDTracker *echoIDTracker
2828
conn *icmp.PacketConn
2929
logger *zerolog.Logger
30-
encoder *packet.Encoder
3130
}
3231

3332
// echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
@@ -112,13 +111,8 @@ func (snf echoFlowID) String() string {
112111
return strconv.FormatUint(uint64(snf), 10)
113112
}
114113

115-
func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
116-
network := "udp6"
117-
if listenIP.To4() != nil {
118-
network = "udp4"
119-
}
120-
// Opens a non-privileged ICMP socket
121-
conn, err := icmp.ListenPacket(network, listenIP.String())
114+
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
115+
conn, err := newICMPConn(listenIP)
122116
if err != nil {
123117
return nil, err
124118
}
@@ -127,11 +121,13 @@ func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
127121
echoIDTracker: newEchoIDTracker(),
128122
conn: conn,
129123
logger: logger,
130-
encoder: packet.NewEncoder(),
131124
}, nil
132125
}
133126

134127
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
128+
if pk == nil {
129+
return errPacketNil
130+
}
135131
switch body := pk.Message.Body.(type) {
136132
case *icmp.Echo:
137133
return ip.sendICMPEchoRequest(pk, body, responder)
@@ -140,12 +136,14 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) er
140136
}
141137
}
142138

143-
func (ip *icmpProxy) ListenResponse(ctx context.Context) error {
139+
// Serve listens for responses to the requests until context is done
140+
func (ip *icmpProxy) Serve(ctx context.Context) error {
144141
go func() {
145142
<-ctx.Done()
146143
ip.conn.Close()
147144
}()
148-
buf := make([]byte, 1500)
145+
buf := make([]byte, mtu)
146+
encoder := packet.NewEncoder()
149147
for {
150148
n, src, err := ip.conn.ReadFrom(buf)
151149
if err != nil {
@@ -159,7 +157,7 @@ func (ip *icmpProxy) ListenResponse(ctx context.Context) error {
159157
}
160158
switch body := msg.Body.(type) {
161159
case *icmp.Echo:
162-
if err := ip.handleEchoResponse(msg, body); err != nil {
160+
if err := ip.handleEchoResponse(encoder, msg, body); err != nil {
163161
ip.logger.Error().Err(err).
164162
Str("src", src.String()).
165163
Str("flowID", echoFlowID(body.ID).String()).
@@ -206,7 +204,7 @@ func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, respo
206204
return err
207205
}
208206

209-
func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error {
207+
func (ip *icmpProxy) handleEchoResponse(encoder *packet.Encoder, msg *icmp.Message, echo *icmp.Echo) error {
210208
flowID := echoFlowID(echo.ID)
211209
flow, ok := ip.srcFlowTracker.Get(flowID)
212210
if !ok {
@@ -220,7 +218,7 @@ func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) erro
220218
},
221219
Message: msg,
222220
}
223-
serializedPacket, err := ip.encoder.Encode(&icmpPacket)
221+
serializedPacket, err := encoder.Encode(&icmpPacket)
224222
if err != nil {
225223
return errors.Wrap(err, "Failed to encode ICMP message")
226224
}

ingress/icmp_generic.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
//go:build !darwin
1+
//go:build !darwin && !linux
22

33
package ingress
44

55
import (
66
"fmt"
7-
"net"
7+
"net/netip"
88
"runtime"
99

1010
"github.com/rs/zerolog"
1111
)
1212

13-
func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
13+
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
1414
return nil, fmt.Errorf("ICMP proxy is not implemented on %s", runtime.GOOS)
1515
}

ingress/icmp_linux.go

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
//go:build linux
2+
3+
package ingress
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"net"
9+
"net/netip"
10+
"sync"
11+
"sync/atomic"
12+
"time"
13+
14+
"github.com/google/gopacket/layers"
15+
"github.com/pkg/errors"
16+
"github.com/rs/zerolog"
17+
"golang.org/x/net/icmp"
18+
19+
"github.com/cloudflare/cloudflared/packet"
20+
)
21+
22+
// The request echo ID is rewritten to the port of the socket. The kernel uses the reply echo ID to demultiplex
23+
// We can open a socket for each source so multiple sources requesting the same destination doesn't collide
24+
type icmpProxy struct {
25+
srcToFlowTracker *srcToFlowTracker
26+
listenIP netip.Addr
27+
logger *zerolog.Logger
28+
shutdownC chan struct{}
29+
}
30+
31+
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
32+
if err := testPermission(listenIP); err != nil {
33+
return nil, err
34+
}
35+
return &icmpProxy{
36+
srcToFlowTracker: newSrcToConnTracker(),
37+
listenIP: listenIP,
38+
logger: logger,
39+
shutdownC: make(chan struct{}),
40+
}, nil
41+
}
42+
43+
func testPermission(listenIP netip.Addr) error {
44+
// Opens a non-privileged ICMP socket. On Linux the group ID of the process needs to be in ping_group_range
45+
// For more information, see https://man7.org/linux/man-pages/man7/icmp.7.html and https://lwn.net/Articles/422330/
46+
conn, err := newICMPConn(listenIP)
47+
if err != nil {
48+
// TODO: TUN-6715 check if cloudflared is in ping_group_range if the check failed. If not log instruction to
49+
// change the group ID
50+
return err
51+
}
52+
// This conn is only to test if cloudflared has permission to open this type of socket
53+
conn.Close()
54+
return nil
55+
}
56+
57+
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
58+
if pk == nil {
59+
return errPacketNil
60+
}
61+
switch body := pk.Message.Body.(type) {
62+
case *icmp.Echo:
63+
return ip.sendICMPEchoRequest(pk, body, responder)
64+
default:
65+
return fmt.Errorf("sending ICMP %s is not implemented", pk.Type)
66+
}
67+
}
68+
69+
func (ip *icmpProxy) Serve(ctx context.Context) error {
70+
<-ctx.Done()
71+
close(ip.shutdownC)
72+
return ctx.Err()
73+
}
74+
75+
func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error {
76+
icmpFlow, ok := ip.srcToFlowTracker.get(pk.Src)
77+
if ok {
78+
return icmpFlow.send(pk)
79+
}
80+
81+
conn, err := newICMPConn(ip.listenIP)
82+
if err != nil {
83+
return err
84+
}
85+
flow := packet.Flow{
86+
Src: pk.Src,
87+
Dst: pk.Dst,
88+
Responder: responder,
89+
}
90+
icmpFlow = newICMPFlow(conn, &flow, uint16(echo.ID), ip.logger)
91+
go func() {
92+
defer ip.srcToFlowTracker.delete(pk.Src)
93+
94+
if err := icmpFlow.serve(ip.shutdownC, defaultCloseAfterIdle); err != nil {
95+
ip.logger.Debug().Err(err).Uint16("flowID", icmpFlow.echoID).Msg("flow terminated")
96+
}
97+
}()
98+
ip.srcToFlowTracker.set(pk.Src, icmpFlow)
99+
return icmpFlow.send(pk)
100+
}
101+
102+
type srcIPFlowID netip.Addr
103+
104+
func (sifd srcIPFlowID) Type() string {
105+
return "srcIP"
106+
}
107+
108+
func (sifd srcIPFlowID) String() string {
109+
return netip.Addr(sifd).String()
110+
}
111+
112+
type srcToFlowTracker struct {
113+
lock sync.RWMutex
114+
// srcIPToConn tracks source IP to ICMP connection
115+
srcToFlow map[netip.Addr]*icmpFlow
116+
}
117+
118+
func newSrcToConnTracker() *srcToFlowTracker {
119+
return &srcToFlowTracker{
120+
srcToFlow: make(map[netip.Addr]*icmpFlow),
121+
}
122+
}
123+
124+
func (sft *srcToFlowTracker) get(srcIP netip.Addr) (*icmpFlow, bool) {
125+
sft.lock.RLock()
126+
defer sft.lock.RUnlock()
127+
128+
flow, ok := sft.srcToFlow[srcIP]
129+
return flow, ok
130+
}
131+
132+
func (sft *srcToFlowTracker) set(srcIP netip.Addr, flow *icmpFlow) {
133+
sft.lock.Lock()
134+
defer sft.lock.Unlock()
135+
136+
sft.srcToFlow[srcIP] = flow
137+
}
138+
139+
func (sft *srcToFlowTracker) delete(srcIP netip.Addr) {
140+
sft.lock.Lock()
141+
defer sft.lock.Unlock()
142+
143+
delete(sft.srcToFlow, srcIP)
144+
}
145+
146+
type icmpFlow struct {
147+
conn *icmp.PacketConn
148+
flow *packet.Flow
149+
echoID uint16
150+
// last active unix time. Unit is seconds
151+
lastActive int64
152+
logger *zerolog.Logger
153+
}
154+
155+
func newICMPFlow(conn *icmp.PacketConn, flow *packet.Flow, echoID uint16, logger *zerolog.Logger) *icmpFlow {
156+
return &icmpFlow{
157+
conn: conn,
158+
flow: flow,
159+
echoID: echoID,
160+
lastActive: time.Now().Unix(),
161+
logger: logger,
162+
}
163+
}
164+
165+
func (f *icmpFlow) serve(shutdownC chan struct{}, closeAfterIdle time.Duration) error {
166+
errC := make(chan error)
167+
go func() {
168+
errC <- f.listenResponse()
169+
}()
170+
171+
checkIdleTicker := time.NewTicker(closeAfterIdle)
172+
defer f.conn.Close()
173+
defer checkIdleTicker.Stop()
174+
for {
175+
select {
176+
case err := <-errC:
177+
return err
178+
case <-shutdownC:
179+
return nil
180+
case <-checkIdleTicker.C:
181+
now := time.Now().Unix()
182+
lastActive := atomic.LoadInt64(&f.lastActive)
183+
if now > lastActive+int64(closeAfterIdle.Seconds()) {
184+
return errFlowInactive
185+
}
186+
}
187+
}
188+
}
189+
190+
func (f *icmpFlow) send(pk *packet.ICMP) error {
191+
f.updateLastActive()
192+
193+
// For IPv4, the pseudoHeader is not used because the checksum is always calculated
194+
var pseudoHeader []byte = nil
195+
serializedMsg, err := pk.Marshal(pseudoHeader)
196+
if err != nil {
197+
return errors.Wrap(err, "Failed to encode ICMP message")
198+
}
199+
// The address needs to be of type UDPAddr when conn is created without priviledge
200+
_, err = f.conn.WriteTo(serializedMsg, &net.UDPAddr{
201+
IP: pk.Dst.AsSlice(),
202+
})
203+
return err
204+
}
205+
206+
func (f *icmpFlow) listenResponse() error {
207+
buf := make([]byte, mtu)
208+
encoder := packet.NewEncoder()
209+
for {
210+
n, src, err := f.conn.ReadFrom(buf)
211+
if err != nil {
212+
return err
213+
}
214+
f.updateLastActive()
215+
216+
if err := f.handleResponse(encoder, src, buf[:n]); err != nil {
217+
f.logger.Err(err).Str("dst", src.String()).Msg("Failed to handle ICMP response")
218+
continue
219+
}
220+
}
221+
}
222+
223+
func (f *icmpFlow) handleResponse(encoder *packet.Encoder, from net.Addr, rawPacket []byte) error {
224+
// TODO: TUN-6654 Check for IPv6
225+
msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), rawPacket)
226+
if err != nil {
227+
return err
228+
}
229+
230+
echo, ok := msg.Body.(*icmp.Echo)
231+
if !ok {
232+
return fmt.Errorf("received unexpected icmp type %s from non-privileged ICMP socket", msg.Type)
233+
}
234+
235+
addrPort, err := netip.ParseAddrPort(from.String())
236+
if err != nil {
237+
return err
238+
}
239+
icmpPacket := packet.ICMP{
240+
IP: &packet.IP{
241+
Src: addrPort.Addr(),
242+
Dst: f.flow.Src,
243+
Protocol: layers.IPProtocol(msg.Type.Protocol()),
244+
},
245+
Message: &icmp.Message{
246+
Type: msg.Type,
247+
Code: msg.Code,
248+
Body: &icmp.Echo{
249+
ID: int(f.echoID),
250+
Seq: echo.Seq,
251+
Data: echo.Data,
252+
},
253+
},
254+
}
255+
serializedPacket, err := encoder.Encode(&icmpPacket)
256+
if err != nil {
257+
return errors.Wrap(err, "Failed to encode ICMP message")
258+
}
259+
if err := f.flow.Responder.SendPacket(serializedPacket); err != nil {
260+
return errors.Wrap(err, "Failed to send packet to the edge")
261+
}
262+
return nil
263+
}
264+
265+
func (f *icmpFlow) updateLastActive() {
266+
atomic.StoreInt64(&f.lastActive, time.Now().Unix())
267+
}

0 commit comments

Comments
 (0)