Skip to content

Commit 46bdabf

Browse files
authored
Merge pull request #6 from launchdarkly/eb/ch81628/stream-deps
revise streaming HTTP helpers for cleaner implementation and fewer dependencies
2 parents f9d0cb0 + f9b3192 commit 46bdabf

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+20548
-648
lines changed

Gopkg.lock

Lines changed: 0 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@
2424
# go-tests = true
2525
# unused-packages = true
2626

27-
28-
[[constraint]]
29-
name = "github.com/launchdarkly/eventsource"
30-
version = "1.2.0"
31-
3227
[[constraint]]
3328
name = "github.com/stretchr/testify"
3429
version = "1.5.1"

httphelpers/clients_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,6 @@ func TestClientFromHandler(t *testing.T) {
2222
}
2323

2424
func TestClientFromHandlerConvertsPanicToError(t *testing.T) {
25-
myError := errors.New("sorry")
26-
handler := PanicHandler(myError)
27-
client := ClientFromHandler(handler)
28-
29-
resp, err := client.Get("/")
30-
31-
expectedError := &url.Error{Op: "Get", URL: "/", Err: myError}
32-
require.Error(t, err)
33-
require.Nil(t, resp)
34-
assert.Equal(t, expectedError, err)
35-
}
36-
37-
func TestClientFromHandlerConvertsPanicWithArbitraryValueToError(t *testing.T) {
3825
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3926
panic("sorry")
4027
})

httphelpers/handlers.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io/ioutil"
66
"log"
77
"net/http"
8+
"net/http/httptest"
89
"regexp"
910
)
1011

@@ -162,16 +163,28 @@ func SequentialHandler(firstHandler http.Handler, remainingHandlers ...http.Hand
162163
})
163164
}
164165

165-
// PanicHandler creates an HTTP handler that will panic.
166+
// BrokenConnectionHandler creates an HTTP handler that will simulate an I/O error.
166167
//
167-
// This is not useful inside an actual HTTP server, but when used in conjunction with NewHTTPClientFromHandler,
168-
// it allows you to simulate an I/O error.
168+
// When used with an httptest.Server, the handler forces an early close of the connection.
169+
// When used in a client created with ClientFromHandler, it causes a panic which is recovered
170+
// and converted to an error result. However, do not use this with httptest.ResponseRecorder
171+
// or your test will panic.
169172
//
170-
// handler := PanicHandler(&net.AddrError{})
173+
// handler := BrokenConnectionHandler()
171174
// client := NewClientFromHandler(handler)
172-
// // All requests made with this client will return an AddrError (or some other error that wraps that error)
173-
func PanicHandler(err error) http.Handler {
175+
// // All requests made with this client will return an error
176+
func BrokenConnectionHandler() http.Handler {
174177
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
175-
panic(err)
178+
if _, ok := w.(*httptest.ResponseRecorder); ok {
179+
panic("httphelpers.BrokenConnectionHandler cannot be used with a ResponseRecorder")
180+
}
181+
if h, ok := w.(http.Hijacker); ok {
182+
conn, _, err := h.Hijack()
183+
if err == nil {
184+
_ = conn.Close()
185+
return
186+
}
187+
}
188+
panic("connection deliberately closed by httphelpers.BrokenConnectionHandler")
176189
})
177190
}

