Skip to content

Commit b3681ab

Browse files
committed
Replaced linked list with map
Much better performance, simpler code
1 parent 84da568 commit b3681ab

File tree

2 files changed

+33
-69
lines changed

2 files changed

+33
-69
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Swiss Army Knife for SSE in Golang
55
So you want to publish events to client that connect to your server?
66
```go
77
func main() {
8-
stream := &eventsource.Stream{}
8+
stream := eventsource.NewStream()
99

1010
go func(s *eventsource.Stream) {
1111
for {
@@ -35,8 +35,8 @@ stream.Broadcast(tornadoWarningEvent)
3535
You can also just create multiple `Stream` objects much to the same effect, then only use `Broadcast`. Streams are cheap and run no background routines, so this is a valid pattern.
3636

3737
```go
38-
weatherStream := &eventstream.Stream{}
39-
lotteryStream := &eventstream.Stream{}
38+
weatherStream := eventstream.NewStream()
39+
lotteryStream := eventstream.NewStream()
4040

4141
weatherStream.Register(clientPlanningHikes)
4242
lotteryStream.Register(soonToBePoorClient)
@@ -49,7 +49,7 @@ lotteryStream.Broadcast(everyoneLosesEvent)
4949

5050
Use `TopicHandler` to create another handler for that stream that will subscribe clients to topics as well as broadcasts.
5151
```go
52-
stream := &eventsource.Stream{}
52+
stream := eventsource.NewStream()
5353
catsHandler := stream.TopicHandler([]string{"cat"})
5454

5555
http.ListenAndServe(":8080", catsHandler)
@@ -62,7 +62,7 @@ Use the stream's `Register`, `Subscribe`, `Remove`, `Unsubscribe`, and `CloseTop
6262
Register a callback for the `Stream` to envoke everytime a new client connects with `Stream.ClientConnectHook`. It'll give you a handle to the resulting Client and the http request that created it, letting you do whatever you please.
6363

