Skip to content

Commit 7f1c890

Browse files
committed
Revert "TUN-6576: Consume cf-trace-id from incoming TCP requests to create root span"
This reverts commit f48a7cd.
1 parent f48a7cd commit 7f1c890

File tree

13 files changed

+62
-168
lines changed

13 files changed

+62
-168
lines changed

connection/connection.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -123,24 +123,23 @@ func (t Type) String() string {
123123

124124
// OriginProxy is how data flows from cloudflared to the origin services running behind it.
125125
type OriginProxy interface {
126-
ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error
126+
ProxyHTTP(w ResponseWriter, tr *tracing.TracedRequest, isWebsocket bool) error
127127
ProxyTCP(ctx context.Context, rwa ReadWriteAcker, req *TCPRequest) error
128128
}
129129

130130
// TCPRequest defines the input format needed to perform a TCP proxy.
131131
type TCPRequest struct {
132-
Dest string
133-
CFRay string
134-
LBProbe bool
135-
FlowID string
136-
CfTraceID string
132+
Dest string
133+
CFRay string
134+
LBProbe bool
135+
FlowID string
137136
}
138137

139138
// ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has
140139
// accepted the connection.
141140
type ReadWriteAcker interface {
142141
io.ReadWriter
143-
AckConnection(tracePropagation string) error
142+
AckConnection() error
144143
}
145144

146145
// HTTPResponseReadWriteAcker is an HTTP implementation of ReadWriteAcker.
@@ -169,7 +168,7 @@ func (h *HTTPResponseReadWriteAcker) Write(p []byte) (int, error) {
169168

170169
// AckConnection acks an HTTP connection by sending a switch protocols status code that enables the caller to
171170
// upgrade to streams.
172-
func (h *HTTPResponseReadWriteAcker) AckConnection(tracePropagation string) error {
171+
func (h *HTTPResponseReadWriteAcker) AckConnection() error {
173172
resp := &http.Response{
174173
Status: switchingProtocolText,
175174
StatusCode: http.StatusSwitchingProtocols,
@@ -180,10 +179,6 @@ func (h *HTTPResponseReadWriteAcker) AckConnection(tracePropagation string) erro
180179
resp.Header = websocket.NewResponseHeader(h.req)
181180
}
182181

183-
if tracePropagation != "" {
184-
resp.Header.Add(tracing.CanonicalCloudflaredTracingHeader, tracePropagation)
185-
}
186-
187182
return h.w.WriteRespHeaders(resp.StatusCode, resp.Header)
188183
}
189184

connection/connection_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ var (
3030
testLargeResp = make([]byte, largeFileSize)
3131
)
3232

33-
var _ ReadWriteAcker = (*HTTPResponseReadWriteAcker)(nil)
34-
3533
type testRequest struct {
3634
name string
3735
endpoint string
@@ -62,7 +60,7 @@ type mockOriginProxy struct{}
6260

6361
func (moc *mockOriginProxy) ProxyHTTP(
6462
w ResponseWriter,
65-
tr *tracing.TracedHTTPRequest,
63+
tr *tracing.TracedRequest,
6664
isWebsocket bool,
6765
) error {
6866
req := tr.Request

connection/h2mux.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ func NewH2muxConnection(
6969
connIndex uint8,
7070
observer *Observer,
7171
gracefulShutdownC <-chan struct{},
72-
log *zerolog.Logger,
7372
) (*h2muxConnection, error, bool) {
7473
h := &h2muxConnection{
7574
orchestrator: orchestrator,
@@ -80,7 +79,6 @@ func NewH2muxConnection(
8079
observer: observer,
8180
gracefulShutdownC: gracefulShutdownC,
8281
newRPCClientFunc: newRegistrationRPCClient,
83-
log: log,
8482
}
8583

8684
// Establish a muxed connection with the edge
@@ -236,7 +234,7 @@ func (h *h2muxConnection) ServeStream(stream *h2mux.MuxedStream) error {
236234
return err
237235
}
238236

239-
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, h.log), sourceConnectionType == TypeWebsocket)
237+
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), sourceConnectionType == TypeWebsocket)
240238
if err != nil {
241239
respWriter.WriteErrorResponse()
242240
}

connection/h2mux_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func newH2MuxConnection(t require.TestingT) (*h2muxConnection, *h2mux.Muxer) {
4848
}()
4949
var connIndex = uint8(0)
5050
testObserver := NewObserver(&log, &log)
51-
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil, &log)
51+
h2muxConn, err, _ := NewH2muxConnection(testOrchestrator, testGracePeriod, testMuxerConfig, originConn, connIndex, testObserver, nil)
5252
require.NoError(t, err)
5353
return h2muxConn, <-edgeMuxChan
5454
}

connection/http2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
132132
case TypeWebsocket, TypeHTTP:
133133
stripWebsocketUpgradeHeader(r)
134134
// Check for tracing on request
135-
tr := tracing.NewTracedHTTPRequest(r, c.log)
135+
tr := tracing.NewTracedRequest(r)
136136
if err := originProxy.ProxyHTTP(respWriter, tr, connType == TypeWebsocket); err != nil {
137137
err := fmt.Errorf("Failed to proxy HTTP: %w", err)
138138
c.log.Error().Err(err)

connection/quic.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.R
197197

198198
switch request.Type {
199199
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
200-
tracedReq, err := buildHTTPRequest(ctx, request, stream, q.logger)
200+
tracedReq, err := buildHTTPRequest(ctx, request, stream)
201201
if err != nil {
202202
return err
203203
}
@@ -208,9 +208,8 @@ func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.R
208208
rwa := &streamReadWriteAcker{stream}
209209
metadata := request.MetadataMap()
210210
return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
211-
Dest: request.Dest,
212-
FlowID: metadata[QUICMetadataFlowID],
213-
CfTraceID: metadata[tracing.TracerContextName],
211+
Dest: request.Dest,
212+
FlowID: metadata[QUICMetadataFlowID],
214213
})
215214
}
216215
return nil
@@ -297,12 +296,8 @@ type streamReadWriteAcker struct {
297296
}
298297

