Skip to content

Commit a5d4b62

Browse files
committed
improve flushing idle efficiency
1 parent 22e2b5c commit a5d4b62

File tree

1 file changed

+24
-29
lines changed

1 file changed

+24
-29
lines changed

client.go

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package eventsource
33
import (
44
"context"
55
"io"
6+
"log"
67
"net/http"
78
"sync"
89
"time"
@@ -11,15 +12,14 @@ import (
1112
// Client wraps an http connection and converts it to an
1213
// event stream.
1314
type Client struct {
14-
flush http.Flusher
15-
write io.Writer
16-
ctx context.Context
17-
events chan Event
18-
closed bool
19-
waiter sync.WaitGroup
20-
lock sync.Mutex
21-
lastFlush uint64
22-
lastWrite uint64
15+
flusher http.Flusher
16+
write io.Writer
17+
ctx context.Context
18+
events chan Event
19+
closed bool
20+
waiter sync.WaitGroup
21+
lock sync.Mutex
22+
flushing *time.Timer
2323
}
2424

2525
// NewClient creates a client wrapping a response writer.
@@ -35,11 +35,11 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
3535
}
3636

3737
// Check to ensure we support flushing
38-
flush, ok := w.(http.Flusher)
38+
flusher, ok := w.(http.Flusher)
3939
if !ok {
4040
return nil
4141
}
42-
c.flush = flush
42+
c.flusher = flusher
4343

4444
c.ctx = req.Context()
4545

@@ -49,12 +49,11 @@ func NewClient(w http.ResponseWriter, req *http.Request) *Client {
4949
if req == nil || req.ProtoMajor < 2 {
5050
w.Header().Set("Connection", "keep-alive")
5151
}
52-
flush.Flush()
52+
flusher.Flush()
5353

5454
// start the sending thread
55-
c.waiter.Add(2)
55+
c.waiter.Add(1)
5656
go c.run()
57-
go c.flusher()
5857
return c
5958
}
6059

@@ -115,7 +114,9 @@ func (c *Client) run() {
115114
// send the event
116115
c.lock.Lock()
117116
io.Copy(c.write, &ev)
118-
c.lastWrite += 1
117+
if c.flushing == nil {
118+
c.flushing = time.AfterFunc(100*time.Millisecond, c.flush)
119+
}
119120
c.lock.Unlock()
120121

121122
case <-done:
@@ -128,19 +129,13 @@ func (c *Client) run() {
128129
}
129130

130131
// flusher amortizes flushing costs for high activity SSE channels
131-
func (c *Client) flusher() {
132-
for {
133-
time.Sleep(100 * time.Millisecond)
134-
c.lock.Lock()
135-
if c.closed {
136-
break
137-
}
138-
if c.lastFlush < c.lastWrite {
139-
c.lastFlush = c.lastWrite
140-
c.flush.Flush()
141-
}
142-
c.lock.Unlock()
132+
func (c *Client) flush() {
133+
c.lock.Lock()
134+
log.Println("flushing!")
135+
defer c.lock.Unlock()
136+
if c.closed {
137+
return
143138
}
144-
145-
c.waiter.Done()
139+
c.flushing = nil
140+
c.flusher.Flush()
146141
}

0 commit comments

Comments
 (0)