Skip to content

Commit 69bd65e

Browse files
committed
add non-blocking send
1 parent 04d667f commit 69bd65e

File tree

3 files changed

+27
-14
lines changed

3 files changed

+27
-14
lines changed

client.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package eventsource
22

33
import (
4+
"context"
45
"io"
56
"net/http"
67
"sync"
@@ -12,7 +13,7 @@ import (
1213
type Client struct {
1314
flush http.Flusher
1415
write io.Writer
15-
close http.CloseNotifier
16+
ctx context.Context
1617
events chan *Event
1718
closed bool
1819
waiter sync.WaitGroup
@@ -22,8 +23,7 @@ type Client struct {
2223
}
2324

2425
// NewClient creates a client wrapping a response writer.
25-
// The response writer must support http.Flusher and http.CloseNotifier
26-
// interfaces.
26+
// The response writer must support http.Flusher interface.
2727
// When writing, the client will automatically send some headers. Passing the
2828
// original http.Request helps determine which headers, but the request it is
2929
// optional.
@@ -41,12 +41,7 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
4141
}
4242
c.flush = flush
4343

44-
// Check to ensure we support close notifications
45-
closer, ok := w.(http.CloseNotifier)
46-
if !ok {
47-
return nil
48-
}
49-
c.close = closer
44+
c.ctx = req.Context()
5045

5146
// Send the initial headers
5247
w.Header().Set("Content-Type", "text/event-stream")
@@ -57,14 +52,15 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
5752
flush.Flush()
5853

5954
// start the sending thread
60-
c.waiter.Add(1)
55+
c.waiter.Add(2)
6156
go c.run()
6257
go c.flusher()
6358
return c
6459
}
6560

6661
// Send queues an event to be sent to the client.
67-
// This does not block until the event has been sent.
62+
// This does not block until the event has been sent,
63+
// however it could block if the event queue is full.
6864
// Returns an error if the Client has disconnected
6965
func (c *Client) Send(ev *Event) error {
7066
if c.closed {
@@ -74,6 +70,22 @@ func (c *Client) Send(ev *Event) error {
7470
return nil
7571
}
7672

73+
// Send queues an event to be sent to the client.
74+
// This guarantees not block until the event has been sent.
75+
// Returns true if blocked
76+
// Returns an error if the Client has disconnected
77+
func (c *Client) SendNonBlocking(ev *Event) (bool, error) {
78+
if c.closed {
79+
return false, io.ErrClosedPipe
80+
}
81+
select {
82+
case c.events <- ev.Clone():
83+
default:
84+
return true, nil
85+
}
86+
return false, nil
87+
}
88+
7789
// Shutdown terminates a client connection
7890
func (c *Client) Shutdown() {
7991
close(c.events)
@@ -89,6 +101,7 @@ func (c *Client) Wait() {
89101

90102
// Worker thread for the client responsible for writing events
91103
func (c *Client) run() {
104+
done := c.ctx.Done()
92105
for {
93106
select {
94107
case ev, ok := <-c.events:
@@ -105,7 +118,7 @@ func (c *Client) run() {
105118
c.lastWrite = time.Now()
106119
c.lock.Unlock()
107120

108-
case <-c.close.CloseNotify():
121+
case <-done:
109122
c.closed = true
110123
c.waiter.Done()
111124
return
@@ -132,4 +145,5 @@ func (c *Client) flusher() {
132145
}
133146

134147
ticker.Stop()
148+
c.waiter.Done()
135149
}

event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (e *Event) WriteRaw(p []byte) (int, error) {
155155
// String returns the Event in wire format as a string
156156
func (e *Event) String() string {
157157
e.prepare()
158-
return string(e.buf.Bytes())
158+
return e.buf.String()
159159
}
160160

161161
// Clone returns a deep copy of the event

stream.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import (
4444
type Stream struct {
4545
clients map[*Client]topicList
4646
listLock sync.RWMutex
47-
shutdownWait sync.WaitGroup
4847
clientConnectHook func(*http.Request, *Client)
4948
}
5049

0 commit comments

Comments
 (0)