Skip to content

Commit 495f9fb

Browse files
committed
TUN-6856: Refactor to lay foundation for tracing ICMP
Remove send and return methods from Funnel interface. Users of Funnel can provide their own send and return methods without wrapper to comply with the interface. Move packet router to ingress package to avoid circular dependency
1 parent 225c344 commit 495f9fb

File tree

15 files changed

+323
-337
lines changed

15 files changed

+323
-337
lines changed

cmd/cloudflared/tunnel/configuration.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
"github.com/cloudflare/cloudflared/cmd/cloudflared/cliutil"
2424
"github.com/cloudflare/cloudflared/edgediscovery/allregions"
25-
"github.com/cloudflare/cloudflared/packet"
2625

2726
"github.com/cloudflare/cloudflared/config"
2827
"github.com/cloudflare/cloudflared/connection"
@@ -463,7 +462,7 @@ func parseConfigIPVersion(version string) (v allregions.ConfigIPVersion, err err
463462
return
464463
}
465464

466-
func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRouterConfig, error) {
465+
func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRouterConfig, error) {
467466
ipv4Src, err := determineICMPv4Src(c.String("icmpv4-src"), logger)
468467
if err != nil {
469468
return nil, errors.Wrap(err, "failed to determine IPv4 source address for ICMP proxy")
@@ -484,7 +483,7 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*packet.GlobalRout
484483
if err != nil {
485484
return nil, err
486485
}
487-
return &packet.GlobalRouterConfig{
486+
return &ingress.GlobalRouterConfig{
488487
ICMPRouter: icmpRouter,
489488
IPv4Src: ipv4Src,
490489
IPv6Src: ipv6Src,

connection/quic.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type QUICConnection struct {
5757
sessionManager datagramsession.Manager
5858
// datagramMuxer mux/demux datagrams from quic connection
5959
datagramMuxer *quicpogs.DatagramMuxerV2
60-
packetRouter *packet.Router
60+
packetRouter *ingress.PacketRouter
6161
controlStreamHandler ControlStreamHandler
6262
connOptions *tunnelpogs.ConnectionOptions
6363
}
@@ -72,7 +72,7 @@ func NewQUICConnection(
7272
connOptions *tunnelpogs.ConnectionOptions,
7373
controlStreamHandler ControlStreamHandler,
7474
logger *zerolog.Logger,
75-
packetRouterConfig *packet.GlobalRouterConfig,
75+
packetRouterConfig *ingress.GlobalRouterConfig,
7676
) (*QUICConnection, error) {
7777
udpConn, err := createUDPConnForConnIndex(connIndex, logger)
7878
if err != nil {
@@ -93,8 +93,7 @@ func NewQUICConnection(
9393
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
9494
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
9595
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
96-
muxer := muxerWrapper{muxer: datagramMuxer}
97-
packetRouter := packet.NewRouter(packetRouterConfig, &muxer, &muxer, logger, orchestrator.WarpRoutingEnabled)
96+
packetRouter := ingress.NewPacketRouter(packetRouterConfig, datagramMuxer, logger, orchestrator.WarpRoutingEnabled)
9897

9998
return &QUICConnection{
10099
session: session,

ingress/icmp_darwin.go

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"context"
1212
"fmt"
1313
"math"
14-
"net"
1514
"net/netip"
1615
"strconv"
1716
"sync"
@@ -129,10 +128,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle
129128
}, nil
130129
}
131130

132-
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
133-
if pk == nil {
134-
return errPacketNil
135-
}
131+
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
136132
originalEcho, err := getICMPEcho(pk.Message)
137133
if err != nil {
138134
return err
@@ -152,13 +148,11 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
152148
if err != nil {
153149
return nil, err
154150
}
155-
originSender := originSender{
156-
conn: ip.conn,
157-
echoIDTracker: ip.echoIDTracker,
158-
echoIDTrackerKey: echoIDTrackerKey,
159-
assignedEchoID: assignedEchoID,
151+
closeCallback := func() error {
152+
ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID)
153+
return nil
160154
}
161-
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
155+
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
162156
return icmpFlow, nil
163157
}
164158
funnelID := echoFunnelID(assignedEchoID)
@@ -250,23 +244,3 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error {
250244
}
251245
return icmpFlow.returnToSrc(reply)
252246
}
253-
254-
// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface
255-
type originSender struct {
256-
conn *icmp.PacketConn
257-
echoIDTracker *echoIDTracker
258-
echoIDTrackerKey flow3Tuple
259-
assignedEchoID uint16
260-
}
261-
262-
func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
263-
_, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{
264-
IP: dst.AsSlice(),
265-
})
266-
return err
267-
}
268-
269-
func (os *originSender) Close() error {
270-
os.echoIDTracker.release(os.echoIDTrackerKey, os.assignedEchoID)
271-
return nil
272-
}

ingress/icmp_generic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var errICMPProxyNotImplemented = fmt.Errorf("ICMP proxy is not implemented on %s
1818

1919
type icmpProxy struct{}
2020

21-
func (ip icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
21+
func (ip icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
2222
return errICMPProxyNotImplemented
2323
}
2424

ingress/icmp_linux.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,7 @@ func checkInPingGroup() error {
9797
return fmt.Errorf("did not find group range in %s", pingGroupPath)
9898
}
9999

100-
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
101-
if pk == nil {
102-
return errPacketNil
103-
}
100+
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
104101
originalEcho, err := getICMPEcho(pk.Message)
105102
if err != nil {
106103
return err
@@ -113,13 +110,15 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
113110
}
114111
ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
115112
newConnChan <- conn
113+
closeCallback := func() error {
114+
return conn.Close()
115+
}
116116
localUDPAddr, ok := conn.LocalAddr().(*net.UDPAddr)
117117
if !ok {
118118
return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr())
119119
}
120-
originSender := originSender{conn: conn}
121120
echoID := localUDPAddr.Port
122-
icmpFlow := newICMPEchoFlow(pk.Src, &originSender, responder, echoID, originalEcho.ID, packet.NewEncoder())
121+
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder())
123122
return icmpFlow, nil
124123
}
125124
funnelID := flow3Tuple{
@@ -187,22 +186,6 @@ func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) e
187186
}
188187
}
189188

