Skip to content

Commit 76fb329

Browse files
committed
TUN-5724: Fix SSE streaming by guaranteeing we write everything we read
1 parent 7bac4b1 commit 76fb329

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

origin/proxy.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,11 +271,20 @@ func (wr *bidirectionalStream) Write(p []byte) (n int, err error) {
271271
func (p *Proxy) writeEventStream(w connection.ResponseWriter, respBody io.ReadCloser) {
272272
reader := bufio.NewReader(respBody)
273273
for {
274-
line, err := reader.ReadBytes('\n')
275-
if err != nil {
276-
break
274+
line, readErr := reader.ReadBytes('\n')
275+
276+
// We first try to write whatever we read even if an error occurred
277+
// The reason for doing it is to guarantee we really push everything to the eyeball side
278+
// before returning
279+
if len(line) > 0 {
280+
if _, writeErr := w.Write(line); writeErr != nil {
281+
return
282+
}
283+
}
284+
285+
if readErr != nil {
286+
return
277287
}
278-
_, _ = w.Write(line)
279288
}
280289
}
281290

origin/proxy_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net"
1010
"net/http"
1111
"net/http/httptest"
12+
"strings"
1213
"sync"
1314
"testing"
1415
"time"
@@ -138,6 +139,7 @@ func TestProxySingleOrigin(t *testing.T) {
138139
t.Run("testProxyHTTP", testProxyHTTP(proxy))
139140
t.Run("testProxyWebsocket", testProxyWebsocket(proxy))
140141
t.Run("testProxySSE", testProxySSE(proxy))
142+
t.Run("testProxySSEAllData", testProxySSEAllData(proxy))
141143
cancel()
142144
wg.Wait()
143145
}
@@ -256,6 +258,21 @@ func testProxySSE(proxy connection.OriginProxy) func(t *testing.T) {
256258
}
257259
}
258260

261+
// Regression test to guarantee that we always write the contents downstream even if EOF is reached without
262+
// hitting the delimiter
263+
func testProxySSEAllData(proxy *Proxy) func(t *testing.T) {
264+
return func(t *testing.T) {
265+
eyeballReader := io.NopCloser(strings.NewReader("data\r\r"))
266+
responseWriter := newMockSSERespWriter()
267+
268+
// responseWriter uses an unbuffered channel, so we call in a different go-routine
269+
go proxy.writeEventStream(responseWriter, eyeballReader)
270+
271+
result := string(<-responseWriter.writeNotification)
272+
require.Equal(t, "data\r\r", result)
273+
}
274+
}
275+
259276
func TestProxyMultipleOrigins(t *testing.T) {
260277
api := httptest.NewServer(mockAPI{})
261278
defer api.Close()
@@ -447,7 +464,7 @@ func TestConnections(t *testing.T) {
447464
// eyeball connection type.
448465
connectionType connection.Type
449466

450-
//requestheaders to be sent in the call to proxy.Proxy
467+
// requestheaders to be sent in the call to proxy.Proxy
451468
requestHeaders http.Header
452469
}
453470

@@ -508,7 +525,7 @@ func TestConnections(t *testing.T) {
508525
args: args{
509526
ingressServiceScheme: "ws://",
510527
originService: runEchoWSService,
511-
//eyeballResponseWriter gets set after roundtrip dial.
528+
// eyeballResponseWriter gets set after roundtrip dial.
512529
eyeballRequestBody: newPipedWSRequestBody([]byte("test3")),
513530
warpRoutingService: ingress.NewWarpRoutingService(),
514531
requestHeaders: map[string][]string{

0 commit comments

Comments
 (0)