httphelpers/handlers_sse.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package httphelpers
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"net/http"
7+
)
8+
9+
// SSEEvent is a simple representation of a Server-Sent Events message.
10+
type SSEEvent struct {
11+
// ID is the optional unique ID of the event.
12+
ID string
13+
14+
// Event is the message type, if any.
15+
Event string
16+
17+
// Data is the message data.
18+
Data string
19+
}
20+
21+
// Bytes returns the stream data for the event.
22+
func (e SSEEvent) Bytes() []byte {
23+
var buf bytes.Buffer
24+
if e.ID != "" {
25+
buf.WriteString(fmt.Sprintf("id: %s\n", e.ID))
26+
}
27+
if e.Event != "" {
28+
buf.WriteString(fmt.Sprintf("event: %s\n", e.Event))
29+
}
30+
buf.WriteString(fmt.Sprintf("data: %s\n\n", e.Data))
31+
return buf.Bytes()
32+
}
33+
34+
// SSEStreamControl is the interface for manipulating streams created by SSEHandler.
35+
type SSEStreamControl interface {
36+
// Enqueue is the same as Send, except that if there are currently no open connections to this
37+
// endpoint, the event is enqueued and will be sent to the next client that connects.
38+
Enqueue(event SSEEvent)
39+
40+
// Send sends an SSE event. If there are multiple open connections to this endpoint, the same
41+
// event is sent to all of them. If there are no open connections, the event is discarded.
42+
Send(event SSEEvent)
43+
44+
// EndAll terminates any existing connections to this endpoint, but allows new connections
45+
// afterward.
46+
EndAll()
47+
48+
// Close terminates any existing connections to this endpoint and causes the handler to reject any
49+
// subsequent connections.
50+
Close() error
51+
}
52+
53+
type sseStreamControlImpl struct {
54+
streamControl StreamControl
55+
}
56+
57+
// SSEHandler creates an HTTP handler that streams Server-Sent Events data.
58+
//
59+
// The initialData parameter, if not nil, specifies a starting event that should always be sent to any
60+
// client that has connected to this endpoint. Then, any data provided via the SSEStreamControl interface
61+
// is copied to all connected clients. Connections remain open until either EndAll or Close is called on
62+
// on the SSEStreamControl.
63+
//
64+
// In this example, every request to this endpoint will receive an initial initEvent, and then another
65+
// event will be sent every second with a counter; every 30 seconds, all active stream connections are
66+
// are closed:
67+
//
68+
// initialEvent := httphelpers.SSEEvent{Data: "hello"}
69+
// handler, stream := httphelpers.SSEHandler(&initialEvent)
70+
// (start server with handler)
71+
// go func() {
72+
// n := 1
73+
// counter := time.NewTicker(time.Second)
74+
// interrupter := time.NewTicker(time.Second * 10)
75+
// for {
76+
// select {
77+
// case <-counter.C:
78+
// stream.Send(httphelpers.SSEEvent{Data: fmt.Sprintf("%d\n", n)})
79+
// n++
80+
// case <-interrupter.C:
81+
// stream.EndAll()
82+
// }
83+
// }
84+
// }()
85+
func SSEHandler(initialEvent *SSEEvent) (http.Handler, SSEStreamControl) {
86+
var initialData []byte
87+
if initialEvent != nil {
88+
initialData = initialEvent.Bytes()
89+
}
90+
handler, streamControl := ChunkedStreamingHandler(initialData, "text/event-stream; charset=utf-8")
91+
return handler, &sseStreamControlImpl{streamControl}
92+
}
93+
94+
func (s *sseStreamControlImpl) Enqueue(event SSEEvent) {
95+
s.streamControl.Enqueue(event.Bytes())
96+
}
97+
98+
func (s *sseStreamControlImpl) Send(event SSEEvent) {
99+
s.streamControl.Send(event.Bytes())
100+
}
101+
102+
func (s *sseStreamControlImpl) EndAll() {
103+
s.streamControl.EndAll()
104+
}
105+
106+
func (s *sseStreamControlImpl) Close() error {
107+
return s.streamControl.Close()
108+
}

httphelpers/handlers_sse_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package httphelpers
2+
3+
import (
4+
"io/ioutil"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestSSEHandler(t *testing.T) {
14+
initialEvent := SSEEvent{"id1", "event1", "data1"}
15+
handler, stream := SSEHandler(&initialEvent)
16+
defer stream.Close()
17+
18+
stream.Enqueue(SSEEvent{"", "event2", "data2"})
19+
stream.Send(SSEEvent{"", "", "this isn't sent becauset here are no connections"})
20+
21+
WithServer(handler, func(server *httptest.Server) {
22+
resp1, err := http.DefaultClient.Get(server.URL)
23+
require.NoError(t, err)
24+
defer resp1.Body.Close()
25+
26+
assert.Equal(t, 200, resp1.StatusCode)
27+
assert.Equal(t, "text/event-stream; charset=utf-8", resp1.Header.Get("Content-Type"))
28+
29+
stream.Enqueue(SSEEvent{"", "event3", "data3"})
30+
stream.EndAll()
31+
32+
data, err := ioutil.ReadAll(resp1.Body)
33+
34+
assert.NoError(t, err)
35+
assert.Equal(t, `id: id1
36+
event: event1
37+
data: data1
38+
39+
event: event2
40+
data: data2
41+
42+
event: event3
43+
data: data3
44+
45+
`, string(data))
46+
})
47+
}

0 commit comments

Comments
 (0)