299298
// AckConnection acks response back to the proxy.
300-
func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
301-
metadata := quicpogs.Metadata{
302-
Key: tracing.CanonicalCloudflaredTracingHeader,
303-
Val: tracePropagation,
304-
}
305-
return s.WriteConnectResponseData(nil, metadata)
299+
func (s *streamReadWriteAcker) AckConnection() error {
300+
return s.WriteConnectResponseData(nil)
306301
}
307302

308303
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
@@ -330,12 +325,7 @@ func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
330325
hrw.WriteConnectResponseData(err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
331326
}
332327

333-
func buildHTTPRequest(
334-
ctx context.Context,
335-
connectRequest *quicpogs.ConnectRequest,
336-
body io.ReadCloser,
337-
log *zerolog.Logger,
338-
) (*tracing.TracedHTTPRequest, error) {
328+
func buildHTTPRequest(ctx context.Context, connectRequest *quicpogs.ConnectRequest, body io.ReadCloser) (*tracing.TracedRequest, error) {
339329
metadata := connectRequest.MetadataMap()
340330
dest := connectRequest.Dest
341331
method := metadata[HTTPMethodKey]
@@ -377,7 +367,7 @@ func buildHTTPRequest(
377367
stripWebsocketUpgradeHeader(req)
378368

379369
// Check for tracing on request
380-
tracedReq := tracing.NewTracedHTTPRequest(req, log)
370+
tracedReq := tracing.NewTracedRequest(req)
381371
return tracedReq, err
382372
}
383373

connection/quic_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ var (
3636
}
3737
)
3838

39-
var _ ReadWriteAcker = (*streamReadWriteAcker)(nil)
40-
4139
// TestQUICServer tests if a quic server accepts and responds to a quic client with the acceptance protocol.
4240
// It also serves as a demonstration for communication with the QUIC connection started by a cloudflared.
4341
func TestQUICServer(t *testing.T) {
@@ -222,7 +220,7 @@ func quicServer(
222220

223221
type mockOriginProxyWithRequest struct{}
224222

225-
func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error {
223+
func (moc *mockOriginProxyWithRequest) ProxyHTTP(w ResponseWriter, tr *tracing.TracedRequest, isWebsocket bool) error {
226224
// These are a series of crude tests to ensure the headers and http related data is transferred from
227225
// metadata.
228226
r := tr.Request
@@ -477,10 +475,9 @@ func TestBuildHTTPRequest(t *testing.T) {
477475
},
478476
}
479477

480-
log := zerolog.Nop()
481478
for _, test := range tests {
482479
t.Run(test.name, func(t *testing.T) {
483-
req, err := buildHTTPRequest(context.Background(), test.connectRequest, test.body, &log)
480+
req, err := buildHTTPRequest(context.Background(), test.connectRequest, test.body)
484481
assert.NoError(t, err)
485482
test.req = test.req.WithContext(req.Context())
486483
assert.Equal(t, test.req, req.Request)
@@ -489,7 +486,7 @@ func TestBuildHTTPRequest(t *testing.T) {
489486
}
490487

491488
func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWriteAcker, tcpRequest *TCPRequest) error {
492-
rwa.AckConnection("")
489+
rwa.AckConnection()
493490
io.Copy(rwa, rwa)
494491
return nil
495492
}

orchestration/orchestrator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Respo
355355
return nil, err
356356
}
357357

358-
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), false)
358+
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), false)
359359
if err != nil {
360360
return nil, err
361361
}
@@ -604,7 +604,7 @@ func TestPersistentConnection(t *testing.T) {
604604
respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket, &log)
605605
require.NoError(t, err)
606606

607-
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), true)
607+
err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), true)
608608
require.NoError(t, err)
609609
}()
610610

