Skip to content

Commit b9cba7f

Browse files
committed
TUN-6576: Consume cf-trace-id from incoming TCP requests to create root span
(cherry picked from commit f48a7cd)
1 parent 7f1c890 commit b9cba7f

File tree

13 files changed

+166
-62
lines changed

13 files changed

+166
-62
lines changed

connection/connection.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,23 +123,24 @@ func (t Type) String() string {
123123

124124
// OriginProxy is how data flows from cloudflared to the origin services running behind it.
125125
type OriginProxy interface {
126-
ProxyHTTP(w ResponseWriter, tr *tracing.TracedRequest, isWebsocket bool) error
126+
ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error
127127
ProxyTCP(ctx context.Context, rwa ReadWriteAcker, req *TCPRequest) error
128128
}
129129

130130
// TCPRequest defines the input format needed to perform a TCP proxy.
131131
type TCPRequest struct {
132-
Dest string
133-
CFRay string
134-
LBProbe bool
135-
FlowID string
132+
Dest string
133+
CFRay string
134+
LBProbe bool
135+
FlowID string
136+
CfTraceID string
136137
}
137138

138139
// ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has
139140
// accepted the connection.
140141
type ReadWriteAcker interface {
141142
io.ReadWriter
142-
AckConnection() error
143+
AckConnection(tracePropagation string) error
143144
}
144145

145146
// HTTPResponseReadWriteAcker is an HTTP implementation of ReadWriteAcker.
@@ -168,7 +169,7 @@ func (h *HTTPResponseReadWriteAcker) Write(p []byte) (int, error) {
168169

169170
// AckConnection acks an HTTP connection by sending a switch protocols status code that enables the caller to
170171
// upgrade to streams.
171-
func (h *HTTPResponseReadWriteAcker) AckConnection() error {
172+
func (h *HTTPResponseReadWriteAcker) AckConnection(tracePropagation string) error {
172173
resp := &http.Response{
173174
Status: switchingProtocolText,
174175
StatusCode: http.StatusSwitchingProtocols,
@@ -179,6 +180,10 @@ func (h *HTTPResponseReadWriteAcker) AckConnection() error {
179180
resp.Header = websocket.NewResponseHeader(h.req)
180181
}
181182

183+
if tracePropagation != "" {
184+
resp.Header.Add(tracing.CanonicalCloudflaredTracingHeader, tracePropagation)
185+
}
186+
182187
return h.w.WriteRespHeaders(resp.StatusCode, resp.Header)
183188
}
184189

connection/connection_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ var (
3030
testLargeResp = make([]byte, largeFileSize)
3131
)
3232

33+
var _ ReadWriteAcker = (*HTTPResponseReadWriteAcker)(nil)
34+
3335
type testRequest struct {
3436
name string
3537
endpoint string
@@ -60,7 +62,7 @@ type mockOriginProxy struct{}
6062

6163
func (moc *mockOriginProxy) ProxyHTTP(
6264
w ResponseWriter,
63-
tr *tracing.TracedRequest,
65+
tr *tracing.TracedHTTPRequest,
6466
isWebsocket bool,
6567
) error {
6668
req := tr.Request

connection/h2mux.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func NewH2muxConnection(
6969
connIndex uint8,
7070
observer *Observer,
7171
gracefulShutdownC <-chan struct{},
72+
log *zerolog.Logger,
7273
) (*h2muxConnection, error, bool) {
7374
h := &h2muxConnection{
7475
orchestrator: orchestrator,
@@ -79,6 +80,7 @@ func NewH2muxConnection(
7980
observer: observer,
8081
gracefulShutdownC: gracefulShutdownC,
8182
newRPCClientFunc: newRegistrationRPCClient,
83+
log: log,
8284
}
8385

8486
// Establish a muxed connection with the edge
@@ -234,7 +236,7 @@ func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error {
234236
return err
235237
}
236238

237-
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), sourceConnectionType == TypeWebsocket)
239+
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, h.log), sourceConnectionType == TypeWebsocket)
238240
if err != nil {
239241
respWriter.WriteErrorResponse()
240242
}

connection/h2mux_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
4848
}()
4949
var connIndex = uint8(0)
5050
testObserver := NewObserver(&log, &log)
51-
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil)
51+
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil, &log)
5252
require.NoError(t, err)
5353
return h2muxConn, <-edgeMuxChan
5454
}

