Skip to content

Commit 2d5234e

Browse files
committed
TUN-6858: Trace ICMP reply
1 parent b6bd8c1 commit 2d5234e

File tree

4 files changed

+100
-39
lines changed

4 files changed

+100
-39
lines changed

ingress/icmp_darwin.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
219219
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
220220
// In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
221221
// the second reply
222-
if err := ip.handleFullPacket(icmpDecoder, buf[:n]); err != nil {
222+
if err := ip.handleFullPacket(ctx, icmpDecoder, buf[:n]); err != nil {
223223
ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet")
224224
}
225225
continue
@@ -228,14 +228,14 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
228228
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
229229
continue
230230
}
231-
if err := ip.sendReply(reply); err != nil {
231+
if err := ip.sendReply(ctx, reply); err != nil {
232232
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
233233
continue
234234
}
235235
}
236236
}
237237

238-
func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []byte) error {
238+
func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
239239
icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
240240
if err != nil {
241241
return err
@@ -249,13 +249,13 @@ func (ip *icmpProxy) handleFullPacket(decoder *packet.ICMPDecoder, rawPacket []b
249249
msg: icmpPacket.Message,
250250
echo: echo,
251251
}
252-
if ip.sendReply(&reply); err != nil {
252+
if ip.sendReply(ctx, &reply); err != nil {
253253
return err
254254
}
255255
return nil
256256
}
257257

258-
func (ip *icmpProxy) sendReply(reply *echoReply) error {
258+
func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
259259
funnelID := echoFunnelID(reply.echo.ID)
260260
funnel, ok := ip.srcFunnelTracker.Get(funnelID)
261261
if !ok {
@@ -265,5 +265,19 @@ func (ip *icmpProxy) sendReply(reply *echoReply) error {
265265
if err != nil {
266266
return err
267267
}
268-
return icmpFlow.returnToSrc(reply)
268+
269+
_, span := icmpFlow.responder.replySpan(ctx, ip.logger)
270+
defer icmpFlow.responder.exportSpan()
271+
272+
span.SetAttributes(
273+
attribute.String("dst", reply.from.String()),
274+
attribute.Int("echoID", reply.echo.ID),
275+
attribute.Int("seq", reply.echo.Seq),
276+
attribute.Int("originalEchoID", icmpFlow.originalEchoID),
277+
)
278+
if err := icmpFlow.returnToSrc(reply); err != nil {
279+
tracing.EndWithErrorStatus(span, err)
280+
}
281+
tracing.End(span)
282+
return nil
269283
}

ingress/icmp_linux.go

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/pkg/errors"
2121
"github.com/rs/zerolog"
2222
"go.opentelemetry.io/otel/attribute"
23-
"golang.org/x/net/icmp"
2423

2524
"github.com/cloudflare/cloudflared/packet"
2625
"github.com/cloudflare/cloudflared/tracing"
@@ -113,15 +112,13 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
113112
attribute.Int("seq", originalEcho.Seq),
114113
)
115114

116-
newConnChan := make(chan *icmp.PacketConn, 1)
117115
newFunnelFunc := func() (packet.Funnel, error) {
118116
conn, err := newICMPConn(ip.listenIP, ip.ipv6Zone)
119117
if err != nil {
120118
tracing.EndWithErrorStatus(span, err)
121119
return nil, errors.Wrap(err, "failed to open ICMP socket")
122120
}
123121
ip.logger.Debug().Msgf("Opened ICMP socket listen on %s", conn.LocalAddr())
124-
newConnChan <- conn
125122
closeCallback := func() error {
126123
return conn.Close()
127124
}
@@ -157,10 +154,9 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa
157154
Str("dst", pk.Dst.String()).
158155
Int("originalEchoID", originalEcho.ID).
159156
Msg("New flow")
160-
conn := <-newConnChan
161157
go func() {
162158
defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow)
163-
if err := ip.listenResponse(icmpFlow, conn); err != nil {
159+
if err := ip.listenResponse(ctx, icmpFlow); err != nil {
164160
ip.logger.Debug().Err(err).
165161
Str("src", pk.Src.String()).
166162
Str("dst", pk.Dst.String()).
@@ -182,29 +178,55 @@ func (ip *icmpProxy) Serve(ctx context.Context) error {
182178
return ctx.Err()
183179
}
184180

185-
func (ip *icmpProxy) listenResponse(flow *icmpEchoFlow, conn *icmp.PacketConn) error {
181+
func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) error {
186182
buf := make([]byte, mtu)
187183
for {
188-
n, from, err := conn.ReadFrom(buf)
189-
if err != nil {
184+
retryable, err := ip.handleResponse(ctx, flow, buf)
185+
if err != nil && !retryable {
190186
return err
191187
}
192-
reply, err := parseReply(from, buf[:n])
193-
if err != nil {
194-
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
195-
continue
196-
}
197-
if !isEchoReply(reply.msg) {
198-
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
199-
continue
200-
}
201-
if err := flow.returnToSrc(reply); err != nil {
202-
ip.logger.Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
203-
continue
204-
}
205188
}
206189
}
207190

191+
func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (retryableErr bool, err error) {
192+
_, span := flow.responder.replySpan(ctx, ip.logger)
193+
defer flow.responder.exportSpan()
194+
195+
span.SetAttributes(
196+
attribute.Int("originalEchoID", flow.originalEchoID),
197+
)
198+
199+
n, from, err := flow.originConn.ReadFrom(buf)
200+
if err != nil {
201+
tracing.EndWithErrorStatus(span, err)
202+
return false, err
203+
}
204+
reply, err := parseReply(from, buf[:n])
205+
if err != nil {
206+
ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply")
207+
tracing.EndWithErrorStatus(span, err)
208+
return true, err
209+
}
210+
if !isEchoReply(reply.msg) {
211+
err := fmt.Errorf("Expect ICMP echo reply, got %s", reply.msg.Type)
212+
ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
213+
tracing.EndWithErrorStatus(span, err)
214+
return true, err
215+
}
216+
span.SetAttributes(
217+
attribute.String("dst", reply.from.String()),
218+
attribute.Int("echoID", reply.echo.ID),
219+
attribute.Int("seq", reply.echo.Seq),
220+
)
221+
if err := flow.returnToSrc(reply); err != nil {
222+
ip.logger.Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
223+
tracing.EndWithErrorStatus(span, err)
224+
return true, err
225+
}
226+
tracing.End(span)
227+
return true, nil
228+
}
229+
208230
// Only linux uses flow3Tuple as FunnelID
209231
func (ft flow3Tuple) Type() string {
210232
return "srcIP_dstIP_echoID"

ingress/origin_icmp_proxy_test.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"net"
88
"net/netip"
9-
"os"
109
"runtime"
1110
"strings"
1211
"sync"
@@ -109,8 +108,7 @@ func TestTraceICMPRouterEcho(t *testing.T) {
109108
}
110109
tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
111110

112-
logger := zerolog.New(os.Stderr)
113-
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &logger)
111+
router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger)
114112
require.NoError(t, err)
115113

116114
proxyDone := make(chan struct{})
@@ -120,16 +118,16 @@ func TestTraceICMPRouterEcho(t *testing.T) {
120118
close(proxyDone)
121119
}()
122120

123-
// Buffer 2 packets, reply and request span
124-
muxer := newMockMuxer(2)
121+
// Buffer 3 packets, request span, reply span and reply
122+
muxer := newMockMuxer(3)
125123
tracingIdentity, err := tracing.NewIdentity(tracingCtx)
126124
require.NoError(t, err)
127125
serializedIdentity, err := tracingIdentity.MarshalBinary()
128126
require.NoError(t, err)
129127

130128
responder := packetResponder{
131129
datagramMuxer: muxer,
132-
tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &logger),
130+
tracedCtx: tracing.NewTracedContext(ctx, tracingIdentity.String(), &noopLogger),
133131
serializedIdentity: serializedIdentity,
134132
}
135133

@@ -153,21 +151,36 @@ func TestTraceICMPRouterEcho(t *testing.T) {
153151
}
154152
require.NoError(t, router.Request(ctx, &pk, &responder))
155153
validateEchoFlow(t, muxer, &pk)
154+
155+
// Request span
156156
receivedPacket := <-muxer.cfdToEdge
157-
tracingSpanPacket, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
157+
requestSpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
158+
require.True(t, ok)
159+
require.NotEmpty(t, requestSpan.Spans)
160+
require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
161+
// Reply span
162+
receivedPacket = <-muxer.cfdToEdge
163+
replySpan, ok := receivedPacket.(*quicpogs.TracingSpanPacket)
158164
require.True(t, ok)
159-
require.NotEmpty(t, tracingSpanPacket.Spans)
160-
require.True(t, bytes.Equal(serializedIdentity, tracingSpanPacket.TracingIdentity))
165+
require.NotEmpty(t, replySpan.Spans)
166+
require.True(t, bytes.Equal(serializedIdentity, replySpan.TracingIdentity))
167+
require.False(t, bytes.Equal(requestSpan.Spans, replySpan.Spans))
161168

162169
echo.Seq++
163170
pk.Body = echo
164171
// Only first request for a flow is traced. The edge will not send tracing context for the second request
165-
responder = packetResponder{
172+
newResponder := packetResponder{
166173
datagramMuxer: muxer,
167174
}
168-
require.NoError(t, router.Request(ctx, &pk, &responder))
175+
require.NoError(t, router.Request(ctx, &pk, &newResponder))
169176
validateEchoFlow(t, muxer, &pk)
170177

178+
select {
179+
case receivedPacket = <-muxer.cfdToEdge:
180+
panic(fmt.Sprintf("Receive unexpected packet %+v", receivedPacket))
181+
default:
182+
}
183+
171184
cancel()
172185
<-proxyDone
173186
}

ingress/packet_router.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,21 @@ func (r *PacketRouter) sendTTLExceedMsg(ctx context.Context, pk *packet.ICMP, ra
135135
return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed))
136136
}
137137

138+
// packetResponder should not be used concurrently. This assumption is upheld because reply packets are ready one-by-one
138139
type packetResponder struct {
139140
datagramMuxer muxer
140141
tracedCtx *tracing.TracedContext
141142
serializedIdentity []byte
143+
// hadReply tracks if there has been any reply for this flow
144+
hadReply bool
142145
}
143146

144147
func (pr *packetResponder) tracingEnabled() bool {
145148
return pr.tracedCtx != nil
146149
}
147150

148151
func (pr *packetResponder) returnPacket(rawPacket packet.RawPacket) error {
152+
pr.hadReply = true
149153
return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
150154
}
151155

@@ -159,13 +163,21 @@ func (pr *packetResponder) requestSpan(ctx context.Context, pk *packet.ICMP) (co
159163
))
160164
}
161165

166+
func (pr *packetResponder) replySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) {
167+
if !pr.tracingEnabled() || pr.hadReply {
168+
return ctx, tracing.NewNoopSpan()
169+
}
170+
return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-reply")
171+
}
172+
162173
func (pr *packetResponder) exportSpan() {
163174
if !pr.tracingEnabled() {
164175
return
165176
}
166177
spans := pr.tracedCtx.GetProtoSpans()
167-
pr.tracedCtx.ClearSpans()
168178
if len(spans) > 0 {
179+
// Make sure spans are cleared after they are sent
180+
defer pr.tracedCtx.ClearSpans()
169181
pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{
170182
Spans: spans,
171183
TracingIdentity: pr.serializedIdentity,

0 commit comments

Comments
 (0)