190-
// originSender wraps icmp.PacketConn to implement packet.FunnelUniPipe interface
191-
type originSender struct {
192-
conn *icmp.PacketConn
193-
}
194-
195-
func (os *originSender) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
196-
_, err := os.conn.WriteTo(pk.Data, &net.UDPAddr{
197-
IP: dst.AsSlice(),
198-
})
199-
return err
200-
}
201-
202-
func (os *originSender) Close() error {
203-
return os.conn.Close()
204-
}
205-
206189
// Only linux uses flow3Tuple as FunnelID
207190
func (ft flow3Tuple) Type() string {
208191
return "srcIP_dstIP_echoID"

ingress/icmp_posix.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,54 @@ type flow3Tuple struct {
4343

4444
// icmpEchoFlow implements the packet.Funnel interface.
4545
type icmpEchoFlow struct {
46-
*packet.RawPacketFunnel
46+
*packet.ActivityTracker
47+
closeCallback func() error
48+
src netip.Addr
49+
originConn *icmp.PacketConn
50+
responder *packetResponder
4751
assignedEchoID int
4852
originalEchoID int
4953
// it's up to the user to ensure respEncoder is not used concurrently
5054
respEncoder *packet.Encoder
5155
}
5256

53-
func newICMPEchoFlow(src netip.Addr, sendPipe, returnPipe packet.FunnelUniPipe, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow {
57+
func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icmp.PacketConn, responder *packetResponder, assignedEchoID, originalEchoID int, respEncoder *packet.Encoder) *icmpEchoFlow {
5458
return &icmpEchoFlow{
55-
RawPacketFunnel: packet.NewRawPacketFunnel(src, sendPipe, returnPipe),
59+
ActivityTracker: packet.NewActivityTracker(),
60+
closeCallback: closeCallback,
61+
src: src,
62+
originConn: originConn,
63+
responder: responder,
5664
assignedEchoID: assignedEchoID,
5765
originalEchoID: originalEchoID,
5866
respEncoder: respEncoder,
5967
}
6068
}
6169

70+
func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool {
71+
otherICMPFlow, ok := other.(*icmpEchoFlow)
72+
if !ok {
73+
return false
74+
}
75+
if otherICMPFlow.src != ief.src {
76+
return false
77+
}
78+
if otherICMPFlow.originalEchoID != ief.originalEchoID {
79+
return false
80+
}
81+
if otherICMPFlow.assignedEchoID != ief.assignedEchoID {
82+
return false
83+
}
84+
return true
85+
}
86+
87+
func (ief *icmpEchoFlow) Close() error {
88+
return ief.closeCallback()
89+
}
90+
6291
// sendToDst rewrites the echo ID to the one assigned to this flow
6392
func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
93+
ief.UpdateLastActive()
6494
originalEcho, err := getICMPEcho(msg)
6595
if err != nil {
6696
return err
@@ -80,17 +110,21 @@ func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error {
80110
if err != nil {
81111
return err
82112
}
83-
return ief.SendToDst(dst, packet.RawPacket{Data: serializedPacket})
113+
_, err = ief.originConn.WriteTo(serializedPacket, &net.UDPAddr{
114+
IP: dst.AsSlice(),
115+
})
116+
return err
84117
}
85118

86119
// returnToSrc rewrites the echo ID to the original echo ID from the eyeball
87120
func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error {
121+
ief.UpdateLastActive()
88122
reply.echo.ID = ief.originalEchoID
89123
reply.msg.Body = reply.echo
90124
pk := packet.ICMP{
91125
IP: &packet.IP{
92126
Src: reply.from,
93-
Dst: ief.Src,
127+
Dst: ief.src,
94128
Protocol: layers.IPProtocol(reply.msg.Type.Protocol()),
95129
TTL: packet.DefaultTTL,
96130
},
@@ -100,7 +134,7 @@ func (ief *icmpEchoFlow) returnToSrc(reply *echoReply) error {
100134
if err != nil {
101135
return err
102136
}
103-
return ief.ReturnToSrc(serializedPacket)
137+
return ief.responder.returnPacket(serializedPacket)
104138
}
105139

106140
type echoReply struct {

ingress/icmp_posix_test.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,24 +52,26 @@ func TestFunnelIdleTimeout(t *testing.T) {
5252
},
5353
},
5454
}
55-
responder := echoFlowResponder{
56-
decoder: packet.NewICMPDecoder(),
57-
respChan: make(chan []byte),
55+
muxer := newMockMuxer(0)
56+
responder := packetResponder{
57+
datagramMuxer: muxer,
5858
}
59-
require.NoError(t, proxy.Request(&pk, &responder))
60-
responder.validate(t, &pk)
59+
require.NoError(t, proxy.Request(ctx, &pk, &responder))
60+
validateEchoFlow(t, muxer, &pk)
6161

6262
// Send second request, should reuse the funnel
63-
require.NoError(t, proxy.Request(&pk, nil))
64-
responder.validate(t, &pk)
63+
require.NoError(t, proxy.Request(ctx, &pk, &packetResponder{
64+
datagramMuxer: nil,
65+
}))
66+
validateEchoFlow(t, muxer, &pk)
6567

6668
time.Sleep(idleTimeout * 2)
67-
newResponder := echoFlowResponder{
68-
decoder: packet.NewICMPDecoder(),
69-
respChan: make(chan []byte),
69+
newMuxer := newMockMuxer(0)
70+
newResponder := packetResponder{
71+
datagramMuxer: newMuxer,
7072
}
71-
require.NoError(t, proxy.Request(&pk, &newResponder))
72-
newResponder.validate(t, &pk)
73+
require.NoError(t, proxy.Request(ctx, &pk, &newResponder))
74+
validateEchoFlow(t, newMuxer, &pk)
7375

7476
cancel()
7577
<-proxyDone

ingress/icmp_windows.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
265265
// Request sends an ICMP echo request and wait for a reply or timeout.
266266
// The async version of Win32 APIs take a callback whose memory is not garbage collected, so we use the synchronous version.
267267
// It's possible that a slow request will block other requests, so we set the timeout to only 1s.
268-
func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
268+
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
269269
if pk == nil {
270270
return errPacketNil
271271
}
@@ -292,7 +292,7 @@ func (ip *icmpProxy) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) er
292292
return nil
293293
}
294294

295-
func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder packet.FunnelUniPipe) error {
295+
func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, data []byte, responder *packetResponder) error {
296296
var replyType icmp.Type
297297
if request.Dst.Is4() {
298298
replyType = ipv4.ICMPTypeEchoReply
@@ -331,7 +331,7 @@ func (ip *icmpProxy) handleEchoReply(request *packet.ICMP, echoReq *icmp.Echo, d
331331
if err != nil {
332332
return err
333333
}
334-
return responder.SendPacket(request.Src, serializedPacket)
334+
return responder.returnPacket(serializedPacket)
335335
}
336336

337337
func (ip *icmpProxy) icmpEchoRoundtrip(dst netip.Addr, echo *icmp.Echo) ([]byte, error) {

ingress/origin_icmp_proxy.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerol
3737
ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout)
3838
ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout)
3939
if ipv4Err != nil && ipv6Err != nil {
40-
return nil, fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err)
40+
err := fmt.Errorf("cannot create ICMPv4 proxy: %v nor ICMPv6 proxy: %v", ipv4Err, ipv6Err)
41+
logger.Debug().Err(err).Msg("ICMP proxy feature is disabled")
42+
return nil, err
4143
}
4244
if ipv4Err != nil {
4345
logger.Debug().Err(ipv4Err).Msg("failed to create ICMPv4 proxy, only ICMPv6 proxy is created")
@@ -73,15 +75,18 @@ func (ir *icmpRouter) Serve(ctx context.Context) error {
7375
return fmt.Errorf("ICMPv4 proxy and ICMPv6 proxy are both nil")
7476
}
7577

76-
func (ir *icmpRouter) Request(pk *packet.ICMP, responder packet.FunnelUniPipe) error {
78+
func (ir *icmpRouter) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
79+
if pk == nil {
80+
return errPacketNil
81+
}
7782
if pk.Dst.Is4() {
7883
if ir.ipv4Proxy != nil {
79-
return ir.ipv4Proxy.Request(pk, responder)
84+
return ir.ipv4Proxy.Request(ctx, pk, responder)
8085
}
8186
return fmt.Errorf("ICMPv4 proxy was not instantiated")
8287
}
8388
if ir.ipv6Proxy != nil {
84-
return ir.ipv6Proxy.Request(pk, responder)
89+
return ir.ipv6Proxy.Request(ctx, pk, responder)
8590
}
8691
return fmt.Errorf("ICMPv6 proxy was not instantiated")
8792
}

0 commit comments

Comments
 (0)