connection/http2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
132132
case TypeWebsocket, TypeHTTP:
133133
stripWebsocketUpgradeHeader(r)
134134
// Check for tracing on request
135-
tr := tracing.NewTracedRequest(r)
135+
tr := tracing.NewTracedHTTPRequest(r, c.log)
136136
if err := originProxy.ProxyHTTP(respWriter, tr, connType == TypeWebsocket); err != nil {
137137
err := fmt.Errorf("Failed to proxy HTTP: %w", err)
138138
c.log.Error().Err(err)

connection/quic.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.R
197197

198198
switch request.Type {
199199
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
200-
tracedReq, err := buildHTTPRequest(ctx, request, stream)
200+
tracedReq, err := buildHTTPRequest(ctx, request, stream, q.logger)
201201
if err != nil {
202202
return err
203203
}
@@ -208,8 +208,9 @@ func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.R
208208
rwa := &streamReadWriteAcker{stream}
209209
metadata := request.MetadataMap()
210210
return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
211-
Dest: request.Dest,
212-
FlowID: metadata[QUICMetadataFlowID],
211+
Dest: request.Dest,
212+
FlowID: metadata[QUICMetadataFlowID],
213+
CfTraceID: metadata[tracing.TracerContextName],
213214
})
214215
}
215216
return nil
@@ -296,8 +297,12 @@ type streamReadWriteAcker struct {
296297
}
297298

298299
// AckConnection acks response back to the proxy.
299-
func (s *streamReadWriteAcker) AckConnection() error {
300-
return s.WriteConnectResponseData(nil)
300+
func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
301+
metadata := quicpogs.Metadata{
302+
Key: tracing.CanonicalCloudflaredTracingHeader,
303+
Val: tracePropagation,
304+
}
305+
return s.WriteConnectResponseData(nil, metadata)
301306
}
302307

303308
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
@@ -325,7 +330,12 @@ func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
325330
hrw.WriteConnectResponseData(err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
326331
}
327332

328-
func buildHTTPRequest(ctx context.Context, connectRequest *quicpogs.ConnectRequest, body io.ReadCloser) (*tracing.TracedRequest, error) {
333+
func buildHTTPRequest(
334+
ctx context.Context,
335+
connectRequest *quicpogs.ConnectRequest,
336+
body io.ReadCloser,
337+
log *zerolog.Logger,
338+
) (*tracing.TracedHTTPRequest, error) {
329339
metadata := connectRequest.MetadataMap()
330340
dest := connectRequest.Dest
331341
method := metadata[HTTPMethodKey]
@@ -367,7 +377,7 @@ func buildHTTPRequest(ctx context.Context, connectRequest *quicpogs.ConnectReque
367377
stripWebsocketUpgradeHeader(req)
368378

369379
// Check for tracing on request
370-
tracedReq := tracing.NewTracedRequest(req)
380+
tracedReq := tracing.NewTracedHTTPRequest(req, log)
371381
return tracedReq, err
372382
}
373383

connection/quic_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ var (
3636
}
3737
)
3838

39+
var _ ReadWriteAcker = (*streamReadWriteAcker)(nil)
40+
3941
// TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol.
4042
// It also serves as a demonstration for communication with the QUIC connection started by a cloudflared.
4143
func TestQUICServer(t *testing.T) {
@@ -220,7 +222,7 @@ func quicServer(
220222

221223
type mockOriginProxyWithRequest struct{}
222224

223-
func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, tr *tracing.TracedRequest, isWebsocket bool) error {
225+
func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error {
224226
// These are a series of crude tests to ensure the headers and http related data is transferred from
225227
// metadata.
226228
r := tr.Request
@@ -475,9 +477,10 @@ func TestBuildHTTPRequest(t *testing.T) {
475477
},
476478
}
477479

480+
log := zerolog.Nop()
478481
for _, test := range tests {
479482
t.Run(test.name, func(t *testing.T) {
480-
req, err := buildHTTPRequest(context.Background(), test.connectRequest, test.body)
483+
req, err := buildHTTPRequest(context.Background(), test.connectRequest, test.body, &log)
481484
assert.NoError(t, err)
482485
test.req = test.req.WithContext(req.Context())
483486
assert.Equal(t, test.req, req.Request)
@@ -486,7 +489,7 @@ func TestBuildHTTPRequest(t *testing.T) {
486489
}
487490

488491
func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWriteAcker, tcpRequest *TCPRequest) error {
489-
rwa.AckConnection()
492+
rwa.AckConnection("")
490493
io.Copy(rwa, rwa)
491494
return nil
492495
}

orchestration/orchestrator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Respo
355355
return nil, err
356356
}
357357

358-
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), false)
358+
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), false)
359359
if err != nil {
360360
return nil, err
361361
}
@@ -604,7 +604,7 @@ func TestPersistentConnection(t *testing.T) {
604604
respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket, &log)
605605
require.NoError(t, err)
606606

607-
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), true)
607+
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), true)
608608
require.NoError(t, err)
609609
}()
610610

