Skip to content

Commit 513855d

Browse files
committed
TUN-7073: Fix propagating of bad stream request from origin to downstream
This changes fixes a bug where cloudflared was not propagating errors when proxying the body of an HTTP request. In a situation where we already sent HTTP status code, the eyeball would see the request as sucessfully when in fact it wasn't. To solve this, we need to guarantee that we produce HTTP RST_STREAM frames. This change was applied to both http2 and quic transports.
1 parent bd917d2 commit 513855d

File tree

4 files changed

+78
-39
lines changed

4 files changed

+78
-39
lines changed

connection/http2.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -115,52 +115,52 @@ func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
115115
return
116116
}
117117

118+
var requestErr error
118119
switch connType {
119120
case TypeControlStream:
120-
if err := c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, c.orchestrator); err != nil {
121-
c.controlStreamErr = err
122-
c.log.Error().Err(err)
123-
respWriter.WriteErrorResponse()
121+
requestErr = c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, c.orchestrator)
122+
if requestErr != nil {
123+
c.controlStreamErr = requestErr
124124
}
125125

126126
case TypeConfiguration:
127-
if err := c.handleConfigurationUpdate(respWriter, r); err != nil {
128-
c.log.Error().Err(err)
129-
respWriter.WriteErrorResponse()
130-
}
127+
requestErr = c.handleConfigurationUpdate(respWriter, r)
131128

132129
case TypeWebsocket, TypeHTTP:
133130
stripWebsocketUpgradeHeader(r)
134131
// Check for tracing on request
135132
tr := tracing.NewTracedHTTPRequest(r, c.log)
136133
if err := originProxy.ProxyHTTP(respWriter, tr, connType == TypeWebsocket); err != nil {
137-
err := fmt.Errorf("Failed to proxy HTTP: %w", err)
138-
c.log.Error().Err(err)
139-
respWriter.WriteErrorResponse()
134+
requestErr = fmt.Errorf("Failed to proxy HTTP: %w", err)
140135
}
141136

142137
case TypeTCP:
143138
host, err := getRequestHost(r)
144139
if err != nil {
145-
err := fmt.Errorf(`cloudflared received a warp-routing request with an empty host value: %w`, err)
146-
c.log.Error().Err(err)
147-
respWriter.WriteErrorResponse()
140+
requestErr = fmt.Errorf(`cloudflared received a warp-routing request with an empty host value: %w`, err)
141+
break
148142
}
149143

150144
rws := NewHTTPResponseReadWriterAcker(respWriter, r)
151-
if err := originProxy.ProxyTCP(r.Context(), rws, &TCPRequest{
145+
requestErr = originProxy.ProxyTCP(r.Context(), rws, &TCPRequest{
152146
Dest: host,
153147
CFRay: FindCfRayHeader(r),
154148
LBProbe: IsLBProbeRequest(r),
155149
CfTraceID: r.Header.Get(tracing.TracerContextName),
156-
}); err != nil {
157-
respWriter.WriteErrorResponse()
158-
}
150+
})
159151

160152
default:
161-
err := fmt.Errorf("Received unknown connection type: %s", connType)
162-
c.log.Error().Err(err)
163-
respWriter.WriteErrorResponse()
153+
requestErr = fmt.Errorf("Received unknown connection type: %s", connType)
154+
}
155+
156+
if requestErr != nil {
157+
c.log.Error().Err(requestErr).Msg("failed to serve incoming request")
158+
159+
// WriteErrorResponse will return false if status was already written. we need to abort handler.
160+
if !respWriter.WriteErrorResponse() {
161+
c.log.Debug().Msg("Handler aborted due to failure to write error response after status already sent")
162+
panic(http.ErrAbortHandler)
163+
}
164164
}
165165
}
166166

@@ -275,9 +275,16 @@ func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) erro
275275
return nil
276276
}
277277