6464
```go
65-
stream := &eventsource.Stream{}
65+
stream := eventsource.NewStream()
6666
stream.ClientConnectHook(func(r *http.Requset, c *eventsource.Client){
6767
fmt.Println("Recieved connection from", r.Host)
6868
fmt.Println("Hate that guy")

stream.go

Lines changed: 28 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ connections and disconnectinos.
1919
A quick example of a simple sever that broadcasts a "tick" event every second
2020
2121
func main() {
22-
stream := &eventsource.Stream{}
22+
stream := eventsource.NewStream()
2323
go func(s *eventsource.Stream) {
2424
for {
2525
time.Sleep(time.Second)
@@ -33,7 +33,6 @@ A quick example of a simple sever that broadcasts a "tick" event every second
3333
package eventsource
3434

3535
import (
36-
"container/list"
3736
"net/http"
3837
"sync"
3938
)
@@ -43,52 +42,49 @@ import (
4342
// A stream also implements an http.Handler to easily register incoming
4443
// http requests as new clients.
4544
type Stream struct {
46-
clients list.List
45+
clients map[*Client]topicList
4746
listLock sync.RWMutex
4847
shutdownWait sync.WaitGroup
4948
clientConnectHook func(*http.Request, *Client)
5049
}
5150

52-
type registeredClient struct {
53-
c *Client
54-
topics map[string]bool
51+
type topicList map[string]bool
52+
53+
// NewStream creates a new stream object
54+
func NewStream() *Stream {
55+
return &Stream{
56+
clients: make(map[*Client]topicList),
57+
}
5558
}
5659

5760
// Register adds a client to the stream to receive all broadcast
5861
// messages. Has no effect if the client is already registered.
5962
func (s *Stream) Register(c *Client) {
6063

6164
// see if the client has been registered
62-
if cli := s.getClient(c); cli != nil {
65+
if _, found := s.clients[c]; found {
6366
return
6467
}
6568

6669
// append new client
67-
s.addClient(c)
70+
s.clients[c] = make(topicList)
6871
}
6972

7073
// Remove will remove a client from this stream, but not shut the client down.
7174
func (s *Stream) Remove(c *Client) {
7275
s.listLock.Lock()
7376
defer s.listLock.Unlock()
7477

75-
for element := s.clients.Front(); element != nil; element = element.Next() {
76-
if regCli := element.Value.(*registeredClient); regCli.c == c {
77-
// client found
78-
s.clients.Remove(element)
79-
return
80-
}
81-
}
78+
delete(s.clients, c)
8279
}
8380

8481
// Broadcast sends the event to all clients registered on this stream.
8582
func (s *Stream) Broadcast(e *Event) {
8683
s.listLock.RLock()
8784
defer s.listLock.RUnlock()
8885

89-
for element := s.clients.Front(); element != nil; element = element.Next() {
90-
cli := element.Value.(*registeredClient)
91-
cli.c.Send(e)
86+
for cli := range s.clients {
87+
cli.Send(e)
9288
}
9389
}
9490

@@ -97,35 +93,35 @@ func (s *Stream) Broadcast(e *Event) {
9793
// client.
9894
func (s *Stream) Subscribe(topic string, c *Client) {
9995
// see if the client is registered
100-
cli := s.getClient(c)
96+
topics, found := s.clients[c]
10197

10298
// register if not
103-
if cli == nil {
104-
cli = s.addClient(c)
99+
if !found {
100+
topics = make(topicList)
101+
s.clients[c] = topics
105102
}
106103

107-
cli.topics[topic] = true
104+
topics[topic] = true
108105
}
109106

110107
// Unsubscribe removes clients from the topic, but not from broadcasts.
111108
func (s *Stream) Unsubscribe(topic string, c *Client) {
112109

113-
cli := s.getClient(c)
114-
if cli == nil {
110+
topics, found := s.clients[c]
111+
if !found {
115112
return
116113
}
117-
cli.topics[topic] = false
114+
topics[topic] = false
118115
}
119116

120117
// Publish sends the event to clients that have subscribed to the given topic.
121118
func (s *Stream) Publish(topic string, e *Event) {
122119
s.listLock.RLock()
123120
defer s.listLock.RUnlock()
124121

125-
for element := s.clients.Front(); element != nil; element = element.Next() {
126-
cli := element.Value.(*registeredClient)
127-
if cli.topics[topic] {
128-
cli.c.Send(e)
122+
for cli, topics := range s.clients {
123+
if topics[topic] {
124+
cli.Send(e)
129125
}
130126
}
131127
}
@@ -135,12 +131,9 @@ func (s *Stream) Shutdown() {
135131
s.listLock.Lock()
136132
defer s.listLock.Unlock()
137133

138-
for element := s.clients.Front(); element != nil; {
139-
cli := element.Value.(*registeredClient)
140-
cli.c.Shutdown()
141-
next := element.Next()
142-
s.clients.Remove(element)
143-
element = next
134+
for client := range s.clients {
135+
client.Shutdown()
136+
delete(s.clients, client)
144137
}
145138
}
146139

@@ -223,32 +216,3 @@ func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
223216
func checkRequest(r *http.Request) bool {
224217
return r.Header.Get("Accept") == "text/event-stream"
225218
}
226-
227-
func (s *Stream) getClient(c *Client) *registeredClient {
228-
s.listLock.RLock()
229-
defer s.listLock.RUnlock()
230-
231-
for element := s.clients.Front(); element != nil; element = element.Next() {
232-
if regCli := element.Value.(*registeredClient); regCli.c == c {
233-
// client found
234-
return regCli
235-
}
236-
}
237-
238-
// not found
239-
return nil
240-
}
241-
242-
func (s *Stream) addClient(c *Client) *registeredClient {
243-
244-
cli := &registeredClient{
245-
c: c,
246-
topics: make(map[string]bool),
247-
}
248-
249-
s.listLock.Lock()
250-
s.clients.PushBack(cli)
251-
s.listLock.Unlock()
252-
253-
return cli
254-
}

0 commit comments

Comments
 (0)