Skip to content

Commit 8d57873

Browse files
authored
Merge pull request AndrewBurian#4 from AndrewBurian/remove-list
Remove list
2 parents 84da568 + 4b03a23 commit 8d57873

File tree

3 files changed

+51
-80
lines changed

3 files changed

+51
-80
lines changed

README.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Swiss Army Knife for SSE in Golang
55
So you want to publish events to client that connect to your server?
66
```go
77
func main() {
8-
stream := &eventsource.Stream{}
8+
stream := eventsource.NewStream()
99

1010
go func(s *eventsource.Stream) {
1111
for {
@@ -24,7 +24,7 @@ The `Stream` object implements an `http.Handler` for you so it can be registered
2424
You got it! What do you need?
2525

2626
## Multiplexing / Topics / Rooms / Channels
27-
We call them "topics" but the gist is the same. All Clients always recieve `Broadcast` events, but you can `Publish` events to a specific topic, and then only clients that have `Subscribe`d to that topic will recieve the event.
27+
We call them "topics" but the gist is the same. All Clients always receive `Broadcast` events, but you can `Publish` events to a specific topic, and then only clients that have `Subscribe`d to that topic will receive the event.
2828

2929
```go
3030
stream.Subscribe("weather", myClient)
@@ -35,8 +35,8 @@ stream.Broadcast(tornadoWarningEvent)
3535
You can also just create multiple `Stream` objects much to the same effect, then only use `Broadcast`. Streams are cheap and run no background routines, so this is a valid pattern.
3636

3737
```go
38-
weatherStream := &eventstream.Stream{}
39-
lotteryStream := &eventstream.Stream{}
38+
weatherStream := eventstream.NewStream()
39+
lotteryStream := eventstream.NewStream()
4040

4141
weatherStream.Register(clientPlanningHikes)
4242
lotteryStream.Register(soonToBePoorClient)
@@ -49,7 +49,7 @@ lotteryStream.Broadcast(everyoneLosesEvent)
4949

5050
Use `TopicHandler` to create another handler for that stream that will subscribe clients to topics as well as broadcasts.
5151
```go
52-
stream := &eventsource.Stream{}
52+
stream := eventsource.NewStream()
5353
catsHandler := stream.TopicHandler([]string{"cat"})
5454

5555
http.ListenAndServe(":8080", catsHandler)
@@ -59,10 +59,10 @@ http.ListenAndServe(":8080", catsHandler)
5959
Use the stream's `Register`, `Subscribe`, `Remove`, `Unsubscribe`, and `CloseTopic` functions to control which clients are registered where.
6060

6161
## Tell me when clients connect
62-
Register a callback for the `Stream` to envoke everytime a new client connects with `Stream.ClientConnectHook`. It'll give you a handle to the resulting Client and the http request that created it, letting you do whatever you please.
62+
Register a callback for the `Stream` to invoke every time a new client connects with `Stream.ClientConnectHook`. It'll give you a handle to the resulting Client and the http request that created it, letting you do whatever you please.
6363

6464
```go
65-
stream := &eventsource.Stream{}
65+
stream := eventsource.NewStream()
6666
stream.ClientConnectHook(func(r *http.Requset, c *eventsource.Client){
6767
fmt.Println("Recieved connection from", r.Host)
6868
fmt.Println("Hate that guy")
@@ -71,13 +71,13 @@ stream.ClientConnectHook(func(r *http.Requset, c *eventsource.Client){
7171
})
7272
```
7373

74-
The callback will be on the same goroutine as the incoming web request that created it, but the Client is live and functioning so it'll start recieving broadcasts and publishments immediately before your callback has returned.
74+
The callback will be on the same goroutine as the incoming web request that created it, but the Client is live and functioning so it'll start receiving broadcasts and publications immediately before your callback has returned.
7575

7676
## Graceful shutdown
77-
The stream's `Shutdown` command will unsubscribe and disconnect all connected clients. However the `Stream` itself is not running any background routines, and may continue to register new clients if it's still registed as an http handler.
77+
The stream's `Shutdown` command will unsubscribe and disconnect all connected clients. However the `Stream` itself is not running any background routines, and may continue to register new clients if it's still registered as an http handler.
7878

7979
## Get out of my way
80-
Fine! The `Stream` object is entirely convinience. It runs no background routines and does no special handling. It just adds the topics abstraction and calls `NewClient` for you when it's connected to. Feel free not to use it.
80+
Fine! The `Stream` object is entirely convenience. It runs no background routines and does no special handling. It just adds the topics abstraction and calls `NewClient` for you when it's connected to. Feel free not to use it.
8181

