Skip to content

Commit 7e760f9

Browse files
committed
TUN-6586: Change ICMP proxy to only build for Darwin and use echo ID to track flows
1 parent efb99d9 commit 7e760f9

File tree

7 files changed

+355
-130
lines changed

7 files changed

+355
-130
lines changed

ingress/icmp_darwin.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
//go:build darwin
2+
3+
package ingress
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"math"
9+
"net"
10+
"net/netip"
11+
"strconv"
12+
"sync"
13+
14+
"github.com/google/gopacket/layers"
15+
"github.com/pkg/errors"
16+
"github.com/rs/zerolog"
17+
"golang.org/x/net/icmp"
18+
19+
"github.com/cloudflare/cloudflared/packet"
20+
)
21+
22+
// TODO: TUN-6654 Extend support to IPv6
23+
// On Darwin, a non-privileged ICMP socket can read messages from all echo IDs, so we use it for all sources.
24+
type icmpProxy struct {
25+
// TODO: TUN-6588 clean up flows
26+
srcFlowTracker *packet.FlowTracker
27+
echoIDTracker *echoIDTracker
28+
conn *icmp.PacketConn
29+
logger *zerolog.Logger
30+
encoder *packet.Encoder
31+
}
32+
33+
// echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
34+
// then from the beginning to lastAssignment.
35+
// ICMP echo are short lived. By the time an ID is revisited, it should have been released.
36+
type echoIDTracker struct {
37+
lock sync.RWMutex
38+
// maps the source IP to an echo ID obtained from assignment
39+
srcIPMapping map[netip.Addr]uint16
40+
// assignment tracks if an ID is assigned using index as the ID
41+
// The size of the array is math.MaxUint16 because echo ID is 2 bytes
42+
assignment [math.MaxUint16]bool
43+
// nextAssignment is the next number to check for assigment
44+
nextAssignment uint16
45+
}
46+
47+
func newEchoIDTracker() *echoIDTracker {
48+
return &echoIDTracker{
49+
srcIPMapping: make(map[netip.Addr]uint16),
50+
}
51+
}
52+
53+
func (eit *echoIDTracker) get(srcIP netip.Addr) (uint16, bool) {
54+
eit.lock.RLock()
55+
defer eit.lock.RUnlock()
56+
id, ok := eit.srcIPMapping[srcIP]
57+
return id, ok
58+
}
59+
60+
func (eit *echoIDTracker) assign(srcIP netip.Addr) (uint16, bool) {
61+
eit.lock.Lock()
62+
defer eit.lock.Unlock()
63+
64+
if eit.nextAssignment == math.MaxUint16 {
65+
eit.nextAssignment = 0
66+
}
67+
68+
for i, assigned := range eit.assignment[eit.nextAssignment:] {
69+
if !assigned {
70+
echoID := uint16(i) + eit.nextAssignment
71+
eit.set(srcIP, echoID)
72+
return echoID, true
73+
}
74+
}
75+
for i, assigned := range eit.assignment[0:eit.nextAssignment] {
76+
if !assigned {
77+
echoID := uint16(i)
78+
eit.set(srcIP, echoID)
79+
return echoID, true
80+
}
81+
}
82+
return 0, false
83+
}
84+
85+
// Caller should hold the lock
86+
func (eit *echoIDTracker) set(srcIP netip.Addr, echoID uint16) {
87+
eit.assignment[echoID] = true
88+
eit.srcIPMapping[srcIP] = echoID
89+
eit.nextAssignment = echoID + 1
90+
}
91+
92+
func (eit *echoIDTracker) release(srcIP netip.Addr, id uint16) bool {
93+
eit.lock.Lock()
94+
defer eit.lock.Unlock()
95+
96+
currentID, ok := eit.srcIPMapping[srcIP]
97+
if ok && id == currentID {
98+
delete(eit.srcIPMapping, srcIP)
99+
eit.assignment[id] = false
100+
return true
101+
}
102+
return false
103+
}
104+
105+
type echoFlowID uint16
106+
107+
func (snf echoFlowID) Type() string {
108+
return "echoID"
109+
}
110+
111+
func (snf echoFlowID) String() string {
112+
return strconv.FormatUint(uint64(snf), 10)
113+
}
114+
115+
func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
116+
network := "udp6"
117+
if listenIP.To4() != nil {
118+
network = "udp4"
119+
}
120+
// Opens a non-privileged ICMP socket
121+
conn, err := icmp.ListenPacket(network, listenIP.String())
122+
if err != nil {
123+
return nil, err
124+
}
125+
return &icmpProxy{
126+
srcFlowTracker: packet.NewFlowTracker(),
127+
echoIDTracker: newEchoIDTracker(),
128+
conn: conn,
129+
logger: logger,
130+
encoder: packet.NewEncoder(),
131+
}, nil
132+
}
133+
134+
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
135+
switch body := pk.Message.Body.(type) {
136+
case *icmp.Echo:
137+
return ip.sendICMPEchoRequest(pk, body, responder)
138+
default:
139+
return fmt.Errorf("sending ICMP %s is not implemented", pk.Type)
140+
}
141+
}
142+
143+
func (ip *icmpProxy) ListenResponse(ctx context.Context) error {
144+
go func() {
145+
<-ctx.Done()
146+
ip.conn.Close()
147+
}()
148+
buf := make([]byte, 1500)
149+
for {
150+
n, src, err := ip.conn.ReadFrom(buf)
151+
if err != nil {
152+
return err
153+
}
154+
// TODO: TUN-6654 Check for IPv6
155+
msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), buf[:n])
156+
if err != nil {
157+
ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to parse ICMP message")
158+
continue
159+
}
160+
switch body := msg.Body.(type) {
161+
case *icmp.Echo:
162+
if err := ip.handleEchoResponse(msg, body); err != nil {
163+
ip.logger.Error().Err(err).
164+
Str("src", src.String()).
165+
Str("flowID", echoFlowID(body.ID).String()).
166+
Msg("Failed to handle ICMP response")
167+
continue
168+
}
169+
default:
170+
ip.logger.Warn().
171+
Str("icmpType", fmt.Sprintf("%s", msg.Type)).
172+
Msgf("Responding to this type of ICMP is not implemented")
173+
continue
174+
}
175+
}
176+
}
177+
178+
func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error {
179+
echoID, ok := ip.echoIDTracker.get(pk.Src)
180+
if !ok {
181+
echoID, ok = ip.echoIDTracker.assign(pk.Src)
182+
if !ok {
183+
return fmt.Errorf("failed to assign unique echo ID")
184+
}
185+
flowID := echoFlowID(echoID)
186+
flow := packet.Flow{
187+
Src: pk.Src,
188+
Dst: pk.Dst,
189+
Responder: responder,
190+
}
191+
if replaced := ip.srcFlowTracker.Register(flowID, &flow, true); replaced {
192+
ip.logger.Info().Str("src", flow.Src.String()).Str("dst", flow.Dst.String()).Msg("Replaced flow")
193+
}
194+
}
195+
196+
echo.ID = int(echoID)
197+
var pseudoHeader []byte = nil
198+
serializedMsg, err := pk.Marshal(pseudoHeader)
199+
if err != nil {
200+
return errors.Wrap(err, "Failed to encode ICMP message")
201+
}
202+
// The address needs to be of type UDPAddr when conn is created without priviledge
203+
_, err = ip.conn.WriteTo(serializedMsg, &net.UDPAddr{
204+
IP: pk.Dst.AsSlice(),
205+
})
206+
return err
207+
}
208+
209+
func (ip *icmpProxy) handleEchoResponse(msg *icmp.Message, echo *icmp.Echo) error {
210+
flowID := echoFlowID(echo.ID)
211+
flow, ok := ip.srcFlowTracker.Get(flowID)
212+
if !ok {
213+
return fmt.Errorf("flow not found")
214+
}
215+
icmpPacket := packet.ICMP{
216+
IP: &packet.IP{
217+
Src: flow.Dst,
218+
Dst: flow.Src,
219+
Protocol: layers.IPProtocol(msg.Type.Protocol()),
220+
},
221+
Message: msg,
222+
}
223+
serializedPacket, err := ip.encoder.Encode(&icmpPacket)
224+
if err != nil {
225+
return errors.Wrap(err, "Failed to encode ICMP message")
226+
}
227+
if err := flow.Responder.SendPacket(serializedPacket); err != nil {
228+
return errors.Wrap(err, "Failed to send packet to the edge")
229+
}
230+
return nil
231+
}

