Skip to content

Commit 225c344

Browse files
committed
TUN-6855: Add DatagramV2Type for IP packet with trace and tracing spans
1 parent 61007dd commit 225c344

File tree

6 files changed

+347
-34
lines changed

6 files changed

+347
-34
lines changed

connection/quic.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ func NewQUICConnection(
9393
sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
9494
datagramMuxer := quicpogs.NewDatagramMuxerV2(session, logger, sessionDemuxChan)
9595
sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
96-
packetRouter := packet.NewRouter(packetRouterConfig, datagramMuxer, &returnPipe{muxer: datagramMuxer}, logger, orchestrator.WarpRoutingEnabled)
96+
muxer := muxerWrapper{muxer: datagramMuxer}
97+
packetRouter := packet.NewRouter(packetRouterConfig, &muxer, &muxer, logger, orchestrator.WarpRoutingEnabled)
9798

9899
return &QUICConnection{
99100
session: session,
@@ -498,16 +499,28 @@ func (np *nopCloserReadWriter) Close() error {
498499
return nil
499500
}
500501

501-
// returnPipe wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
502-
type returnPipe struct {
502+
// muxerWrapper wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
503+
type muxerWrapper struct {
503504
muxer *quicpogs.DatagramMuxerV2
504505
}
505506

506-
func (rp *returnPipe) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
507-
return rp.muxer.SendPacket(pk)
507+
func (rp *muxerWrapper) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
508+
return rp.muxer.SendPacket(quicpogs.RawPacket(pk))
508509
}
509510

510-
func (rp *returnPipe) Close() error {
511+
func (rp *muxerWrapper) ReceivePacket(ctx context.Context) (packet.RawPacket, error) {
512+
pk, err := rp.muxer.ReceivePacket(ctx)
513+
if err != nil {
514+
return packet.RawPacket{}, err
515+
}
516+
rawPacket, ok := pk.(quicpogs.RawPacket)
517+
if ok {
518+
return packet.RawPacket(rawPacket), nil
519+
}
520+
return packet.RawPacket{}, fmt.Errorf("unexpected packet type %+v", pk)
521+
}
522+
523+
func (rp *muxerWrapper) Close() error {
511524
return nil
512525
}
513526

quic/datagram.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,12 @@ func extractSessionID(b []byte) (uuid.UUID, []byte, error) {
113113
// SuffixSessionID appends the session ID at the end of the payload. Suffix is more performant than prefix because
114114
// the payload slice might already have enough capacity to append the session ID at the end
115115
func SuffixSessionID(sessionID uuid.UUID, b []byte) ([]byte, error) {
116-
if len(b)+len(sessionID) > MaxDatagramFrameSize {
116+
return suffixMetadata(b, sessionID[:])
117+
}
118+
119+
func suffixMetadata(payload, metadata []byte) ([]byte, error) {
120+
if len(payload)+len(metadata) > MaxDatagramFrameSize {
117121
return nil, fmt.Errorf("datagram size exceed %d", MaxDatagramFrameSize)
118122
}
119-
b = append(b, sessionID[:]...)
120-
return b, nil
123+
return append(payload, metadata...), nil
121124
}

quic/datagram_test.go

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package quic
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/rand"
67
"crypto/rsa"
@@ -23,6 +24,7 @@ import (
2324
"golang.org/x/sync/errgroup"
2425

2526
"github.com/cloudflare/cloudflared/packet"
27+
"github.com/cloudflare/cloudflared/tracing"
2628
)
2729

2830
var (
@@ -121,6 +123,15 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Sessi
121123

122124
logger := zerolog.Nop()
123125

126+
tracingIdentity, err := tracing.NewIdentity("ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1")
127+
require.NoError(t, err)
128+
serializedTracingID, err := tracingIdentity.MarshalBinary()
129+
require.NoError(t, err)
130+
tracingSpan := &TracingSpanPacket{
131+
Spans: []byte("tracing"),
132+
TracingIdentity: serializedTracingID,
133+
}
134+
124135
errGroup, ctx := errgroup.WithContext(context.Background())
125136
// Run edge side of datagram muxer
126137
errGroup.Go(func() error {
@@ -140,18 +151,17 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Sessi
140151
muxer := NewDatagramMuxerV2(quicSession, &logger, sessionDemuxChan)
141152
muxer.ServeReceive(ctx)
142153

143-
icmpDecoder := packet.NewICMPDecoder()
144154
for _, pk := range packets {
145155
received, err := muxer.ReceivePacket(ctx)
146156
require.NoError(t, err)
147-
148-
receivedICMP, err := icmpDecoder.Decode(received)
157+
validateIPPacket(t, received, &pk)
158+
received, err = muxer.ReceivePacket(ctx)
149159
require.NoError(t, err)
150-
require.Equal(t, pk.IP, receivedICMP.IP)
151-
require.Equal(t, pk.Type, receivedICMP.Type)
152-
require.Equal(t, pk.Code, receivedICMP.Code)
153-
require.Equal(t, pk.Body, receivedICMP.Body)
160+
validateIPPacketWithTracing(t, received, &pk, serializedTracingID)
154161
}
162+
received, err := muxer.ReceivePacket(ctx)
163+
require.NoError(t, err)
164+
validateTracingSpans(t, received, tracingSpan)
155165
default:
156166
return fmt.Errorf("unknown datagram version %d", version)
157167
}
@@ -188,10 +198,15 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Sessi
188198
for _, pk := range packets {
189199
encodedPacket, err := encoder.Encode(&pk)
190200
require.NoError(t, err)
191-
require.NoError(t, muxerV2.SendPacket(encodedPacket))
201+
require.NoError(t, muxerV2.SendPacket(RawPacket(encodedPacket)))
202+
require.NoError(t, muxerV2.SendPacket(&TracedPacket{
203+
Packet: encodedPacket,
204+
TracingIdentity: serializedTracingID,
205+
}))
192206
}
207+
require.NoError(t, muxerV2.SendPacket(tracingSpan))
193208
// Payload larger than transport MTU, should not be sent
194-
require.Error(t, muxerV2.SendPacket(packet.RawPacket{
209+
require.Error(t, muxerV2.SendPacket(RawPacket{
195210
Data: largePayload,
196211
}))
197212
muxer = muxerV2
@@ -217,6 +232,38 @@ func testDatagram(t *testing.T, version uint8, sessionToPayloads []*packet.Sessi
217232
require.NoError(t, errGroup.Wait())
218233
}
219234

235+
func validateIPPacket(t *testing.T, receivedPacket Packet, expectedICMP *packet.ICMP) {
236+
require.Equal(t, DatagramTypeIP, receivedPacket.Type())
237+
rawPacket := receivedPacket.(RawPacket)
238+
decoder := packet.NewICMPDecoder()
239+
receivedICMP, err := decoder.Decode(packet.RawPacket(rawPacket))
240+
require.NoError(t, err)
241+
validateICMP(t, expectedICMP, receivedICMP)
242+
}
243+
244+
func validateIPPacketWithTracing(t *testing.T, receivedPacket Packet, expectedICMP *packet.ICMP, serializedTracingID []byte) {
245+
require.Equal(t, DatagramTypeIPWithTrace, receivedPacket.Type())
246+
tracedPacket := receivedPacket.(*TracedPacket)
247+
decoder := packet.NewICMPDecoder()
248+
receivedICMP, err := decoder.Decode(tracedPacket.Packet)
249+
require.NoError(t, err)
250+
validateICMP(t, expectedICMP, receivedICMP)
251+
require.True(t, bytes.Equal(tracedPacket.TracingIdentity, serializedTracingID))
252+
}
253+
254+
func validateICMP(t *testing.T, expected, actual *packet.ICMP) {
255+
require.Equal(t, expected.IP, actual.IP)
256+
require.Equal(t, expected.Type, actual.Type)
257+
require.Equal(t, expected.Code, actual.Code)
258+
require.Equal(t, expected.Body, actual.Body)
259+
}
260+
261+
func validateTracingSpans(t *testing.T, receivedPacket Packet, expectedSpan *TracingSpanPacket) {
262+
require.Equal(t, DatagramTypeTracingSpan, receivedPacket.Type())
263+
tracingSpans := receivedPacket.(*TracingSpanPacket)
264+
require.Equal(t, tracingSpans, expectedSpan)
265+
}
266+
220267
func newQUICListener(t *testing.T, config *quic.Config) quic.Listener {
221268
// Create a simple tls config.
222269
tlsConfig := generateTLSConfig()

quic/datagramv2.go

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,28 @@ import (
99
"github.com/rs/zerolog"
1010

1111
"github.com/cloudflare/cloudflared/packet"
12+
"github.com/cloudflare/cloudflared/tracing"
1213
)
1314

1415
type DatagramV2Type byte
1516

1617
const (
18+
// UDP payload
1719
DatagramTypeUDP DatagramV2Type = iota
20+
// Full IP packet
1821
DatagramTypeIP
22+
// DatagramTypeIP + tracing ID
23+
DatagramTypeIPWithTrace
24+
// Tracing spans in protobuf format
25+
DatagramTypeTracingSpan
1926
)
2027

28+
type Packet interface {
29+
Type() DatagramV2Type
30+
Payload() []byte
31+
Metadata() []byte
32+
}
33+
2134
const (
2235
typeIDLen = 1
2336
// Same as sessionDemuxChan capacity
@@ -41,7 +54,7 @@ type DatagramMuxerV2 struct {
4154
session quic.Connection
4255
logger *zerolog.Logger
4356
sessionDemuxChan chan<- *packet.Session
44-
packetDemuxChan chan packet.RawPacket
57+
packetDemuxChan chan Packet
4558
}
4659

4760
func NewDatagramMuxerV2(
@@ -54,7 +67,7 @@ func NewDatagramMuxerV2(
5467
session: quicSession,
5568
logger: &logger,
5669
sessionDemuxChan: sessionDemuxChan,
57-
packetDemuxChan: make(chan packet.RawPacket, packetChanCapacity),
70+
packetDemuxChan: make(chan Packet, packetChanCapacity),
5871
}
5972
}
6073

@@ -79,14 +92,19 @@ func (dm *DatagramMuxerV2) SendToSession(session *packet.Session) error {
7992
return nil
8093
}
8194

82-
// SendPacket suffix the datagram type to the packet. The other end of the QUIC connection can demultiplex by parsing
83-
// the payload as IP and look at the source and destination.
84-
func (dm *DatagramMuxerV2) SendPacket(pk packet.RawPacket) error {
85-
payloadWithVersion, err := SuffixType(pk.Data, DatagramTypeIP)
95+
// SendPacket sends a packet with datagram version in the suffix. If ctx is a TracedContext, it adds the tracing
96+
// context between payload and datagram version.
97+
// The other end of the QUIC connection can demultiplex by parsing the payload as IP and look at the source and destination.
98+
func (dm *DatagramMuxerV2) SendPacket(pk Packet) error {
99+
payloadWithMetadata, err := suffixMetadata(pk.Payload(), pk.Metadata())
100+
if err != nil {
101+
return err
102+
}
103+
payloadWithMetadataAndType, err := SuffixType(payloadWithMetadata, pk.Type())
86104
if err != nil {
87105
return errors.Wrap(err, "Failed to suffix datagram type, it will be dropped")
88106
}
89-
if err := dm.session.SendMessage(payloadWithVersion); err != nil {
107+
if err := dm.session.SendMessage(payloadWithMetadataAndType); err != nil {
90108
return errors.Wrap(err, "Failed to send datagram back to edge")
91109
}
92110
return nil
@@ -108,10 +126,10 @@ func (dm *DatagramMuxerV2) ServeReceive(ctx context.Context) error {
108126
}
109127
}
110128

111-
func (dm *DatagramMuxerV2) ReceivePacket(ctx context.Context) (packet.RawPacket, error) {
129+
func (dm *DatagramMuxerV2) ReceivePacket(ctx context.Context) (pk Packet, err error) {
112130
select {
113131
case <-ctx.Done():
114-
return packet.RawPacket{}, ctx.Err()
132+
return nil, ctx.Err()
115133
case pk := <-dm.packetDemuxChan:
116134
return pk, nil
117135
}
@@ -126,10 +144,8 @@ func (dm *DatagramMuxerV2) demux(ctx context.Context, msgWithType []byte) error
126144
switch msgType {
127145
case DatagramTypeUDP:
128146
return dm.handleSession(ctx, msg)
129-
case DatagramTypeIP:
130-
return dm.handlePacket(ctx, msg)
131147
default:
132-
return fmt.Errorf("Unexpected datagram type %d", msgType)
148+
return dm.handlePacket(ctx, msg, msgType)
133149
}
134150
}
135151

@@ -150,13 +166,93 @@ func (dm *DatagramMuxerV2) handleSession(ctx context.Context, session []byte) er
150166
}
151167
}
152168

153-
func (dm *DatagramMuxerV2) handlePacket(ctx context.Context, pk []byte) error {
169+
func (dm *DatagramMuxerV2) handlePacket(ctx context.Context, pk []byte, msgType DatagramV2Type) error {
170+
var demuxedPacket Packet
171+
switch msgType {
172+
case DatagramTypeIP:
173+
demuxedPacket = RawPacket(packet.RawPacket{Data: pk})
174+
case DatagramTypeIPWithTrace:
175+
tracingIdentity, payload, err := extractTracingIdentity(pk)
176+
if err != nil {
177+
return err
178+
}
179+
demuxedPacket = &TracedPacket{
180+
Packet: packet.RawPacket{Data: payload},
181+
TracingIdentity: tracingIdentity,
182+
}
183+
case DatagramTypeTracingSpan:
184+
tracingIdentity, spans, err := extractTracingIdentity(pk)
185+
if err != nil {
186+
return err
187+
}
188+
demuxedPacket = &TracingSpanPacket{
189+
Spans: spans,
190+
TracingIdentity: tracingIdentity,
191+
}
192+
default:
193+
return fmt.Errorf("Unexpected datagram type %d", msgType)
194+
}
154195
select {
155196
case <-ctx.Done():
156197
return ctx.Err()
157-
case dm.packetDemuxChan <- packet.RawPacket{
158-
Data: pk,
159-
}:
198+
case dm.packetDemuxChan <- demuxedPacket:
160199
return nil
161200
}
162201
}
202+
203+
func extractTracingIdentity(pk []byte) (tracingIdentity []byte, payload []byte, err error) {
204+
if len(pk) < tracing.IdentityLength {
205+
return nil, nil, fmt.Errorf("packet with tracing context should have at least %d bytes, got %v", tracing.IdentityLength, pk)
206+
}
207+
tracingIdentity = pk[len(pk)-tracing.IdentityLength:]
208+
payload = pk[:len(pk)-tracing.IdentityLength]
209+
return tracingIdentity, payload, nil
210+
}
211+
212+
type RawPacket packet.RawPacket
213+
214+
func (rw RawPacket) Type() DatagramV2Type {
215+
return DatagramTypeIP
216+
}
217+
218+
func (rw RawPacket) Payload() []byte {
219+
return rw.Data
220+
}
221+
222+
func (rw RawPacket) Metadata() []byte {
223+
return []byte{}
224+
}
225+
226+
type TracedPacket struct {
227+
Packet packet.RawPacket
228+
TracingIdentity []byte
229+
}
230+
231+
func (tp *TracedPacket) Type() DatagramV2Type {
232+
return DatagramTypeIPWithTrace
233+
}
234+
235+
func (tp *TracedPacket) Payload() []byte {
236+
return tp.Packet.Data
237+
}
238+
239+
func (tp *TracedPacket) Metadata() []byte {
240+
return tp.TracingIdentity
241+
}
242+
243+
type TracingSpanPacket struct {
244+
Spans []byte
245+
TracingIdentity []byte
246+
}
247+
248+
func (tsp *TracingSpanPacket) Type() DatagramV2Type {
249+
return DatagramTypeTracingSpan
250+
}
251+
252+
func (tsp *TracingSpanPacket) Payload() []byte {
253+
return tsp.Spans
254+
}
255+
256+
func (tsp *TracingSpanPacket) Metadata() []byte {
257+
return tsp.TracingIdentity
258+
}

0 commit comments

Comments
 (0)