Skip to content

Commit f5f3e6a

Browse files
committed
TUN-6689: Utilize new RegisterUDPSession to begin tracing
1 parent 30c529e commit f5f3e6a

File tree

6 files changed

+88
-26
lines changed

6 files changed

+88
-26
lines changed

connection/quic.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/lucas-clemente/quic-go"
1818
"github.com/pkg/errors"
1919
"github.com/rs/zerolog"
20+
"go.opentelemetry.io/otel/attribute"
21+
"go.opentelemetry.io/otel/trace"
2022
"golang.org/x/sync/errgroup"
2123

2224
"github.com/cloudflare/cloudflared/datagramsession"
@@ -259,24 +261,42 @@ func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) er
259261
}
260262

261263
// RegisterUdpSession is the RPC method invoked by edge to register and run a session
262-
func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) error {
264+
func (q *QUICConnection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) {
265+
traceCtx := tracing.NewTracedContext(ctx, traceContext, q.logger)
266+
ctx, registerSpan := traceCtx.Tracer().Start(traceCtx, "register-session", trace.WithAttributes(
267+
attribute.String("session-id", sessionID.String()),
268+
attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)),
269+
))
263270
// Each session is a series of datagram from an eyeball to a dstIP:dstPort.
264271
// (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
265272
originProxy, err := ingress.DialUDP(dstIP, dstPort)
266273
if err != nil {
267274
q.logger.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
268-
return err
275+
tracing.EndWithErrorStatus(registerSpan, err)
276+
return nil, err
269277
}
278+
registerSpan.SetAttributes(
279+
attribute.Bool("socket-bind-success", true),
280+
attribute.String("src", originProxy.LocalAddr().String()),
281+
)
282+
270283
session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
271284
if err != nil {
272285
q.logger.Err(err).Str("sessionID", sessionID.String()).Msgf("Failed to register udp session")
273-
return err
286+
tracing.EndWithErrorStatus(registerSpan, err)
287+
return nil, err
274288
}
275289

276290
go q.serveUDPSession(session, closeAfterIdleHint)
277291

278292
q.logger.Debug().Str("sessionID", sessionID.String()).Str("src", originProxy.LocalAddr().String()).Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).Msgf("Registered session")
279-
return nil
293+
tracing.End(registerSpan)
294+
295+
resp := tunnelpogs.RegisterUdpSessionResponse{
296+
Spans: traceCtx.GetProtoSpans(),
297+
}
298+
299+
return &resp, nil
280300
}
281301

282302
func (q *QUICConnection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) {

connection/quic_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cloudflare/cloudflared/datagramsession"
2727
quicpogs "github.com/cloudflare/cloudflared/quic"
2828
"github.com/cloudflare/cloudflared/tracing"
29+
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
2930
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
3031
)
3132

@@ -554,7 +555,7 @@ func TestNopCloserReadWriterCloseAfterEOF(t *testing.T) {
554555
require.Equal(t, n, 9)
555556

556557
// force another read to read eof
557-
n, err = readerWriter.Read(buffer)
558+
_, err = readerWriter.Read(buffer)
558559
require.Equal(t, err, io.EOF)
559560

560561
// close
@@ -652,8 +653,8 @@ type mockSessionRPCServer struct {
652653
calledUnregisterChan chan struct{}
653654
}
654655

655-
func (s mockSessionRPCServer) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) error {
656-
return fmt.Errorf("mockSessionRPCServer doesn't implement RegisterUdpSession")
656+
func (s mockSessionRPCServer) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
657+
return nil, fmt.Errorf("mockSessionRPCServer doesn't implement RegisterUdpSession")
657658
}
658659