proxy/proxy.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func NewOriginProxy(
6363
// a simple roundtrip or a tcp/websocket dial depending on ingres rule setup.
6464
func (p *Proxy) ProxyHTTP(
6565
w connection.ResponseWriter,
66-
tr *tracing.TracedRequest,
66+
tr *tracing.TracedHTTPRequest,
6767
isWebsocket bool,
6868
) error {
6969
incrementRequests()
@@ -108,7 +108,7 @@ func (p *Proxy) ProxyHTTP(
108108
}
109109

110110
rws := connection.NewHTTPResponseReadWriterAcker(w, req)
111-
if err := p.proxyStream(req.Context(), rws, dest, originProxy); err != nil {
111+
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy); err != nil {
112112
rule, srv := ruleField(p.ingressRules, ruleNum)
113113
p.logRequestError(err, cfRay, "", rule, srv)
114114
return err
@@ -137,9 +137,11 @@ func (p *Proxy) ProxyTCP(
137137
serveCtx, cancel := context.WithCancel(ctx)
138138
defer cancel()
139139

140+
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log)
141+
140142
p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started")
141143

142-
if err := p.proxyStream(serveCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil {
144+
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil {
143145
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
144146
return err
145147
}
@@ -160,7 +162,7 @@ func ruleField(ing ingress.Ingress, ruleNum int) (ruleID string, srv string) {
160162
// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
161163
func (p *Proxy) proxyHTTPRequest(
162164
w connection.ResponseWriter,
163-
tr *tracing.TracedRequest,
165+
tr *tracing.TracedHTTPRequest,
164166
httpService ingress.HTTPOriginProxy,
165167
isWebsocket bool,
166168
disableChunkedEncoding bool,
@@ -211,7 +213,7 @@ func (p *Proxy) proxyHTTPRequest(
211213
}
212214

213215
// Add spans to response header (if available)
214-
tr.AddSpans(resp.Header, p.log)
216+
tr.AddSpans(resp.Header)
215217

216218
err = w.WriteRespHeaders(resp.StatusCode, resp.Header)
217219
if err != nil {
@@ -248,17 +250,23 @@ func (p *Proxy) proxyHTTPRequest(
248250
// proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented
249251
// ingress rule.
250252
func (p *Proxy) proxyStream(
251-
ctx context.Context,
253+
tr *tracing.TracedContext,
252254
rwa connection.ReadWriteAcker,
253255
dest string,
254256
connectionProxy ingress.StreamBasedOriginProxy,
255257
) error {
258+
ctx := tr.Context
259+
_, connectSpan := tr.Tracer().Start(ctx, "stream_connect")
256260
originConn, err := connectionProxy.EstablishConnection(ctx, dest)
257261
if err != nil {
262+
tracing.EndWithErrorStatus(connectSpan, err)
258263
return err
259264
}
265+
connectSpan.End()
266+
267+
encodedSpans := tr.GetSpans()
260268

261-
if err := rwa.AckConnection(); err != nil {
269+
if err := rwa.AckConnection(encodedSpans); err != nil {
262270
return err
263271
}
264272

proxy/proxy_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ func testProxyHTTP(proxy connection.OriginProxy) func(t *testing.T) {
157157
req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil)
158158
require.NoError(t, err)
159159

160-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false)
160+
log := zerolog.Nop()
161+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
161162
require.NoError(t, err)
162163
for _, tag := range testTags {
163164
assert.Equal(t, tag.Value, req.Header.Get(TagHeaderNamePrefix+tag.Name))
@@ -184,7 +185,8 @@ func testProxyWebsocket(proxy connection.OriginProxy) func(t *testing.T) {
184185

185186
errGroup, ctx := errgroup.WithContext(ctx)
186187
errGroup.Go(func() error {
187-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), true)
188+
log := zerolog.Nop()
189+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), true)
188190
require.NoError(t, err)
189191

190192
require.Equal(t, http.StatusSwitchingProtocols, responseWriter.Code)
@@ -245,7 +247,8 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) {
245247
wg.Add(1)
246248
go func() {
247249
defer wg.Done()
248-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false)
250+
log := zerolog.Nop()
251+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
249252
require.NoError(t, err)
250253

251254
require.Equal(t, http.StatusOK, responseWriter.Code)
@@ -357,7 +360,7 @@ func runIngressTestScenarios(t *testing.T, unvalidatedIngress []config.Unvalidat
357360
req, err := http.NewRequest(http.MethodGet, test.url, nil)
358361
require.NoError(t, err)
359362

360-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false)
363+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
361364
require.NoError(t, err)
362365

363366
assert.Equal(t, test.expectedStatus, responseWriter.Code)
@@ -404,7 +407,7 @@ func TestProxyError(t *testing.T) {
404407
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1", nil)
405408
assert.NoError(t, err)
406409

407-
assert.Error(t, proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false))
410+
assert.Error(t, proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false))
408411
}
409412

410413
type replayer struct {
@@ -682,7 +685,8 @@ func TestConnections(t *testing.T) {
682685
rwa := connection.NewHTTPResponseReadWriterAcker(respWriter, req)
683686
err = proxy.ProxyTCP(ctx, rwa, &connection.TCPRequest{Dest: dest})
684687
} else {
685-
err = proxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), test.args.connectionType == connection.TypeWebsocket)
688+
log := zerolog.Nop()
689+
err = proxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), test.args.connectionType == connection.TypeWebsocket)
686690
}
687691

688692
cancel()

0 commit comments

Comments
 (0)