278-
func (rp *http2RespWriter) WriteErrorResponse() {
278+
func (rp *http2RespWriter) WriteErrorResponse() bool {
279+
if rp.statusWritten {
280+
return false
281+
}
282+
279283
rp.setResponseMetaHeader(responseMetaHeaderCfd)
280284
rp.w.WriteHeader(http.StatusBadGateway)
285+
rp.statusWritten = true
286+
287+
return true
281288
}
282289

283290
func (rp *http2RespWriter) setResponseMetaHeader(value string) {

connection/quic.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,11 @@ func (q *QUICConnection) runStream(quicStream quic.Stream) {
193193
// A call to close will simulate a close to the read-side, which will fail subsequent reads.
194194
noCloseStream := &nopCloserReadWriter{ReadWriteCloser: stream}
195195
if err := q.handleStream(ctx, noCloseStream); err != nil {
196-
q.logger.Err(err).Msg("Failed to handle QUIC stream")
196+
q.logger.Debug().Err(err).Msg("Failed to handle QUIC stream")
197+
198+
// if we received an error at this level, then close write side of stream with an error, which will result in
199+
// RST_STREAM frame.
200+
quicStream.CancelWrite(0)
197201
}
198202
}
199203

@@ -226,43 +230,60 @@ func (q *QUICConnection) handleDataStream(ctx context.Context, stream *quicpogs.
226230
return err
227231
}
228232

229-
if err := q.dispatchRequest(ctx, stream, err, request); err != nil {
230-
_ = stream.WriteConnectResponseData(err)
233+
if err, connectResponseSent := q.dispatchRequest(ctx, stream, err, request); err != nil {
231234
q.logger.Err(err).Str("type", request.Type.String()).Str("dest", request.Dest).Msg("Request failed")
235+
236+
// if the connectResponse was already sent and we had an error, we need to propagate it up, so that the stream is
237+
// closed with an RST_STREAM frame
238+
if connectResponseSent {
239+
return err
240+
}
241+
242+
if writeRespErr := stream.WriteConnectResponseData(err); writeRespErr != nil {
243+
return writeRespErr
244+
}
232245
}
233246

234247
return nil
235248
}
236249

237-
func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.RequestServerStream, err error, request *quicpogs.ConnectRequest) error {
250+
// dispatchRequest will dispatch the request depending on the type and returns an error if it occurs.
251+
// More importantly, it also tells if the during processing of the request the ConnectResponse metadata was sent downstream.
252+
// This is important since it informs
253+
func (q *QUICConnection) dispatchRequest(ctx context.Context, stream *quicpogs.RequestServerStream, err error, request *quicpogs.ConnectRequest) (error, bool) {
238254
originProxy, err := q.orchestrator.GetOriginProxy()
239255
if err != nil {
240-
return err
256+
return err, false
241257
}
242258

243259
switch request.Type {
244260
case quicpogs.ConnectionTypeHTTP, quicpogs.ConnectionTypeWebsocket:
245261
tracedReq, err := buildHTTPRequest(ctx, request, stream, q.logger)
246262
if err != nil {
247-
return err
263+
return err, false
248264
}
249265
w := newHTTPResponseAdapter(stream)
250-
return originProxy.ProxyHTTP(w, tracedReq, request.Type == quicpogs.ConnectionTypeWebsocket)
266+
return originProxy.ProxyHTTP(&w, tracedReq, request.Type == quicpogs.ConnectionTypeWebsocket), w.connectResponseSent
251267

252268
case quicpogs.ConnectionTypeTCP:
253-
rwa := &streamReadWriteAcker{stream}
269+
rwa := &streamReadWriteAcker{RequestServerStream: stream}
254270
metadata := request.MetadataMap()
255271
return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
256272
Dest: request.Dest,
257273
FlowID: metadata[QUICMetadataFlowID],
258274
CfTraceID: metadata[tracing.TracerContextName],
259-
})
275+
}), rwa.connectResponseSent
276+
default:
277+
return errors.Errorf("unsupported error type: %s", request.Type), false
260278
}
261-
return nil
262279
}
263280

