Skip to content

Commit c84a878

Browse files
committed
Basic stream functionality
1 parent 7739bbd commit c84a878

File tree

1 file changed

+75
-1
lines changed

1 file changed

+75
-1
lines changed

stream.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,78 @@ import (
88

99
type Stream struct {
1010
clients list.List
11+
listLock sync.RWMutex
1112
broadcast chan *Event
1213
shutdownWait sync.WaitGroup
1314
clientConnectHook func(*http.Request, *Client)
1415
}
1516

17+
type registeredClient struct {
18+
c *Client
19+
topics map[string]bool
20+
}
21+
1622
func New() *Stream {
1723
s := &Stream{}
1824
go s.run()
1925
return s
2026
}
2127

28+
// Register adds a client to the stream to receive all broadcast
29+
// messages. Has no effect if the client is already registered.
2230
func (s *Stream) Register(c *Client) {
2331

32+
// see if the client has been registered
33+
if cli := s.getClient(c); cli != nil {
34+
return
35+
}
36+
37+
// append new client
38+
s.addClient(c)
2439
}
2540

41+
// Broadcast sends the event to all clients registered on this stream.
2642
func (s *Stream) Broadcast(e *Event) {
2743

44+
for element := s.clients.Front(); element != nil; element.Next() {
45+
cli := element.Value.(*registeredClient)
46+
cli.c.Send(e)
47+
}
2848
}
2949

50+
// Subscribe add the client to the list of clients receiving publications
51+
// to this topic. Subscribe will also Register an unregistered
52+
// client.
3053
func (s *Stream) Subscribe(topic string, c *Client) {
3154

55+
// see if the client is registered
56+
cli := s.getClient(c)
57+
58+
// register if not
59+
if cli == nil {
60+
cli = s.addClient(c)
61+
}
62+
63+
cli.topics[topic] = true
3264
}
3365

34-
func (s *Stream) Publish(topic string, e *Event) {
66+
// Unsubscribe removes clients from the topic, but not from broadcasts.
67+
func (s *Stream) Unsubscribe(topic string, c *Client) {
68+
cli := s.getClient(c)
69+
if cli == nil {
70+
return
71+
}
72+
cli.topics[topic] = false
73+
}
3574

75+
// Publish sends the event to clients that have subscribed to the given topic.
76+
func (s *Stream) Publish(topic string, e *Event) {
77+
for element := s.clients.Front(); element != nil; element.Next() {
78+
cli := element.Value.(*registeredClient)
79+
if cli.topics[topic] {
80+
cli.c.Send(e)
81+
}
82+
}
3683
}
3784

3885
func (s *Stream) Shutdown() {
@@ -81,3 +128,30 @@ func (s *Stream) run() {
81128
func (s *Stream) sendAll(ev *Event) {
82129

83130
}
131+
132+
func (s *Stream) getClient(c *Client) *registeredClient {
133+
if s.clients.Len() > 0 {
134+
// ensure client is not already registered
135+
s.listLock.RLock()
136+
137+
listItem := s.clients.Front()
138+
if regCli := listItem.Value.(*registeredClient); regCli.c == c {
139+
// client found
140+
s.listLock.RUnlock()
141+
return regCli
142+
}
143+
}
144+
145+
return nil
146+
}
147+
148+
func (s *Stream) addClient(c *Client) *registeredClient {
149+
s.listLock.Lock()
150+
151+
s.clients.PushBack(&registeredClient{
152+
c: c,
153+
topics: make(map[string]bool),
154+
})
155+
156+
s.listLock.Unlock()
157+
}

0 commit comments

Comments
 (0)