Skip to content

Commit 519342b

Browse files
committed
HTTP handlers implemented
1 parent ee00307 commit 519342b

File tree

2 files changed

+61
-11
lines changed

2 files changed

+61
-11
lines changed

client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ func NewClient(w http.ResponseWriter) *Client {
3535
}
3636
c.close = closer
3737

38+
// Send the initial headers
39+
w.Header().Set("Content-Type", "text/event-stream")
40+
w.Header().Set("Cache-Control", "no-cache")
41+
w.Header().Set("Connection", "keep-alive")
42+
flush.Flush()
43+
3844
// start the sending thread
3945
c.waiter.Add(1)
4046
go c.run()

stream.go

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@ type registeredClient struct {
1818
topics map[string]bool
1919
}
2020

21-
func New() *Stream {
22-
s := &Stream{}
23-
return s
24-
}
25-
2621
// Register adds a client to the stream to receive all broadcast
2722
// messages. Has no effect if the client is already registered.
2823
func (s *Stream) Register(c *Client) {
@@ -95,12 +90,57 @@ func (s *Stream) CloseTopic(topic string) {
9590

9691
}
9792

93+
// ServeHTTP takes a client connection, registers it for broadcasts,
94+
// then blocks so long as the connection is alive.
9895
func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
9996

97+
// ensure the client accepts an event-stream
98+
if !checkRequest(r) {
99+
http.Error(w, "This is an EventStream endpoint", http.StatusNotAcceptable)
100+
return
101+
}
102+
103+
// create the client
104+
c := NewClient(w)
105+
if c == nil {
106+
http.Error(w, "EventStream not supported for this connection", http.StatusInternalServerError)
107+
return
108+
}
109+
110+
// wait for the client to exit or be shutdown
111+
s.Register(c)
112+
c.Wait()
100113
}
101114

102-
func (s *Stream) TopicHandler(topic string) http.HandlerFunc {
115+
// TopicHandler returns an HTTP handler that will register a client for broadcasts
116+
// and for any topics, and then block so long as they are connected
117+
func (s *Stream) TopicHandler(topics []string) http.HandlerFunc {
103118

119+
return func(w http.ResponseWriter, r *http.Request) {
120+
// ensure the client accepts an event-stream
121+
if !checkRequest(r) {
122+
http.Error(w, "This is an EventStream endpoint", http.StatusNotAcceptable)
123+
return
124+
}
125+
126+
// create the client
127+
c := NewClient(w)
128+
if c == nil {
129+
http.Error(w, "EventStream not supported for this connection", http.StatusInternalServerError)
130+
return
131+
}
132+
133+
// broadcasts
134+
s.Register(c)
135+
136+
// topics
137+
for _, topic := range topics {
138+
s.Subscribe(topic, c)
139+
}
140+
141+
// wait for the client to exit or be shutdown
142+
c.Wait()
143+
}
104144
}
105145

106146
// Register a function to be called when a client connects to this stream's
@@ -109,8 +149,9 @@ func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
109149
s.clientConnectHook = fn
110150
}
111151

112-
func (s *Stream) sendAll(ev *Event) {
113-
152+
// Checks that a client expects an event-stream
153+
func checkRequest(r *http.Request) bool {
154+
return r.Header.Get("Accept") == "text/event-stream"
114155
}
115156

116157
func (s *Stream) getClient(c *Client) *registeredClient {
@@ -130,12 +171,15 @@ func (s *Stream) getClient(c *Client) *registeredClient {
130171
}
131172

132173
func (s *Stream) addClient(c *Client) *registeredClient {
133-
s.listLock.Lock()
134174

135-
s.clients.PushBack(&registeredClient{
175+
cli := &registeredClient{
136176
c: c,
137177
topics: make(map[string]bool),
138-
})
178+
}
139179

180+
s.listLock.Lock()
181+
s.clients.PushBack(cli)
140182
s.listLock.Unlock()
183+
184+
return cli
141185
}

0 commit comments

Comments
 (0)