ingress/icmp_darwin_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
//go:build darwin
2+
3+
package ingress
4+
5+
import (
6+
"math"
7+
"net/netip"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestSingleEchoIDTracker(t *testing.T) {
14+
tracker := newEchoIDTracker()
15+
srcIP := netip.MustParseAddr("127.0.0.1")
16+
echoID, ok := tracker.get(srcIP)
17+
require.False(t, ok)
18+
require.Equal(t, uint16(0), echoID)
19+
20+
// not assigned yet, so nothing to release
21+
require.False(t, tracker.release(srcIP, echoID))
22+
23+
echoID, ok = tracker.assign(srcIP)
24+
require.True(t, ok)
25+
require.Equal(t, uint16(0), echoID)
26+
27+
echoID, ok = tracker.get(srcIP)
28+
require.True(t, ok)
29+
require.Equal(t, uint16(0), echoID)
30+
31+
// releasing a different ID returns false
32+
require.False(t, tracker.release(srcIP, 1999))
33+
require.True(t, tracker.release(srcIP, echoID))
34+
// releasing the second time returns false
35+
require.False(t, tracker.release(srcIP, echoID))
36+
37+
echoID, ok = tracker.get(srcIP)
38+
require.False(t, ok)
39+
require.Equal(t, uint16(0), echoID)
40+
41+
// Move to the next IP
42+
echoID, ok = tracker.assign(srcIP)
43+
require.True(t, ok)
44+
require.Equal(t, uint16(1), echoID)
45+
}
46+
47+
func TestFullEchoIDTracker(t *testing.T) {
48+
tracker := newEchoIDTracker()
49+
firstIP := netip.MustParseAddr("172.16.0.1")
50+
srcIP := firstIP
51+
52+
for i := uint16(0); i < math.MaxUint16; i++ {
53+
echoID, ok := tracker.assign(srcIP)
54+
require.True(t, ok)
55+
require.Equal(t, i, echoID)
56+
57+
echoID, ok = tracker.get(srcIP)
58+
require.True(t, ok)
59+
require.Equal(t, i, echoID)
60+
srcIP = srcIP.Next()
61+
}
62+
63+
// All echo IDs are assigned
64+
echoID, ok := tracker.assign(srcIP.Next())
65+
require.False(t, ok)
66+
require.Equal(t, uint16(0), echoID)
67+
68+
srcIP = firstIP
69+
for i := uint16(0); i < math.MaxUint16; i++ {
70+
ok := tracker.release(srcIP, i)
71+
require.True(t, ok)
72+
73+
echoID, ok = tracker.get(srcIP)
74+
require.False(t, ok)
75+
require.Equal(t, uint16(0), echoID)
76+
srcIP = srcIP.Next()
77+
}
78+
79+
// The IDs are assignable again
80+
srcIP = firstIP
81+
for i := uint16(0); i < math.MaxUint16; i++ {
82+
echoID, ok := tracker.assign(srcIP)
83+
require.True(t, ok)
84+
require.Equal(t, i, echoID)
85+
86+
echoID, ok = tracker.get(srcIP)
87+
require.True(t, ok)
88+
require.Equal(t, i, echoID)
89+
srcIP = srcIP.Next()
90+
}
91+
}

ingress/icmp_generic.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
//go:build !darwin
2+
3+
package ingress
4+
5+
import (
6+
"fmt"
7+
"net"
8+
"runtime"
9+
10+
"github.com/rs/zerolog"
11+
)
12+
13+
func newICMPProxy(listenIP net.IP, logger *zerolog.Logger) (ICMPProxy, error) {
14+
return nil, fmt.Errorf("ICMP proxy is not implemented on %s", runtime.GOOS)
15+
}

0 commit comments

Comments
 (0)