264281
func (q *QUICConnection) handleRPCStream(rpcStream *quicpogs.RPCServerStream) error {
265-
return rpcStream.Serve(q, q, q.logger)
282+
if err := rpcStream.Serve(q, q, q.logger); err != nil {
283+
q.logger.Err(err).Msg("failed handling RPC stream")
284+
}
285+
286+
return nil
266287
}
267288

268289
// RegisterUdpSession is the RPC method invoked by edge to register and run a session
@@ -357,6 +378,7 @@ func (q *QUICConnection) UpdateConfiguration(ctx context.Context, version int32,
357378
// the client.
358379
type streamReadWriteAcker struct {
359380
*quicpogs.RequestServerStream
381+
connectResponseSent bool
360382
}
361383

362384
// AckConnection acks response back to the proxy.
@@ -365,23 +387,25 @@ func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
365387
Key: tracing.CanonicalCloudflaredTracingHeader,
366388
Val: tracePropagation,
367389
}
390+
s.connectResponseSent = true
368391
return s.WriteConnectResponseData(nil, metadata)
369392
}
370393

371394
// httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
372395
type httpResponseAdapter struct {
373396
*quicpogs.RequestServerStream
397+
connectResponseSent bool
374398
}
375399

376400
func newHTTPResponseAdapter(s *quicpogs.RequestServerStream) httpResponseAdapter {
377-
return httpResponseAdapter{s}
401+
return httpResponseAdapter{RequestServerStream: s}
378402
}
379403

380-
func (hrw httpResponseAdapter) AddTrailer(trailerName, trailerValue string) {
404+
func (hrw *httpResponseAdapter) AddTrailer(trailerName, trailerValue string) {
381405
// we do not support trailers over QUIC
382406
}
383407

384-
func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
408+
func (hrw *httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
385409
metadata := make([]quicpogs.Metadata, 0)
386410
metadata = append(metadata, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
387411
for k, vv := range header {
@@ -390,13 +414,19 @@ func (hrw httpResponseAdapter) WriteRespHeaders(status int, header http.Header)
390414
metadata = append(metadata, quicpogs.Metadata{Key: httpHeaderKey, Val: v})
391415
}
392416
}
417+
393418
return hrw.WriteConnectResponseData(nil, metadata...)
394419
}
395420

396-
func (hrw httpResponseAdapter) WriteErrorResponse(err error) {
421+
func (hrw *httpResponseAdapter) WriteErrorResponse(err error) {
397422
hrw.WriteConnectResponseData(err, quicpogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
398423
}
399424

425+
func (hrw *httpResponseAdapter) WriteConnectResponseData(respErr error, metadata ...quicpogs.Metadata) error {
426+
hrw.connectResponseSent = true
427+
return hrw.RequestServerStream.WriteConnectResponseData(respErr, metadata...)
428+
}
429+
400430
func buildHTTPRequest(
401431
ctx context.Context,
402432
connectRequest *quicpogs.ConnectRequest,

proxy/proxy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,9 @@ func (p *Proxy) proxyHTTPRequest(
261261
return nil
262262
}
263263

264-
_, _ = cfio.Copy(w, resp.Body)
264+
if _, err = cfio.Copy(w, resp.Body); err != nil {
265+
return err
266+
}
265267

266268
// copy trailers
267269
copyTrailers(w, resp)

proxy/proxy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) {
261261
defer wg.Done()
262262
log := zerolog.Nop()
263263
err = proxy.ProxyHTTP(responseWriter, tracing.NewTracedHTTPRequest(req, &log), false)
264-
require.NoError(t, err)
264+
require.Equal(t, err.Error(), "context canceled")
265265

266266
require.Equal(t, http.StatusOK, responseWriter.Code)
267267
}()

0 commit comments

Comments
 (0)