Skip to content

Commit b6bd8c1

Browse files
committed
TUN-6604: Trace icmp echo request on Linux and Darwin
1 parent 495f9fb commit b6bd8c1

File tree

8 files changed

+187
-6
lines changed

8 files changed

+187
-6
lines changed

ingress/icmp_darwin.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import (
1717
"time"
1818

1919
"github.com/rs/zerolog"
20+
"go.opentelemetry.io/otel/attribute"
2021
"golang.org/x/net/icmp"
2122

2223
"github.com/cloudflare/cloudflared/packet"
24+
"github.com/cloudflare/cloudflared/tracing"
2325
)
2426

2527
type icmpProxy struct {
@@ -129,10 +131,18 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle
129131
}
130132

131133
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
134+
ctx, span := responder.requestSpan(ctx, pk)
135+
defer responder.exportSpan()
136+
132137
originalEcho, err := getICMPEcho(pk.Message)
133138
if err != nil {
139+
tracing.EndWithErrorStatus(span, err)
134140
return err
135141
}
142+
span.SetAttributes(
143+
attribute.Int("originalEchoID", originalEcho.ID),
144+
attribute.Int("seq", originalEcho.Seq),
145+
)
136146
echoIDTrackerKey := flow3Tuple{
137147
srcIP: pk.Src,
138148
dstIP: pk.Dst,
@@ -141,8 +151,12 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
141151
// TODO: TUN-6744 assign unique flow per (src, echo ID)
142152
assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey)
143153
if !success {
144-
return fmt.Errorf("failed to assign unique echo ID")
154+
err := fmt.Errorf("failed to assign unique echo ID")
155+
tracing.EndWithErrorStatus(span, err)
156+
return err
145157
}
158+
span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))
159+
146160
newFunnelFunc := func() (packet.Funnel, error) {
147161
originalEcho, err := getICMPEcho(pk.Message)
148162
if err != nil {
@@ -158,9 +172,11 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
158172
funnelID := echoFunnelID(assignedEchoID)
159173
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
160174
if err != nil {
175+
tracing.EndWithErrorStatus(span, err)
161176
return err
162177
}
163178
if isNew {
179+
span.SetAttributes(attribute.Bool("newFlow", true))
164180
ip.logger.Debug().
165181
Str("src", pk.Src.String()).
166182
Str("dst", pk.Dst.String()).
@@ -170,9 +186,16 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
170186
}
171187
icmpFlow, err := toICMPEchoFlow(funnel)
172188
if err != nil {
189+
tracing.EndWithErrorStatus(span, err)
173190
return err
174191
}
175-
return icmpFlow.sendToDst(pk.Dst, pk.Message)
192+
err = icmpFlow.sendToDst(pk.Dst, pk.Message)
193+
if err != nil {
194+
tracing.EndWithErrorStatus(span, err)
195+
return err
196+
}
197+
tracing.End(span)
198+
return nil
176199
}
177200

178201
// Serve listens for responses to the requests until context is done

ingress/icmp_linux.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import (
1919

2020
"github.com/pkg/errors"
2121
"github.com/rs/zerolog"
22+
"go.opentelemetry.io/otel/attribute"
2223
"golang.org/x/net/icmp"
2324

2425
"github.com/cloudflare/cloudflared/packet"
26+
"github.com/cloudflare/cloudflared/tracing"
2527
)
2628

2729
const (
@@ -98,14 +100,24 @@ func checkInPingGroup() error {
98100
}
99101

100102
func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error {
103+
ctx, span := responder.requestSpan(ctx, pk)
104+
defer responder.exportSpan()
105+
101106
originalEcho, err := getICMPEcho(pk.Message)
102107
if err != nil {
108+
tracing.EndWithErrorStatus(span, err)
103109
return err
104110
}
111+
span.SetAttributes(
112+
attribute.Int("originalEchoID", originalEcho.ID),
113+
attribute.Int("seq", originalEcho.Seq),
114+
)
115+
105116
newConnChan := make(chan *icmp.PacketConn, 1)
106117
newFunnelFunc := func() (packet.Funnel, error) {
107118
conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
108119
if err != nil {
120+
tracing.EndWithErrorStatus(span, err)
109121
return nil, errors.Wrap(err, "failed to open ICMP socket")
110122
}
111123
ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
@@ -117,6 +129,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
117129
if !ok {
118130
return nil, fmt.Errorf("ICMP listener address %s is not net.UDPAddr", conn.LocalAddr())
119131
}
132+
span.SetAttributes(attribute.Int("port", localUDPAddr.Port))
133+
120134
echoID := localUDPAddr.Port
121135
icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, conn, responder, echoID, originalEcho.ID, packet.NewEncoder())
122136
return icmpFlow, nil
@@ -128,13 +142,16 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
128142
}
129143
funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, newFunnelFunc)
130144
if err != nil {
145+
tracing.EndWithErrorStatus(span, err)
131146
return err
132147
}
133148
icmpFlow, err := toICMPEchoFlow(funnel)
134149
if err != nil {
150+
tracing.EndWithErrorStatus(span, err)
135151
return err
136152
}
137153
if isNew {
154+
span.SetAttributes(attribute.Bool("newFlow", true))
138155
ip.logger.Debug().
139156
Str("src", pk.Src.String()).
140157
Str("dst", pk.Dst.String()).
@@ -153,8 +170,10 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
153170
}()
154171
}
155172
if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil {
173+
tracing.EndWithErrorStatus(span, err)
156174
return errors.Wrap(err, "failed to send ICMP echo request")
157175
}
176+
tracing.End(span)
158177
return nil
159178
}
160179

