Skip to content

Commit 0be37ff

Browse files
committed
Locking and removing logic
1 parent 519342b commit 0be37ff

File tree

2 files changed

+33
-13
lines changed

2 files changed

+33
-13
lines changed

client.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,6 @@ func (c *Client) run() {
9090

9191
case _ = <-c.close.CloseNotify():
9292
c.closed = true
93-
// empty the event channel
94-
for _, ok := <-c.events; ok != false; {
95-
}
9693
c.waiter.Done()
9794
return
9895
}

stream.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,26 @@ func (s *Stream) Register(c *Client) {
3131
s.addClient(c)
3232
}
3333

34+
// Remove will remove a client from this stream, but not shut the client down.
35+
func (s *Stream) Remove(c *Client) {
36+
s.listLock.Lock()
37+
defer s.listLock.Unlock()
38+
39+
for element := s.clients.Front(); element != nil; element = element.Next() {
40+
if regCli := element.Value.(*registeredClient); regCli.c == c {
41+
// client found
42+
s.clients.Remove(element)
43+
return
44+
}
45+
}
46+
}
47+
3448
// Broadcast sends the event to all clients registered on this stream.
3549
func (s *Stream) Broadcast(e *Event) {
50+
s.listLock.RLock()
51+
defer s.listLock.RUnlock()
3652

37-
for element := s.clients.Front(); element != nil; element.Next() {
53+
for element := s.clients.Front(); element != nil; element = element.Next() {
3854
cli := element.Value.(*registeredClient)
3955
cli.c.Send(e)
4056
}
@@ -44,7 +60,6 @@ func (s *Stream) Broadcast(e *Event) {
4460
// to this topic. Subscribe will also Register an unregistered
4561
// client.
4662
func (s *Stream) Subscribe(topic string, c *Client) {
47-
4863
// see if the client is registered
4964
cli := s.getClient(c)
5065

@@ -58,6 +73,7 @@ func (s *Stream) Subscribe(topic string, c *Client) {
5873

5974
// Unsubscribe removes clients from the topic, but not from broadcasts.
6075
func (s *Stream) Unsubscribe(topic string, c *Client) {
76+
6177
cli := s.getClient(c)
6278
if cli == nil {
6379
return
@@ -67,7 +83,10 @@ func (s *Stream) Unsubscribe(topic string, c *Client) {
6783

6884
// Publish sends the event to clients that have subscribed to the given topic.
6985
func (s *Stream) Publish(topic string, e *Event) {
70-
for element := s.clients.Front(); element != nil; element.Next() {
86+
s.listLock.RLock()
87+
defer s.listLock.Unlock()
88+
89+
for element := s.clients.Front(); element != nil; element = element.Next() {
7190
cli := element.Value.(*registeredClient)
7291
if cli.topics[topic] {
7392
cli.c.Send(e)
@@ -77,7 +96,10 @@ func (s *Stream) Publish(topic string, e *Event) {
7796

7897
// Shutdown terminates all clients connected to the stream and removes them
7998
func (s *Stream) Shutdown() {
80-
for element := s.clients.Front(); element != nil; element.Next() {
99+
s.listLock.Lock()
100+
defer s.listLock.Unlock()
101+
102+
for element := s.clients.Front(); element != nil; element = element.Next() {
81103
cli := element.Value.(*registeredClient)
82104
cli.c.Shutdown()
83105
s.clients.Remove(element)
@@ -110,6 +132,7 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
110132
// wait for the client to exit or be shutdown
111133
s.Register(c)
112134
c.Wait()
135+
s.Remove(c)
113136
}
114137

115138
// TopicHandler returns an HTTP handler that will register a client for broadcasts
@@ -140,6 +163,7 @@ func (s *Stream) TopicHandler(topics []string) http.HandlerFunc {
140163

141164
// wait for the client to exit or be shutdown
142165
c.Wait()
166+
s.Remove(c)
143167
}
144168
}
145169

@@ -155,18 +179,17 @@ func checkRequest(r *http.Request) bool {
155179
}
156180

157181
func (s *Stream) getClient(c *Client) *registeredClient {
158-
if s.clients.Len() > 0 {
159-
// ensure client is not already registered
160-
s.listLock.RLock()
182+
s.listLock.RLock()
183+
defer s.listLock.RUnlock()
161184

162-
listItem := s.clients.Front()
163-
if regCli := listItem.Value.(*registeredClient); regCli.c == c {
185+
for element := s.clients.Front(); element != nil; element = element.Next() {
186+
if regCli := element.Value.(*registeredClient); regCli.c == c {
164187
// client found
165-
s.listLock.RUnlock()
166188
return regCli
167189
}
168190
}
169191

192+
// not found
170193
return nil
171194
}
172195

0 commit comments

Comments
 (0)