Skip to content

Commit 73a9980

Browse files
committed
TUN-9255: Improve flush on write conditions in http2 tunnel type to match what is done on the edge
## Summary We have adapted our edge services to better know when they should flush on write. This is an important feature to ensure response types like Server Side Events are not buffered, and instead are propagated to the eyeball as soon as possible. This commit implements a similar logic for http2 tunnel protocol that we use in our edge services. By adding the new events stream header for json `application/x-ndjson` and using the content-length and transfer-encoding headers as well, following the RFC's: - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1 - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1 Closes TUN-9255
1 parent 86e8585 commit 73a9980

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

connection/connection.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@ const (
2626
MaxGracePeriod = time.Minute * 3
2727
MaxConcurrentStreams = math.MaxUint32
2828

29-
contentTypeHeader = "content-type"
30-
sseContentType = "text/event-stream"
31-
grpcContentType = "application/grpc"
29+
contentTypeHeader = "content-type"
30+
contentLengthHeader = "content-length"
31+
transferEncodingHeader = "transfer-encoding"
32+
33+
sseContentType = "text/event-stream"
34+
grpcContentType = "application/grpc"
35+
sseJsonContentType = "application/x-ndjson"
36+
37+
chunkTransferEncoding = "chunked"
3238
)
3339

3440
var (
3541
switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols))
36-
flushableContentTypes = []string{sseContentType, grpcContentType}
42+
flushableContentTypes = []string{sseContentType, grpcContentType, sseJsonContentType}
3743
)
3844

3945
// TunnelConnection represents the connection to the edge.
@@ -274,6 +280,22 @@ type ConnectedFuse interface {
274280
// Helper method to let the caller know what content-types should require a flush on every
275281
// write to a ResponseWriter.
276282
func shouldFlush(headers http.Header) bool {
283+
// When doing Server Side Events (SSE), some frameworks don't respect the `Content-Type` header.
284+
// Therefore, we need to rely on other ways to know whether we should flush on write or not. A good
285+
// approach is to assume that responses without `Content-Length` or with `Transfer-Encoding: chunked`
286+
// are streams, and therefore, should be flushed right away to the eyeball.
287+
// References:
288+
// - https://datatracker.ietf.org/doc/html/rfc7230#section-4.1
289+
// - https://datatracker.ietf.org/doc/html/rfc9112#section-6.1
290+
if contentLength := headers.Get(contentLengthHeader); contentLength == "" {
291+
return true
292+
}
293+
if transferEncoding := headers.Get(transferEncodingHeader); transferEncoding != "" {
294+
transferEncoding = strings.ToLower(transferEncoding)
295+
if strings.Contains(transferEncoding, chunkTransferEncoding) {
296+
return true
297+
}
298+
}
277299
if contentType := headers.Get(contentTypeHeader); contentType != "" {
278300
contentType = strings.ToLower(contentType)
279301
for _, c := range flushableContentTypes {
@@ -282,7 +304,6 @@ func shouldFlush(headers http.Header) bool {
282304
}
283305
}
284306
}
285-
286307
return false
287308
}
288309

connection/connection_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import (
77
"io"
88
"math/big"
99
"net/http"
10+
"testing"
1011
"time"
1112

1213
pkgerrors "github.com/pkg/errors"
1314
"github.com/rs/zerolog"
15+
"github.com/stretchr/testify/require"
1416

1517
cfdflow "github.com/cloudflare/cloudflared/flow"
1618

@@ -209,3 +211,48 @@ func (mcf mockConnectedFuse) Connected() {}
209211
func (mcf mockConnectedFuse) IsConnected() bool {
210212
return true
211213
}
214+
215+
func TestShouldFlushHeaders(t *testing.T) {
216+
tests := []struct {
217+
headers map[string]string
218+
shouldFlush bool
219+
}{
220+
{
221+
headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "1"},
222+
shouldFlush: false,
223+
},
224+
{
225+
headers: map[string]string{contentTypeHeader: "text/html", contentLengthHeader: "1"},
226+
shouldFlush: false,
227+
},
228+
{
229+
headers: map[string]string{contentTypeHeader: "text/event-stream", contentLengthHeader: "1"},
230+
shouldFlush: true,
231+
},
232+
{
233+
headers: map[string]string{contentTypeHeader: "application/grpc", contentLengthHeader: "1"},
234+
shouldFlush: true,
235+
},
236+
{
237+
headers: map[string]string{contentTypeHeader: "application/x-ndjson", contentLengthHeader: "1"},
238+
shouldFlush: true,
239+
},
240+
{
241+
headers: map[string]string{contentTypeHeader: "application/json"},
242+
shouldFlush: true,
243+
},
244+
{
245+
headers: map[string]string{contentTypeHeader: "application/json", contentLengthHeader: "-1", transferEncodingHeader: "chunked"},
246+
shouldFlush: true,
247+
},
248+
}
249+
250+
for _, test := range tests {
251+
headers := http.Header{}
252+
for k, v := range test.headers {
253+
headers.Add(k, v)
254+
}
255+
256+
require.Equal(t, test.shouldFlush, shouldFlush(headers))
257+
}
258+
}

0 commit comments

Comments
 (0)