8282
# More control of the `Client`
8383
You betcha.
@@ -101,12 +101,12 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request) {
101101
Letting the http handler routine that created the Client return may cause the underlying connection to be closed by the server. Since `NewClient` does not block, use `Wait` to block until the client is shutdown.
102102

103103
## Shutdown the client
104-
The client's `Shutdown` function terminates the background routine and marks the client as closed. It does not actually sever the connection. It does unblock any routines waiting on `Wait`, which assuming the main http hander routine was waiting there, will cause the connection to close as it returns.
104+
The client's `Shutdown` function terminates the background routine and marks the client as closed. It does not actually sever the connection. It does unblock any routines waiting on `Wait`, which assuming the main http handler routine was waiting there, will cause the connection to close as it returns.
105105

106106
Attempts to `Send` events to a client after it has been shutdown will result in an error
107107

108108
# More control of Events
109-
Events are the most critical part of the library, and are the most versitile.
109+
Events are the most critical part of the library, and are the most versatile.
110110

111111
## Write my own events from scratch
112112
Events implement the `io.ReadWriter` interface so that data can be written to it from practically any source. However the `Write` interface _does not_ write an entire event in wire format. It writes the provided buffer into `data:` sections in the resulting event.
@@ -135,7 +135,7 @@ newEvent.WriteRaw(evData) // that will work
135135
newEvent.AppendData("Moar") // ... and you ruined it
136136
```
137137

138-
Use `Clone` to create a perfect deep copy that survives further mutation. Though this is less efficient in memeory.
138+
Use `Clone` to create a perfect deep copy that survives further mutation. Though this is less efficient in memory.
139139

140140
## Create events more easily
141141
Since you will probably be creating more than just a few events, the `EventFactory` interface and a couple helper factories and functions have been provided to speed things up.

event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (e *Event) Write(p []byte) (int, error) {
119119
}
120120

121121
// WriteString adds string data to the event.
122-
// Equivilant to calling Write([]byte(string))
122+
// Equivalent to calling Write([]byte(string))
123123
func (e *Event) WriteString(p string) {
124124
// split event on newlines
125125
split := strings.Split(p, "\n")

stream.go

Lines changed: 37 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Package eventsource is a library for dealing with server sent events in Go.
44
The library attempts to make as few assumptions as possible about how your apps
55
will be written, and remains as generic as possible while still being simple and useful.
66
7-
The three core obects to the library are Clients, Events, and Streams.
7+
The three core objects to the library are Clients, Events, and Streams.
88
99
Client wraps an HTTP connection, and runs a worker routine to send events to the connected
1010
client in a thread-safe way. It gracefully handles client disconnects.
@@ -14,12 +14,12 @@ it to wire format to send. Events are not thread-safe by themselves.
1414
1515
Stream is an abstraction for 0 or more Client connections, and adds some multiplexing and filtering
1616
on top of the Client. It also can act as an http.Handler to automatically register inbound client
17-
connections and disconnectinos.
17+
connections and disconnections.
1818
1919
A quick example of a simple sever that broadcasts a "tick" event every second
2020
2121
func main() {
22-
stream := &eventsource.Stream{}
22+
stream := eventsource.NewStream()
2323
go func(s *eventsource.Stream) {
2424
for {
2525
time.Sleep(time.Second)
@@ -33,7 +33,6 @@ A quick example of a simple sever that broadcasts a "tick" event every second
3333
package eventsource
3434

3535
import (
36-
"container/list"
3736
"net/http"
3837
"sync"
3938
)
@@ -43,89 +42,93 @@ import (
4342
// A stream also implements an http.Handler to easily register incoming
4443
// http requests as new clients.
4544
type Stream struct {
46-
clients list.List
45+
clients map[*Client]topicList
4746
listLock sync.RWMutex
4847
shutdownWait sync.WaitGroup
4948
clientConnectHook func(*http.Request, *Client)
5049
}
5150

52-
type registeredClient struct {
53-
c *Client
54-
topics map[string]bool
51+
type topicList map[string]bool
52+
53+
// NewStream creates a new stream object
54+
func NewStream() *Stream {
55+
return &Stream{
56+
clients: make(map[*Client]topicList),
57+
}
5558
}
5659

5760
// Register adds a client to the stream to receive all broadcast
5861
// messages. Has no effect if the client is already registered.
5962
func (s *Stream) Register(c *Client) {
63+
s.listLock.Lock()
64+
defer s.listLock.Unlock()
6065

6166
// see if the client has been registered
62-
if cli := s.getClient(c); cli != nil {
67+
if _, found := s.clients[c]; found {
6368
return
6469
}
6570

6671
// append new client
67-
s.addClient(c)
72+
s.clients[c] = make(topicList)
6873
}
6974

7075
// Remove will remove a client from this stream, but not shut the client down.
7176
func (s *Stream) Remove(c *Client) {
7277
s.listLock.Lock()
7378
defer s.listLock.Unlock()
7479

75-
for element := s.clients.Front(); element != nil; element = element.Next() {
76-
if regCli := element.Value.(*registeredClient); regCli.c == c {
77-
// client found
78-
s.clients.Remove(element)
79-
return
80-
}
81-
}
80+
delete(s.clients, c)
8281
}
8382

8483
// Broadcast sends the event to all clients registered on this stream.
8584
func (s *Stream) Broadcast(e *Event) {
8685
s.listLock.RLock()
8786
defer s.listLock.RUnlock()
8887

89-
for element := s.clients.Front(); element != nil; element = element.Next() {
90-
cli := element.Value.(*registeredClient)
91-
cli.c.Send(e)
88+
for cli := range s.clients {
89+
cli.Send(e)
9290
}
9391
}
9492

9593
// Subscribe add the client to the list of clients receiving publications
9694
// to this topic. Subscribe will also Register an unregistered
9795
// client.
9896
func (s *Stream) Subscribe(topic string, c *Client) {
97+
s.listLock.Lock()
98+
defer s.listLock.Unlock()
99+
99100
// see if the client is registered
100-
cli := s.getClient(c)
101+
topics, found := s.clients[c]
101102

102103
// register if not
103-
if cli == nil {
104-
cli = s.addClient(c)
104+
if !found {
105+
topics = make(topicList)
106+
s.clients[c] = topics
105107
}
106108

107-
cli.topics[topic] = true
109+
topics[topic] = true
108110
}
109111

110112
// Unsubscribe removes clients from the topic, but not from broadcasts.
111113
func (s *Stream) Unsubscribe(topic string, c *Client) {
114+
s.listLock.Lock()
115+
defer s.listLock.Unlock()
112116

113-
cli := s.getClient(c)
114-
if cli == nil {
117+
topics, found := s.clients[c]
118+
if !found {
115119
return
116120
}
117-
cli.topics[topic] = false
121+
topics[topic] = false
118122
}
119123

120124
// Publish sends the event to clients that have subscribed to the given topic.
121125
func (s *Stream) Publish(topic string, e *Event) {
122126
s.listLock.RLock()
123127
defer s.listLock.RUnlock()
124128

125-
for element := s.clients.Front(); element != nil; element = element.Next() {
126-
cli := element.Value.(*registeredClient)
127-
if cli.topics[topic] {
128-
cli.c.Send(e)
129+
for cli, topics := range s.clients {
130+
if topics[topic] {
131+
cli.Send(e)
129132
}
130133
}
131134
}
@@ -135,12 +138,9 @@ func (s *Stream) Shutdown() {
135138
s.listLock.Lock()
136139
defer s.listLock.Unlock()
137140

138-
for element := s.clients.Front(); element != nil; {
139-
cli := element.Value.(*registeredClient)
140-
cli.c.Shutdown()
141-
next := element.Next()
142-
s.clients.Remove(element)
143-
element = next
141+
for client := range s.clients {
142+
client.Shutdown()
143+
delete(s.clients, client)
144144
}
145145
}
146146

@@ -223,32 +223,3 @@ func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
223223
func checkRequest(r *http.Request) bool {
224224
return r.Header.Get("Accept") == "text/event-stream"
225225
}
226-
227-
func (s *Stream) getClient(c *Client) *registeredClient {
228-
s.listLock.RLock()
229-
defer s.listLock.RUnlock()
230-
231-
for element := s.clients.Front(); element != nil; element = element.Next() {
232-
if regCli := element.Value.(*registeredClient); regCli.c == c {
233-
// client found
234-
return regCli
235-
}
236-
}
237-
238-
// not found
239-
return nil
240-
}
241-
242-
func (s *Stream) addClient(c *Client) *registeredClient {
243-
244-
cli := &registeredClient{
245-
c: c,
246-
topics: make(map[string]bool),
247-
}
248-
249-
s.listLock.Lock()
250-
s.clients.PushBack(cli)
251-
s.listLock.Unlock()
252-
253-
return cli
254-
}

0 commit comments

Comments
 (0)