ingress/origin_icmp_proxy_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package ingress
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"net"
78
"net/netip"
9+
"os"
10+
"runtime"
811
"strings"
912
"sync"
1013
"testing"
@@ -17,6 +20,8 @@ import (
1720
"golang.org/x/net/ipv6"
1821

1922
"github.com/cloudflare/cloudflared/packet"
23+
quicpogs "github.com/cloudflare/cloudflared/quic"
24+
"github.com/cloudflare/cloudflared/tracing"
2025
)
2126

2227
var (
@@ -98,6 +103,75 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
98103
<-proxyDone
99104
}
100105

106+
func TestTraceICMPRouterEcho(t *testing.T) {
107+
if runtime.GOOS == "windows" {
108+
t.Skip("TODO: TUN-6861 Trace ICMP in Windows")
109+
}
110+
tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
111+
112+
logger := zerolog.New(os.Stderr)
113+
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &logger)
114+
require.NoError(t, err)
115+
116+
proxyDone := make(chan struct{})
117+
ctx, cancel := context.WithCancel(context.Background())
118+
go func() {
119+
router.Serve(ctx)
120+
close(proxyDone)
121+
}()
122+
123+
// Buffer 2 packets, reply and request span
124+
muxer := newMockMuxer(2)
125+
tracingIdentity, err := tracing.NewIdentity(tracingCtx)
126+
require.NoError(t, err)
127+
serializedIdentity, err := tracingIdentity.MarshalBinary()
128+
require.NoError(t, err)
129+
130+
responder := packetResponder{
131+
datagramMuxer: muxer,
132+
tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &logger),
133+
serializedIdentity: serializedIdentity,
134+
}
135+
136+
echo := &icmp.Echo{
137+
ID: 12910,
138+
Seq: 182,
139+
Data: []byte(t.Name()),
140+
}
141+
pk := packet.ICMP{
142+
IP: &packet.IP{
143+
Src: localhostIP,
144+
Dst: localhostIP,
145+
Protocol: layers.IPProtocolICMPv4,
146+
TTL: packet.DefaultTTL,
147+
},
148+
Message: &icmp.Message{
149+
Type: ipv4.ICMPTypeEcho,
150+
Code: 0,
151+
Body: echo,
152+
},
153+
}
154+
require.NoError(t, router.Request(ctx, &pk, &responder))
155+
validateEchoFlow(t, muxer, &pk)
156+
receivedPacket := <-muxer.cfdToEdge
157+
tracingSpanPacket, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
158+
require.True(t, ok)
159+
require.NotEmpty(t, tracingSpanPacket.Spans)
160+
require.True(t, bytes.Equal(serializedIdentity, tracingSpanPacket.TracingIdentity))
161+
162+
echo.Seq++
163+
pk.Body = echo
164+
// Only first request for a flow is traced. The edge will not send tracing context for the second request
165+
responder = packetResponder{
166+
datagramMuxer: muxer,
167+
}
168+
require.NoError(t, router.Request(ctx, &pk, &responder))
169+
validateEchoFlow(t, muxer, &pk)
170+
171+
cancel()
172+
<-proxyDone
173+
}
174+
101175
// TestConcurrentRequestsToSameDst makes sure icmpRouter can send concurrent requests to the same destination with different
102176
// echo ID. This simulates concurrent ping to the same destination.
103177
func TestConcurrentRequestsToSameDst(t *testing.T) {

ingress/packet_router.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ import (
66
"net/netip"
77

88
"github.com/rs/zerolog"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/trace"
911

1012
"github.com/cloudflare/cloudflared/packet"
1113
quicpogs "github.com/cloudflare/cloudflared/quic"
14+
"github.com/cloudflare/cloudflared/tracing"
1215
)
1316

1417
// Upstream of raw packets
@@ -70,7 +73,14 @@ func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, *packe
7073
case quicpogs.DatagramTypeIP:
7174
return packet.RawPacket{Data: pk.Payload()}, responder, nil
7275
case quicpogs.DatagramTypeIPWithTrace:
73-
return packet.RawPacket{}, responder, fmt.Errorf("TODO: TUN-6604 Handle IP packet with trace")
76+
var identity tracing.Identity
77+
if err := identity.UnmarshalBinary(pk.Metadata()); err != nil {
78+
r.logger.Err(err).Bytes("tracingIdentity", pk.Metadata()).Msg("Failed to unmarshal tracing identity")
79+
} else {
80+
responder.tracedCtx = tracing.NewTracedContext(ctx, identity.String(), r.logger)
81+
responder.serializedIdentity = pk.Metadata()
82+
}
83+
return packet.RawPacket{Data: pk.Payload()}, responder, nil
7484
default:
7585
return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type())
7686
}
@@ -126,9 +136,39 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra
126136
}
127137

