Skip to content

Commit ee00307

Browse files
committed
Event sending thread logic and shutdown behaviour
1 parent c84a878 commit ee00307

File tree

3 files changed

+75
-31
lines changed

3 files changed

+75
-31
lines changed

client.go

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,22 @@ package eventsource
33
import (
44
"io"
55
"net/http"
6+
"sync"
67
)
78

89
type Client struct {
9-
flush http.Flusher
10-
write io.Writer
10+
flush http.Flusher
11+
write io.Writer
12+
close http.CloseNotifier
13+
events chan *Event
14+
closed bool
15+
waiter sync.WaitGroup
1116
}
1217

1318
func NewClient(w http.ResponseWriter) *Client {
1419
c := &Client{
15-
write: w,
20+
events: make(chan *Event, 1),
21+
write: w,
1622
}
1723

1824
// Check to ensure we support flushing
@@ -22,14 +28,68 @@ func NewClient(w http.ResponseWriter) *Client {
2228
}
2329
c.flush = flush
2430

31+
// Check to ensure we support close notifications
32+
closer, ok := w.(http.CloseNotifier)
33+
if !ok {
34+
return nil
35+
}
36+
c.close = closer
37+
38+
// start the sending thread
39+
c.waiter.Add(1)
40+
go c.run()
2541
return c
2642
}
2743

28-
func (c *Client) Send(ev *Event) {
29-
io.Copy(c.write, ev)
30-
c.flush.Flush()
44+
// Send queues an event to be sent to the client.
45+
// This does not block until the event has been sent.
46+
// Returns an error if the Client has disconnected
47+
func (c *Client) Send(ev *Event) error {
48+
if c.closed {
49+
return io.ErrClosedPipe
50+
}
51+
c.events <- ev
52+
return nil
53+
}
54+
55+
// Shutdown terminates a client connection
56+
func (c *Client) Shutdown() {
57+
close(c.events)
58+
c.waiter.Wait()
3159
}
3260

61+
// Wait blocks and waits for the client to be shutdown.
62+
// Call this is http handler threads to prevent the server from closing
63+
// the client connection.
64+
func (c *Client) Wait() {
65+
c.waiter.Wait()
66+
}
67+
68+
// Worker thread for the client responsible for writing events
3369
func (c *Client) run() {
3470

71+
for {
72+
select {
73+
case ev, ok := <-c.events:
74+
// check for shutdown
75+
if !ok {
76+
c.closed = true
77+
c.waiter.Done()
78+
return
79+
}
80+
81+
// send the event
82+
io.Copy(c.write, ev)
83+
c.flush.Flush()
84+
85+
case _ = <-c.close.CloseNotify():
86+
c.closed = true
87+
// empty the event channel
88+
for _, ok := <-c.events; ok != false; {
89+
}
90+
c.waiter.Done()
91+
return
92+
}
93+
94+
}
3595
}

event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Event struct {
1717
bufSet bool
1818
}
1919

20-
// DataEvent creates a new Event with the data set
20+
// DataEvent creates a new Event with the data field set
2121
func DataEvent(data string) *Event {
2222
e := &Event{}
2323
io.WriteString(e, data)

stream.go

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
type Stream struct {
1010
clients list.List
1111
listLock sync.RWMutex
12-
broadcast chan *Event
1312
shutdownWait sync.WaitGroup
1413
clientConnectHook func(*http.Request, *Client)
1514
}
@@ -21,7 +20,6 @@ type registeredClient struct {
2120

2221
func New() *Stream {
2322
s := &Stream{}
24-
go s.run()
2523
return s
2624
}
2725

@@ -82,10 +80,17 @@ func (s *Stream) Publish(topic string, e *Event) {
8280
}
8381
}
8482

83+
// Shutdown terminates all clients connected to the stream and removes them
8584
func (s *Stream) Shutdown() {
86-
85+
for element := s.clients.Front(); element != nil; element.Next() {
86+
cli := element.Value.(*registeredClient)
87+
cli.c.Shutdown()
88+
s.clients.Remove(element)
89+
}
8790
}
8891

92+
// CloseTopic removes all client associations with this topic, but does not
93+
// terminate them or remove
8994
func (s *Stream) CloseTopic(topic string) {
9095

9196
}
@@ -104,27 +109,6 @@ func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
104109
s.clientConnectHook = fn
105110
}
106111

107-
func (s *Stream) run() {
108-
109-
for {
110-
select {
111-
112-
case ev, ok := <-s.broadcast:
113-
114-
// end of the broadcast channel indicates a shutdown
115-
if !ok {
116-
//s.closeAll()
117-
s.shutdownWait.Done()
118-
return
119-
}
120-
121-
// otherwise normal message
122-
s.sendAll(ev)
123-
124-
}
125-
}
126-
}
127-
128112
func (s *Stream) sendAll(ev *Event) {
129113

130114
}

0 commit comments

Comments
 (0)