proxy/proxy.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func NewOriginProxy(
6363
// a simple roundtrip or a tcp/websocket dial depending on ingres rule setup.
6464
func (p *Proxy) ProxyHTTP(
6565
w connection.ResponseWriter,
66-
tr *tracing.TracedHTTPRequest,
66+
tr *tracing.TracedRequest,
6767
isWebsocket bool,
6868
) error {
6969
incrementRequests()
@@ -108,7 +108,7 @@ func (p *Proxy) ProxyHTTP(
108108
}
109109

110110
rws := connection.NewHTTPResponseReadWriterAcker(w, req)
111-
if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy); err != nil {
111+
if err := p.proxyStream(req.Context(), rws, dest, originProxy); err != nil {
112112
rule, srv := ruleField(p.ingressRules, ruleNum)
113113
p.logRequestError(err, cfRay, "", rule, srv)
114114
return err
@@ -137,11 +137,9 @@ func (p *Proxy) ProxyTCP(
137137
serveCtx, cancel := context.WithCancel(ctx)
138138
defer cancel()
139139

140-
tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, p.log)
141-
142140
p.log.Debug().Str(LogFieldFlowID, req.FlowID).Msg("tcp proxy stream started")
143141

144-
if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil {
142+
if err := p.proxyStream(serveCtx, rwa, req.Dest, p.warpRouting.Proxy); err != nil {
145143
p.logRequestError(err, req.CFRay, req.FlowID, "", ingress.ServiceWarpRouting)
146144
return err
147145
}
@@ -162,7 +160,7 @@ func ruleField(ing ingress.Ingress, ruleNum int) (ruleID string, srv string) {
162160
// ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
163161
func (p *Proxy) proxyHTTPRequest(
164162
w connection.ResponseWriter,
165-
tr *tracing.TracedHTTPRequest,
163+
tr *tracing.TracedRequest,
166164
httpService ingress.HTTPOriginProxy,
167165
isWebsocket bool,
168166
disableChunkedEncoding bool,
@@ -213,7 +211,7 @@ func (p *Proxy) proxyHTTPRequest(
213211
}
214212

215213
// Add spans to response header (if available)
216-
tr.AddSpans(resp.Header)
214+
tr.AddSpans(resp.Header, p.log)
217215

218216
err = w.WriteRespHeaders(resp.StatusCode, resp.Header)
219217
if err != nil {
@@ -250,23 +248,17 @@ func (p *Proxy) proxyHTTPRequest(
250248
// proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented
251249
// ingress rule.
252250
func (p *Proxy) proxyStream(
253-
tr *tracing.TracedContext,
251+
ctx context.Context,
254252
rwa connection.ReadWriteAcker,
255253
dest string,
256254
connectionProxy ingress.StreamBasedOriginProxy,
257255
) error {
258-
ctx := tr.Context
259-
_, connectSpan := tr.Tracer().Start(ctx, "stream_connect")
260256
originConn, err := connectionProxy.EstablishConnection(ctx, dest)
261257
if err != nil {
262-
tracing.EndWithErrorStatus(connectSpan, err)
263258
return err
264259
}
265-
connectSpan.End()
266-
267-
encodedSpans := tr.GetSpans()
268260

269-
if err := rwa.AckConnection(encodedSpans); err != nil {
261+
if err := rwa.AckConnection(); err != nil {
270262
return err
271263
}
272264

proxy/proxy_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,7 @@ func testProxyHTTP(proxy connection.OriginProxy) func(t *testing.T) {
157157
req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil)
158158
require.NoError(t, err)
159159

160-
log := zerolog.Nop()
161-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
160+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false)
162161
require.NoError(t, err)
163162
for _, tag := range testTags {
164163
assert.Equal(t, tag.Value, req.Header.Get(TagHeaderNamePrefix+tag.Name))
@@ -185,8 +184,7 @@ func testProxyWebsocket(proxy connection.OriginProxy) func(t *testing.T) {
185184

186185
errGroup, ctx := errgroup.WithContext(ctx)
187186
errGroup.Go(func() error {
188-
log := zerolog.Nop()
189-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), true)
187+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), true)
190188
require.NoError(t, err)
191189

192190
require.Equal(t, http.StatusSwitchingProtocols, responseWriter.Code)
@@ -247,8 +245,7 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) {
247245
wg.Add(1)
248246
go func() {
249247
defer wg.Done()
250-
log := zerolog.Nop()
251-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
248+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false)
252249
require.NoError(t, err)
253250

254251
require.Equal(t, http.StatusOK, responseWriter.Code)
@@ -360,7 +357,7 @@ func runIngressTestScenarios(t *testing.T, unvalidatedIngress []config.Unvalidat
360357
req, err := http.NewRequest(http.MethodGet, test.url, nil)
361358
require.NoError(t, err)
362359

363-
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
360+
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false)
364361
require.NoError(t, err)
365362

366363
assert.Equal(t, test.expectedStatus, responseWriter.Code)
@@ -407,7 +404,7 @@ func TestProxyError(t *testing.T) {
407404
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1", nil)
408405
assert.NoError(t, err)
409406

410-
assert.Error(t, proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false))
407+
assert.Error(t, proxy.ProxyHTTP(responseWriter, tracing.NewTracedRequest(req), false))
411408
}
412409

413410
type replayer struct {
@@ -685,8 +682,7 @@ func TestConnections(t *testing.T) {
685682
rwa := connection.NewHTTPResponseReadWriterAcker(respWriter, req)
686683
err = proxy.ProxyTCP(ctx, rwa, &connection.TCPRequest{Dest: dest})
687684
} else {
688-
log := zerolog.Nop()
689-
err = proxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, &log), test.args.connectionType == connection.TypeWebsocket)
685+
err = proxy.ProxyHTTP(respWriter, tracing.NewTracedRequest(req), test.args.connectionType == connection.TypeWebsocket)
690686
}
691687

692688
cancel()

0 commit comments

Comments
 (0)