128138
type packetResponder struct {
129-
datagramMuxer muxer
139+
datagramMuxer muxer
140+
tracedCtx *tracing.TracedContext
141+
serializedIdentity []byte
142+
}
143+
144+
// tracingEnabled reports whether this responder has a traced context attached,
// i.e. whether pr.tracedCtx is non-nil.
func (pr *packetResponder) tracingEnabled() bool {
145+
return pr.tracedCtx != nil
130146
}
131147

132148
// returnPacket converts rawPacket to a quicpogs.RawPacket and sends it through
// the responder's datagram muxer, returning whatever error the muxer reports.
func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error {
133149
return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
134150
}
151+
152+
func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) {
153+
if !pr.tracingEnabled() {
154+
return ctx, tracing.NewNoopSpan()
155+
}
156+
return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-request", trace.WithAttributes(
157+
attribute.String("src", pk.Src.String()),
158+
attribute.String("dst", pk.Dst.String()),
159+
))
160+
}
161+
162+
func (pr *packetResponder) exportSpan() {
163+
if !pr.tracingEnabled() {
164+
return
165+
}
166+
spans := pr.tracedCtx.GetProtoSpans()
167+
pr.tracedCtx.ClearSpans()
168+
if len(spans) > 0 {
169+
pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{
170+
Spans: spans,
171+
TracingIdentity: pr.serializedIdentity,
172+
})
173+
}
174+
}

