Skip to content

Commit b3f614e

Browse files
authored
Channel reader (refactor) (#26)
* add chreader * version bump
1 parent 4b39d93 commit b3f614e

File tree

6 files changed

+62
-60
lines changed

6 files changed

+62
-60
lines changed

hookah.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
// Version of hookah API
17-
const Version = "1.0.3"
17+
const Version = "1.0.4"
1818

1919
// API is an instance of the Hookah API.
2020
type API struct {

pkg/chreader/chreader.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Package chreader converts a []byte channel to an io.ReadCloser.
2+
package chreader
3+
4+
import (
5+
"io"
6+
"sync"
7+
)
8+
9+
type chReader struct {
10+
ch chan []byte
11+
mu sync.Mutex
12+
top []byte
13+
}
14+
15+
// New returns an io.ReadCloser that reads from and closes ch.
16+
func New(ch chan []byte) io.ReadCloser {
17+
return &chReader{ch: ch}
18+
}
19+
20+
func (c *chReader) Read(b []byte) (int, error) {
21+
c.mu.Lock()
22+
defer c.mu.Unlock()
23+
if len(c.top) == 0 {
24+
top := <-c.ch
25+
c.top = make([]byte, len(top))
26+
copy(c.top, top)
27+
if len(c.top) == 0 {
28+
// ch is closed
29+
return 0, io.EOF
30+
}
31+
}
32+
n := copy(b, c.top)
33+
c.top = c.top[n:]
34+
return n, nil
35+
}
36+
37+
func (c *chReader) Close() error {
38+
close(c.ch)
39+
return nil
40+
}

pkg/input/httplisten.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ import (
44
"io"
55
"net/http"
66
"net/url"
7-
"sync"
7+
8+
"github.com/wybiral/hookah/pkg/chreader"
89
)
910

1011
type httpListenApp struct {
1112
server *http.Server
1213
// Channel of messages
1314
ch chan []byte
14-
// Lock for changing top
15-
mu *sync.Mutex
16-
// Current message being read
17-
top []byte
15+
// ch Reader
16+
r io.Reader
1817
}
1918

2019
// HTTPListen creates an HTTP listener and returns ReadCloser
@@ -25,28 +24,17 @@ func HTTPListen(addr string, opts url.Values) (io.ReadCloser, error) {
2524
Handler: http.HandlerFunc(app.handle),
2625
}
2726
app.ch = make(chan []byte)
28-
app.mu = &sync.Mutex{}
29-
app.top = nil
27+
app.r = chreader.New(app.ch)
3028
go app.server.ListenAndServe()
3129
return app, nil
3230
}
3331

3432
func (app *httpListenApp) Read(b []byte) (int, error) {
35-
app.mu.Lock()
36-
defer app.mu.Unlock()
37-
if len(app.top) == 0 {
38-
app.top = <-app.ch
39-
if len(app.top) == 0 {
40-
// ch is closed
41-
return 0, io.EOF
42-
}
43-
}
44-
n := copy(b, app.top)
45-
app.top = app.top[n:]
46-
return n, nil
33+
return app.r.Read(b)
4734
}
4835

4936
func (app *httpListenApp) Close() error {
37+
// Closing ch causes r.Read to return EOF
5038
close(app.ch)
5139
return app.server.Close()
5240
}

pkg/input/input.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ import (
1010
type Handler func(arg string, opts url.Values) (io.ReadCloser, error)
1111

1212
// Buffer size used for incoming messages to servers
13-
const bufferSize = 4 * 1024
13+
const bufferSize = 8 * 1024

pkg/input/listen.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ package input
33
import (
44
"io"
55
"net"
6-
"sync"
6+
7+
"github.com/wybiral/hookah/pkg/chreader"
78
)
89

910
type listenApp struct {
1011
ln net.Listener
1112
// Channel of messages
1213
ch chan []byte
13-
// Lock for changing top
14-
mu *sync.Mutex
15-
// Current message being read
16-
top []byte
14+
// ch Reader
15+
r io.Reader
1716
}
1817

1918
// listen creates a generic listener and returns ReadCloser
@@ -25,28 +24,17 @@ func listen(network, addr string) (io.ReadCloser, error) {
2524
}
2625
app.ln = ln
2726
app.ch = make(chan []byte)
28-
app.mu = &sync.Mutex{}
29-
app.top = nil
27+
app.r = chreader.New(app.ch)
3028
go app.serve()
3129
return app, nil
3230
}
3331

3432
func (app *listenApp) Read(b []byte) (int, error) {
35-
app.mu.Lock()
36-
defer app.mu.Unlock()
37-
if len(app.top) == 0 {
38-
app.top = <-app.ch
39-
if len(app.top) == 0 {
40-
// ch is closed
41-
return 0, io.EOF
42-
}
43-
}
44-
n := copy(b, app.top)
45-
app.top = app.top[n:]
46-
return n, nil
33+
return app.r.Read(b)
4734
}
4835

4936
func (app *listenApp) Close() error {
37+
// Closing ch causes r.Read to return EOF
5038
close(app.ch)
5139
return app.ln.Close()
5240
}

pkg/input/wslisten.go

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,17 @@ import (
44
"io"
55
"net/http"
66
"net/url"
7-
"sync"
87

98
"github.com/gorilla/websocket"
9+
"github.com/wybiral/hookah/pkg/chreader"
1010
)
1111

1212
type wsListenApp struct {
1313
server *http.Server
1414
// Channel of messages
1515
ch chan []byte
16-
// Lock for top
17-
mu *sync.Mutex
18-
// Current message being read
19-
top []byte
16+
// ch Reader
17+
r io.Reader
2018
}
2119

2220
// WebSocket upgrader
@@ -36,29 +34,17 @@ func WSListen(addr string, opts url.Values) (io.ReadCloser, error) {
3634
Handler: http.HandlerFunc(app.handle),
3735
}
3836
app.ch = make(chan []byte)
39-
app.mu = &sync.Mutex{}
40-
app.top = nil
37+
app.r = chreader.New(app.ch)
4138
go app.server.ListenAndServe()
4239
return app, nil
4340
}
4441

4542
func (app *wsListenApp) Read(b []byte) (int, error) {
46-
app.mu.Lock()
47-
defer app.mu.Unlock()
48-
// top is empty, pull from ch
49-
if len(app.top) == 0 {
50-
app.top = <-app.ch
51-
if len(app.top) == 0 {
52-
// ch is closed
53-
return 0, io.EOF
54-
}
55-
}
56-
n := copy(b, app.top)
57-
app.top = app.top[n:]
58-
return n, nil
43+
return app.r.Read(b)
5944
}
6045

6146
func (app *wsListenApp) Close() error {
47+
// Closing ch causes r.Read to return EOF
6248
close(app.ch)
6349
return app.server.Close()
6450
}

0 commit comments

Comments
 (0)