Skip to content

Commit 2ffff06

Browse files
committed
TUN-6696: Refactor flow into funnel and close idle funnels
A funnel is an abstraction for 1 source to many destinations. As part of this refactoring, shared logic between Darwin and Linux are moved into icmp_posix
1 parent e380333 commit 2ffff06

File tree

13 files changed

+666
-471
lines changed

13 files changed

+666
-471
lines changed

connection/quic.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"net"
99
"net/http"
10+
"net/netip"
1011
"strconv"
1112
"strings"
1213
"sync/atomic"
@@ -346,13 +347,31 @@ func (pr *packetRouter) serve(ctx context.Context) error {
346347
continue
347348
}
348349

349-
if err := pr.icmpProxy.Request(icmpPacket, pr.muxer); err != nil {
350-
pr.logger.Err(err).Str("src", icmpPacket.Src.String()).Str("dst", icmpPacket.Dst.String()).Msg("Failed to send ICMP packet")
350+
flowPipe := muxerResponder{muxer: pr.muxer}
351+
if err := pr.icmpProxy.Request(icmpPacket, &flowPipe); err != nil {
352+
pr.logger.Err(err).
353+
Str("src", icmpPacket.Src.String()).
354+
Str("dst", icmpPacket.Dst.String()).
355+
Interface("type", icmpPacket.Type).
356+
Msg("Failed to send ICMP packet")
351357
continue
352358
}
353359
}
354360
}
355361