659660
func (s mockSessionRPCServer) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, reason string) error {

quic/quic_protocol_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -230,23 +230,23 @@ type mockSessionRPCServer struct {
230230
traceContext string
231231
}
232232

233-
func (s mockSessionRPCServer) RegisterUdpSession(_ context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) error {
233+
func (s mockSessionRPCServer) RegisterUdpSession(_ context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfter time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) {
234234
if s.sessionID != sessionID {
235-
return fmt.Errorf("expect session ID %s, got %s", s.sessionID, sessionID)
235+
return nil, fmt.Errorf("expect session ID %s, got %s", s.sessionID, sessionID)
236236
}
237237
if !s.dstIP.Equal(dstIP) {
238-
return fmt.Errorf("expect destination IP %s, got %s", s.dstIP, dstIP)
238+
return nil, fmt.Errorf("expect destination IP %s, got %s", s.dstIP, dstIP)
239239
}
240240
if s.dstPort != dstPort {
241-
return fmt.Errorf("expect destination port %d, got %d", s.dstPort, dstPort)
241+
return nil, fmt.Errorf("expect destination port %d, got %d", s.dstPort, dstPort)
242242
}
243243
if s.closeIdleAfter != closeIdleAfter {
244-
return fmt.Errorf("expect closeIdleAfter %d, got %d", s.closeIdleAfter, closeIdleAfter)
244+
return nil, fmt.Errorf("expect closeIdleAfter %d, got %d", s.closeIdleAfter, closeIdleAfter)
245245
}
246246
if s.traceContext != traceContext {
247-
return fmt.Errorf("expect traceContext %s, got %s", s.traceContext, traceContext)
247+
return nil, fmt.Errorf("expect traceContext %s, got %s", s.traceContext, traceContext)
248248
}
249-
return nil
249+
return &tunnelpogs.RegisterUdpSessionResponse{}, nil
250250
}
251251

252252
func (s mockSessionRPCServer) UnregisterUdpSession(_ context.Context, sessionID uuid.UUID, message string) error {

tracing/client.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type InMemoryClient interface {
2424
// Spans returns a copy of the list of in-memory stored spans as a base64
2525
// encoded otlp protobuf string.
2626
Spans() (string, error)
27+
// ProtoSpans returns a copy of the list of in-memory stored spans as otlp
28+
// protobuf byte array.
29+
ProtoSpans() ([]byte, error)
2730
}
2831

2932
// InMemoryOtlpClient is a client implementation for otlptrace.Client
@@ -55,19 +58,24 @@ func (mc *InMemoryOtlpClient) UploadTraces(_ context.Context, protoSpans []*trac
5558

5659
// Spans returns the list of in-memory stored spans as a base64 encoded otlp protobuf string.
5760
func (mc *InMemoryOtlpClient) Spans() (string, error) {
61+
data, err := mc.ProtoSpans()
62+
if err != nil {
63+
return "", err
64+
}
65+
return base64.StdEncoding.EncodeToString(data), nil
66+
}
67+
68+
// ProtoSpans returns the list of in-memory stored spans as the protobuf byte array.
69+
func (mc *InMemoryOtlpClient) ProtoSpans() ([]byte, error) {
5870
mc.mu.Lock()
5971
defer mc.mu.Unlock()
6072
if len(mc.spans) <= 0 {
61-
return "", errNoTraces
73+
return nil, errNoTraces
6274
}
6375
pbRequest := &coltracepb.ExportTraceServiceRequest{
6476
ResourceSpans: mc.spans,
6577
}
66-
data, err := proto.Marshal(pbRequest)
67-
if err != nil {
68-
return "", err
69-
}
70-
return base64.StdEncoding.EncodeToString(data), nil
78+
return proto.Marshal(pbRequest)
7179
}
7280

7381
// NoopOtlpClient is a client implementation for otlptrace.Client that does nothing
@@ -89,3 +97,8 @@ func (mc *NoopOtlpClient) UploadTraces(_ context.Context, _ []*tracepb.ResourceS
8997
func (mc *NoopOtlpClient) Spans() (string, error) {
9098
return "", errNoopTracer
9199
}
100+
101+
// Spans always returns no traces error
102+
func (mc *NoopOtlpClient) ProtoSpans() ([]byte, error) {
103+
return nil, errNoopTracer
104+
}

tracing/tracing.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ type TracedContext struct {
9393
*cfdTracer
9494
}
9595

96-
// NewTracedHTTPRequest creates a new tracer for the current HTTP request context.
96+
// NewTracedContext creates a new tracer for the current context.
9797
func NewTracedContext(ctx context.Context, traceContext string, log *zerolog.Logger) *TracedContext {
9898
ctx, exists := extractTraceFromString(ctx, traceContext)
9999
if !exists {
@@ -155,6 +155,24 @@ func (cft *cfdTracer) GetSpans() (enc string) {
155155
return
156156
}
157157

158+
// GetProtoSpans returns the spans as the otlp traces in protobuf byte array.
159+
func (cft *cfdTracer) GetProtoSpans() (proto []byte) {
160+
proto, err := cft.exporter.ProtoSpans()
161+
switch err {
162+
case nil:
163+
break
164+
case errNoTraces:
165+
cft.log.Trace().Err(err).Msgf("expected traces to be available")
166+
return
167+
case errNoopTracer:
168+
return // noop tracer has no traces
169+
default:
170+
cft.log.Debug().Err(err)
171+
return
172+
}
173+
return
174+
}
175+
158176
// AddSpans assigns spans as base64 encoded protobuf otlp traces to provided
159177
// HTTP headers.
160178
func (cft *cfdTracer) AddSpans(headers http.Header) {
@@ -171,6 +189,11 @@ func (cft *cfdTracer) AddSpans(headers http.Header) {
171189
headers[CanonicalCloudflaredTracingHeader] = []string{enc}
172190
}
173191

192+
// End will set the OK status for the span and then end it.
193+
func End(span trace.Span) {
194+
endSpan(span, -1, codes.Ok, nil)
195+
}
196+
174197
// EndWithErrorStatus will set a status for the span and then end it.
175198
func EndWithErrorStatus(span trace.Span, err error) {
176199
endSpan(span, -1, codes.Error, err)

tunnelrpc/pogs/sessionrpc.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import (
1414
)
1515

1616
type SessionManager interface {
17-
RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) error
17+
// RegisterUdpSession is the call provided to cloudflared to handle an incoming
18+
// capnproto RegisterUdpSession request from the edge.
19+
RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*RegisterUdpSessionResponse, error)
20+
// UnregisterUdpSession is the call provided to cloudflared to handle an incoming
21+
// capnproto UnregisterUdpSession request from the edge.
1822
UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error
1923
}
2024

@@ -55,14 +59,15 @@ func (i SessionManager_PogsImpl) RegisterUdpSession(p tunnelrpc.SessionManager_r
5559
return err
5660
}
5761

58-
resp := RegisterUdpSessionResponse{}
59-
registrationErr := i.impl.RegisterUdpSession(p.Ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
62+
resp, registrationErr := i.impl.RegisterUdpSession(p.Ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
6063
if registrationErr != nil {
64+
// Make sure to assign a response even if one is not returned from register
65+
if resp == nil {
66+
resp = &RegisterUdpSessionResponse{}
67+
}
6168
resp.Err = registrationErr
6269
}
6370

64-
// TUN-6689: Add spans to return path for RegisterUdpSession
65-
6671
result, err := p.Results.NewResult()
6772
if err != nil {
6873
return err

0 commit comments

Comments
 (0)