Skip to content

Commit 7739bbd

Browse files
committed
Shimmed out Stream and Client
1 parent 358a1d2 commit 7739bbd

File tree

3 files changed

+146
-0
lines changed

3 files changed

+146
-0
lines changed

client.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package eventsource
2+
3+
import (
4+
"io"
5+
"net/http"
6+
)
7+
8+
type Client struct {
9+
flush http.Flusher
10+
write io.Writer
11+
}
12+
13+
func NewClient(w http.ResponseWriter) *Client {
14+
c := &Client{
15+
write: w,
16+
}
17+
18+
// Check to ensure we support flushing
19+
flush, ok := w.(http.Flusher)
20+
if !ok {
21+
return nil
22+
}
23+
c.flush = flush
24+
25+
return c
26+
}
27+
28+
func (c *Client) Send(ev *Event) {
29+
io.Copy(c.write, ev)
30+
c.flush.Flush()
31+
}
32+
33+
func (c *Client) run() {
34+
35+
}

event.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package eventsource
22

33
import (
44
"bytes"
5+
"io"
6+
"io/ioutil"
57
"strconv"
68
)
79

@@ -15,6 +17,18 @@ type Event struct {
1517
bufSet bool
1618
}
1719

20+
// DataEvent creates a new Event with the data set
21+
func DataEvent(data string) *Event {
22+
e := &Event{}
23+
io.WriteString(e, data)
24+
return e
25+
}
26+
27+
// ID sets the event ID
28+
func (e *Event) ID(id string) {
29+
e.id = []byte(id)
30+
}
31+
1832
// Read the event in wire format
1933
func (e *Event) Read(p []byte) (int, error) {
2034
if e.bufSet {
@@ -80,3 +94,17 @@ func (e *Event) Write(p []byte) (int, error) {
8094
e.bufSet = false
8195
return len(p), nil
8296
}
97+
98+
// WriteRaw sets an event directly in wire format
99+
//
100+
// This does no validation to ensure it is in a correct format
101+
// and should mostly be used to deep copy another event
102+
func (e *Event) WriteRaw(p []byte) (int, error) {
103+
return e.buf.Write(p)
104+
}
105+
106+
// String returns the Event in wire format as a string
107+
func (e *Event) String() string {
108+
fullEvent, _ := ioutil.ReadAll(e)
109+
return string(fullEvent)
110+
}

stream.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package eventsource
2+
3+
import (
4+
"container/list"
5+
"net/http"
6+
"sync"
7+
)
8+
9+
type Stream struct {
10+
clients list.List
11+
broadcast chan *Event
12+
shutdownWait sync.WaitGroup
13+
clientConnectHook func(*http.Request, *Client)
14+
}
15+
16+
func New() *Stream {
17+
s := &Stream{}
18+
go s.run()
19+
return s
20+
}
21+
22+
func (s *Stream) Register(c *Client) {
23+
24+
}
25+
26+
func (s *Stream) Broadcast(e *Event) {
27+
28+
}
29+
30+
func (s *Stream) Subscribe(topic string, c *Client) {
31+
32+
}
33+
34+
func (s *Stream) Publish(topic string, e *Event) {
35+
36+
}
37+
38+
func (s *Stream) Shutdown() {
39+
40+
}
41+
42+
func (s *Stream) CloseTopic(topic string) {
43+
44+
}
45+
46+
func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
47+
48+
}
49+
50+
func (s *Stream) TopicHandler(topic string) http.HandlerFunc {
51+
52+
}
53+
54+
// Register a function to be called when a client connects to this stream's
55+
// HTTP handler
56+
func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
57+
s.clientConnectHook = fn
58+
}
59+
60+
func (s *Stream) run() {
61+
62+
for {
63+
select {
64+
65+
case ev, ok := <-s.broadcast:
66+
67+
// end of the broadcast channel indicates a shutdown
68+
if !ok {
69+
//s.closeAll()
70+
s.shutdownWait.Done()
71+
return
72+
}
73+
74+
// otherwise normal message
75+
s.sendAll(ev)
76+
77+
}
78+
}
79+
}
80+
81+
func (s *Stream) sendAll(ev *Event) {
82+
83+
}

0 commit comments

Comments
 (0)