362+
// muxerResponder wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
363+
type muxerResponder struct {
364+
muxer *quicpogs.DatagramMuxerV2
365+
}
366+
367+
func (mr *muxerResponder) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
368+
return mr.muxer.SendPacket(pk)
369+
}
370+
371+
func (mr *muxerResponder) Close() error {
372+
return nil
373+
}
374+
356375
// streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
357376
// the client.
358377
type streamReadWriteAcker struct {

ingress/icmp_darwin.go

Lines changed: 89 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
package ingress
44

5+
// This file implements ICMPProxy for Darwin. It uses a non-privileged ICMP socket to send echo requests and listen for
6+
// echo replies. The source IP of the requests are rewritten to the bind IP of the socket and the socket reads all
7+
// messages, so we use echo ID to distinguish the replies. Each (source IP, destination IP, echo ID) is assigned a
8+
// unique echo ID.
9+
510
import (
611
"context"
712
"fmt"
@@ -10,23 +15,23 @@ import (
1015
"net/netip"
1116
"strconv"
1217
"sync"
18+
"time"
1319

14-
"github.com/google/gopacket/layers"
15-
"github.com/pkg/errors"
1620
"github.com/rs/zerolog"
1721
"golang.org/x/net/icmp"
1822

1923
"github.com/cloudflare/cloudflared/packet"
2024
)
2125

2226
// TODO: TUN-6654 Extend support to IPv6
23-
// On Darwin, a non-privileged ICMP socket can read messages from all echo IDs, so we use it for all sources.
2427
type icmpProxy struct {
25-
// TODO: TUN-6588 clean up flows
26-
srcFlowTracker *packet.FlowTracker
27-
echoIDTracker *echoIDTracker
28-
conn *icmp.PacketConn
29-
logger *zerolog.Logger
28+
srcFunnelTracker *packet.FunnelTracker
29+
echoIDTracker *echoIDTracker
30+
conn *icmp.PacketConn
31+
// Response is handled in one-by-one, so encoder can be shared between funnels
32+
encoder *packet.Encoder
33+
logger *zerolog.Logger
34+
idleTimeout time.Duration
3035
}
3136

3237
// echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
@@ -92,48 +97,77 @@ func (eit *echoIDTracker) release(srcIP netip.Addr, id uint16) bool {
9297
eit.lock.Lock()
9398
defer eit.lock.Unlock()
9499

95-
currentID, ok := eit.srcIPMapping[srcIP]
96-
if ok && id == currentID {
100+
currentID, exists := eit.srcIPMapping[srcIP]
101+
if exists && id == currentID {
97102
delete(eit.srcIPMapping, srcIP)
98103
eit.assignment[id] = false
99104
return true
100105
}
101106
return false
102107
}
103108

104-
type echoFlowID uint16
109+
type echoFunnelID uint16
105110

106-
func (snf echoFlowID) Type() string {
111+
func (snf echoFunnelID) Type() string {
107112
return "echoID"
108113
}
109114

110-
func (snf echoFlowID) String() string {
115+
func (snf echoFunnelID) String() string {
111116
return strconv.FormatUint(uint64(snf), 10)
112117
}
113118

114-
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
119+
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (ICMPProxy, error) {
115120
conn, err := newICMPConn(listenIP)
116121
if err != nil {
117122
return nil, err
118123
}
119124
return &icmpProxy{
120-
srcFlowTracker: packet.NewFlowTracker(),
121-
echoIDTracker: newEchoIDTracker(),
122-
conn: conn,
123-
logger: logger,
125+
srcFunnelTracker: packet.NewFunnelTracker(),
126+
echoIDTracker: newEchoIDTracker(),
127+
encoder: packet.NewEncoder(),
128+
conn: conn,
129+
logger: logger,
130+
idleTimeout: idleTimeout,
124131
}, nil
125132
}
126133

127-
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FlowResponder) error {
134+
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
128135
if pk == nil {
129136
return errPacketNil
130137
}
131-
switch body := pk.Message.Body.(type) {
132-
case *icmp.Echo:
133-
return ip.sendICMPEchoRequest(pk, body, responder)
134-
default:
135-
return fmt.Errorf("sending ICMP %s is not implemented", pk.Type)
138+
// TODO: TUN-6744 assign unique flow per (src, echo ID)
139+
echoID, exists := ip.echoIDTracker.get(pk.Src)
140+
if !exists {
141+
originalEcho, err := getICMPEcho(pk.Message)
142+
if err != nil {
143+
return err
144+
}
145+
echoID, exists = ip.echoIDTracker.assign(pk.Src)
146+
if !exists {
147+
return fmt.Errorf("failed to assign unique echo ID")
148+
}
149+
funnelID := echoFunnelID(echoID)
150+
originSender := originSender{
151+
conn: ip.conn,
152+
echoIDTracker: ip.echoIDTracker,
153+
srcIP: pk.Src,
154+
echoID: echoID,
155+
}
156+
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(echoID), originalEcho.ID, ip.encoder)
157+
if replaced := ip.srcFunnelTracker.Register(funnelID, icmpFlow); replaced {
158+
ip.logger.Info().Str("src", pk.Src.String()).Msg("Replaced funnel")
159+
}
160+
return icmpFlow.sendToDst(pk.Dst, pk.Message)
161+
}
162+
funnel, exists := ip.srcFunnelTracker.Get(echoFunnelID(echoID))
163+
if !exists {
164+
return packet.ErrFunnelNotFound
165+
}
166+
icmpFlow, err := toICMPEchoFlow(funnel)
167+
if err != nil {
168+
return err
136169
}
170+
return icmpFlow.sendToDst(pk.Dst, pk.Message)
137171
}
138172

139173
// Serve listens for responses to the requests until context is done
@@ -142,88 +176,54 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
142176
<-ctx.Done()
143177
ip.conn.Close()
144178
}()
179+
go func() {
180+
ip.srcFunnelTracker.ScheduleCleanup(ctx, ip.idleTimeout)
181+
}()
145182
buf := make([]byte, mtu)
146-
encoder := packet.NewEncoder()
147183
for {
148184
n, src, err := ip.conn.ReadFrom(buf)
149185
if err != nil {
150186
return err
151187
}
152-
// TODO: TUN-6654 Check for IPv6
153-
msg, err := icmp.ParseMessage(int(layers.IPProtocolICMPv4), buf[:n])
154-
if err != nil {
155-
ip.logger.Error().Err(err).Str("src", src.String()).Msg("Failed to parse ICMP message")
156-
continue
157-
}
158-
switch body := msg.Body.(type) {
159-
case *icmp.Echo:
160-
if err := ip.handleEchoResponse(encoder, msg, body); err != nil {
161-
ip.logger.Error().Err(err).
162-
Str("src", src.String()).
163-
Str("flowID", echoFlowID(body.ID).String()).
164-
Msg("Failed to handle ICMP response")
165-
continue
166-
}
167-
default:
168-
ip.logger.Warn().
169-
Str("icmpType", fmt.Sprintf("%s", msg.Type)).
170-
Msgf("Responding to this type of ICMP is not implemented")
188+
if err := ip.handleResponse(src, buf[:n]); err != nil {
189+
ip.logger.Err(err).Str("src", src.String()).Msg("Failed to handle ICMP response")
171190
continue
172191
}
173192
}
174193
}
175194

176-
func (ip *icmpProxy) sendICMPEchoRequest(pk *packet.ICMP, echo *icmp.Echo, responder packet.FlowResponder) error {
177-
echoID, ok := ip.echoIDTracker.get(pk.Src)
178-
if !ok {
179-
echoID, ok = ip.echoIDTracker.assign(pk.Src)
180-
if !ok {
181-
return fmt.Errorf("failed to assign unique echo ID")
182-
}
183-
flowID := echoFlowID(echoID)
184-
flow := packet.Flow{
185-
Src: pk.Src,
186-
Dst: pk.Dst,
187-
Responder: responder,
188-
}
189-
if replaced := ip.srcFlowTracker.Register(flowID, &flow, true); replaced {
190-
ip.logger.Info().Str("src", flow.Src.String()).Str("dst", flow.Dst.String()).Msg("Replaced flow")
191-
}
195+
func (ip *icmpProxy) handleResponse(from net.Addr, rawMsg []byte) error {
196+
reply, err := parseReply(from, rawMsg)
197+
if err != nil {
198+
return err
192199
}
193-
194-
echo.ID = int(echoID)
195-
var pseudoHeader []byte = nil
196-
serializedMsg, err := pk.Marshal(pseudoHeader)
200+
funnel, exists := ip.srcFunnelTracker.Get(echoFunnelID(reply.echo.ID))
201+
if !exists {
202+
return packet.ErrFunnelNotFound
203+
}
204+
icmpFlow, err := toICMPEchoFlow(funnel)
197205
if err != nil {
198-
return errors.Wrap(err, "Failed to encode ICMP message")
206+
return err
199207
}
200-
// The address needs to be of type UDPAddr when conn is created without priviledge
201-
_, err = ip.conn.WriteTo(serializedMsg, &net.UDPAddr{
202-
IP: pk.Dst.AsSlice(),
208+
return icmpFlow.returnToSrc(reply)
209+
}
210+
211+
// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface
212+
type originSender struct {
213+
conn *icmp.PacketConn
214+
echoIDTracker *echoIDTracker
215+
srcIP netip.Addr
216+
echoID uint16
217+
}
218+
219+
func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
220+
_, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{
221+
IP: dst.AsSlice(),
203222
})
204223
return err
205224
}
206225

207-
func (ip *icmpProxy) handleEchoResponse(encoder *packet.Encoder, msg *icmp.Message, echo *icmp.Echo) error {
208-
flowID := echoFlowID(echo.ID)
209-
flow, ok := ip.srcFlowTracker.Get(flowID)
210-
if !ok {
211-
return fmt.Errorf("flow not found")
212-
}
213-
icmpPacket := packet.ICMP{
214-
IP: &packet.IP{
215-
Src: flow.Dst,
216-
Dst: flow.Src,
217-
Protocol: layers.IPProtocol(msg.Type.Protocol()),
218-
},
219-
Message: msg,
220-
}
221-
serializedPacket, err := encoder.Encode(&icmpPacket)
222-
if err != nil {
223-
return errors.Wrap(err, "Failed to encode ICMP message")
224-
}
225-
if err := flow.Responder.SendPacket(serializedPacket); err != nil {
226-
return errors.Wrap(err, "Failed to send packet to the edge")
227-
}
226+
func (os *originSender) Close() error {
227+
os.echoIDTracker.release(os.srcIP, os.echoID)
228228
return nil
229229
}

ingress/icmp_generic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ import (
1010
"github.com/rs/zerolog"
1111
)
1212

13-
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger) (ICMPProxy, error) {
13+
func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (ICMPProxy, error) {
1414
return nil, fmt.Errorf("ICMP proxy is not implemented on %s", runtime.GOOS)
1515
}

0 commit comments

Comments
 (0)