ingress/packet_router_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ func (mm *mockMuxer) SendPacket(pk quicpogs.Packet) error {
200200
},
201201
TracingIdentity: copiedMetadata,
202202
}
203+
case quicpogs.DatagramTypeTracingSpan:
204+
copiedPacket = &quicpogs.TracingSpanPacket{
205+
Spans: copiedPayload,
206+
TracingIdentity: copiedMetadata,
207+
}
203208
default:
204209
return fmt.Errorf("unexpected metadata type %d", pk.Type())
205210
}

packet/funnel.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ var (
1616

1717
// Funnel is an abstraction to pipe from 1 src to 1 or more destinations
1818
type Funnel interface {
19-
// LastActive returns the last time SendToDst or ReturnToSrc is called
19+
// UpdateLastActive updates the last time traffic went through this funnel
20+
UpdateLastActive()
21+
// LastActive returns the last time there was traffic through this funnel
2022
LastActive() time.Time
2123
// Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error
2224
Close() error

tracing/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type InMemoryClient interface {
2727
// ProtoSpans returns a copy of the list of in-memory stored spans as otlp
2828
// protobuf byte array.
2929
ProtoSpans() ([]byte, error)
30+
// ClearSpans removes all in-memory spans
31+
ClearSpans()
3032
}
3133

3234
// InMemoryOtlpClient is a client implementation for otlptrace.Client
@@ -78,6 +80,12 @@ func (mc *InMemoryOtlpClient) ProtoSpans() ([]byte, error) {
7880
return proto.Marshal(pbRequest)
7981
}
8082

83+
func (mc *InMemoryOtlpClient) ClearSpans() {
84+
mc.mu.Lock()
85+
defer mc.mu.Unlock()
86+
mc.spans = make([]*tracepb.ResourceSpans, 0)
87+
}
88+
8189
// NoopOtlpClient is a client implementation for otlptrace.Client that does nothing
8290
type NoopOtlpClient struct{}
8391

@@ -102,3 +110,5 @@ func (mc *NoopOtlpClient) Spans() (string, error) {
102110
func (mc *NoopOtlpClient) ProtoSpans() ([]byte, error) {
103111
return nil, errNoopTracer
104112
}
113+
114+
// ClearSpans does nothing: its body is empty.
func (mc *NoopOtlpClient) ClearSpans() {}

tracing/tracing.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ func (cft *cfdTracer) AddSpans(headers http.Header) {
189189
headers[CanonicalCloudflaredTracingHeader] = []string{enc}
190190
}
191191

192+
// ClearSpans asks the tracer's exporter to clear its spans.
func (cft *cfdTracer) ClearSpans() {
193+
cft.exporter.ClearSpans()
194+
}
195+
192196
// End will set the OK status for the span and then end it.
193197
func End(span trace.Span) {
194198
endSpan(span, -1, codes.Ok, nil)
@@ -246,7 +250,6 @@ func extractTraceFromString(ctx context.Context, trace string) (context.Context,
246250
parts[0] = strings.Repeat("0", left) + parts[0]
247251
trace = strings.Join(parts, separator)
248252
}
249-
250253
// Override the 'cf-trace-id' as 'uber-trace-id' so the jaeger propagator can extract it.
251254
traceHeader := map[string]string{TracerContextNameOverride: trace}
252255
remoteCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(traceHeader))
@@ -277,6 +280,11 @@ func extractTrace(req *http.Request) (context.Context, bool) {
277280
if traceHeader[TracerContextNameOverride] == "" {
278281
return nil, false
279282
}
283+
280284
remoteCtx := otel.GetTextMapPropagator().Extract(req.Context(), propagation.MapCarrier(traceHeader))
281285
return remoteCtx, true
282286
}
287+
288+
func NewNoopSpan() trace.Span {
289+
return trace.SpanFromContext(nil)
290+
}

0 commit comments

Comments
 (0)