Skip to content

Commit 023cb14

Browse files
authored
add register methods (#8)
1 parent 7ec4784 commit 023cb14

File tree

4 files changed

+164
-44
lines changed

4 files changed

+164
-44
lines changed

examples/customproto/main.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Example of using hookah with a custom input protocol. In this case the input
2+
// protocol is named numbers:// and it can accept "odd" or "even" as the
3+
// argument.
4+
package main
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"io"
10+
"log"
11+
"time"
12+
13+
"github.com/wybiral/hookah"
14+
)
15+
16+
func main() {
17+
// Register new protocol
18+
hookah.RegisterInput("numbers", numbersHandler)
19+
// Create hookah input (using new numbers:// protocol)
20+
r, err := hookah.NewInput("numbers://odd")
21+
if err != nil {
22+
log.Fatal(err)
23+
}
24+
defer r.Close()
25+
// Create hookah output (stdout)
26+
w, err := hookah.NewOutput("stdout")
27+
if err != nil {
28+
log.Fatal(err)
29+
}
30+
defer w.Close()
31+
// Copy forever
32+
io.Copy(w, r)
33+
}
34+
35+
// struct type to implement interface on.
36+
type numbers struct {
37+
counter int64
38+
}
39+
40+
// Input handlers take an arg string and return an io.ReadCloser for the input
41+
// stream (or an error).
42+
func numbersHandler(arg string) (io.ReadCloser, error) {
43+
var counter int64
44+
if arg == "odd" {
45+
counter = 1
46+
} else if arg == "even" {
47+
counter = 2
48+
} else {
49+
return nil, errors.New("numbers requires: odd or even")
50+
}
51+
return &numbers{counter: counter}, nil
52+
}
53+
54+
// Read method satisfies the io.ReadCloser interface
55+
func (num *numbers) Read(b []byte) (int, error) {
56+
// Artificial delay
57+
time.Sleep(time.Second)
58+
// Format counter
59+
s := fmt.Sprintf("%d\n", num.counter)
60+
// Increment counter
61+
num.counter += 2
62+
// Copy to byte array
63+
n := copy(b, []byte(s))
64+
return n, nil
65+
}
66+
67+
// Close method satisfies the io.ReadCloser interface
68+
func (num *numbers) Close() error {
69+
return nil
70+
}

hookah.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,13 @@ func NewInput(opts string) (io.ReadCloser, error) {
1717
func NewOutput(opts string) (io.WriteCloser, error) {
1818
return output.New(opts)
1919
}
20+
21+
// RegisterInput registers a new input protocol.
22+
func RegisterInput(proto string, h input.Handler) {
23+
input.Register(proto, h)
24+
}
25+
26+
// RegisterOutput registers a new output protocol.
27+
func RegisterOutput(proto string, h output.Handler) {
28+
output.Register(proto, h)
29+
}

pkg/input/input.go

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,74 +6,94 @@ import (
66
"io"
77
"os"
88
"strings"
9+
"sync"
910
)
1011

12+
// User defined protocols
13+
var protocols sync.Map
14+
15+
// Handler is the function type for user defined input protocols.
16+
type Handler func(arg string) (io.ReadCloser, error)
17+
1118
// Buffer size used for incoming messages to servers
1219
const bufferSize = 4 * 1024
1320

1421
// New parses an option string and returns a new ReadCloser.
1522
func New(opts string) (io.ReadCloser, error) {
1623
parts := strings.SplitN(opts, "://", 2)
1724
proto := parts[0]
25+
arg := ""
26+
if len(parts) > 1 {
27+
arg = parts[1]
28+
}
29+
fn, ok := protocols.Load(proto)
30+
if ok {
31+
return fn.(Handler)(arg)
32+
}
1833
switch proto {
1934
case "stdin":
2035
return os.Stdin, nil
2136
case "file":
22-
if len(parts) < 2 {
37+
if arg == "" {
2338
return nil, errors.New("file: no path supplied")
2439
}
25-
return file(parts[1])
40+
return file(arg)
2641
case "http":
27-
if len(parts) < 2 {
42+
if arg == "" {
2843
return nil, errors.New("http client: no address supplied")
2944
}
30-
return httpClient("http://" + parts[1])
45+
return httpClient("http://" + arg)
3146
case "https":
32-
if len(parts) < 2 {
47+
if arg == "" {
3348
return nil, errors.New("https client: no address supplied")
3449
}
35-
return httpClient("https://" + parts[1])
50+
return httpClient("https://" + arg)
3651
case "http-server":
37-
if len(parts) < 2 {
52+
if arg == "" {
3853
return nil, errors.New("http server: no address supplied")
3954
}
40-
return httpServer(parts[1])
55+
return httpServer(arg)
4156
case "tcp":
42-
if len(parts) < 2 {
57+
if arg == "" {
4358
return nil, errors.New("tcp client: no address supplied")
4459
}
45-
return tcpClient(parts[1])
60+
return tcpClient(arg)
4661
case "tcp-server":
47-
if len(parts) < 2 {
62+
if arg == "" {
4863
return nil, errors.New("tcp server: no address supplied")
4964
}
50-
return tcpServer(parts[1])
65+
return tcpServer(arg)
5166
case "unix":
52-
if len(parts) < 2 {
67+
if arg == "" {
5368
return nil, errors.New("unix client: no address supplied")
5469
}
55-
return unixClient(parts[1])
70+
return unixClient(arg)
5671
case "unix-server":
57-
if len(parts) < 2 {
72+
if arg == "" {
5873
return nil, errors.New("unix server: no address supplied")
5974
}
60-
return unixServer(parts[1])
75+
return unixServer(arg)
6176
case "ws":
62-
if len(parts) < 2 {
77+
if arg == "" {
6378
return nil, errors.New("ws client: no address supplied")
6479
}
65-
return wsClient("ws://" + parts[1])
80+
return wsClient("ws://" + arg)
6681
case "wss":
67-
if len(parts) < 2 {
82+
if arg == "" {
6883
return nil, errors.New("wss client: no address supplied")
6984
}
70-
return wsClient("wss://" + parts[1])
85+
return wsClient("wss://" + arg)
7186
case "ws-server":
72-
if len(parts) < 2 {
87+
if arg == "" {
7388
return nil, errors.New("ws server: no address supplied")
7489
}
75-
return wsServer(parts[1])
90+
return wsServer(arg)
7691
default:
7792
return nil, errors.New("unknown in protocol: " + proto)
7893
}
7994
}
95+
96+
// Register a new input protocol.
97+
func Register(proto string, fn Handler) {
98+
protocols.Store(proto, fn)
99+
}

pkg/output/output.go

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,76 +6,96 @@ import (
66
"io"
77
"os"
88
"strings"
9+
"sync"
910
)
1011

12+
// User defined protocols
13+
var protocols sync.Map
14+
15+
// Handler is the function type for user defined input protocols.
16+
type Handler func(arg string) (io.WriteCloser, error)
17+
1118
// Number of buffered messages for each incoming server connection.
1219
const queueSize = 10
1320

1421
// New parses an option string and returns a new WriteCloser.
1522
func New(opts string) (io.WriteCloser, error) {
1623
parts := strings.SplitN(opts, "://", 2)
1724
proto := parts[0]
25+
arg := ""
26+
if len(parts) > 1 {
27+
arg = parts[1]
28+
}
29+
fn, ok := protocols.Load(proto)
30+
if ok {
31+
return fn.(Handler)(arg)
32+
}
1833
switch proto {
1934
case "stdout":
2035
return os.Stdout, nil
2136
case "stderr":
2237
return os.Stderr, nil
2338
case "file":
24-
if len(parts) < 2 {
39+
if arg == "" {
2540
return nil, errors.New("file: no path supplied")
2641
}
27-
return file(parts[1])
42+
return file(arg)
2843
case "http":
29-
if len(parts) < 2 {
44+
if arg == "" {
3045
return nil, errors.New("http client: no address supplied")
3146
}
32-
return httpClient("http://" + parts[1])
47+
return httpClient("http://" + arg)
3348
case "https":
34-
if len(parts) < 2 {
49+
if arg == "" {
3550
return nil, errors.New("https client: no address supplied")
3651
}
37-
return httpClient("https://" + parts[1])
52+
return httpClient("https://" + arg)
3853
case "http-server":
39-
if len(parts) < 2 {
54+
if arg == "" {
4055
return nil, errors.New("http server: no address supplied")
4156
}
42-
return httpServer(parts[1])
57+
return httpServer(arg)
4358
case "tcp":
44-
if len(parts) < 2 {
59+
if arg == "" {
4560
return nil, errors.New("tcp client: no address supplied")
4661
}
47-
return tcpClient(parts[1])
62+
return tcpClient(arg)
4863
case "tcp-server":
49-
if len(parts) < 2 {
64+
if arg == "" {
5065
return nil, errors.New("tcp server: no address supplied")
5166
}
52-
return tcpServer(parts[1])
67+
return tcpServer(arg)
5368
case "unix":
54-
if len(parts) < 2 {
69+
if arg == "" {
5570
return nil, errors.New("unix client: no address supplied")
5671
}
57-
return unixClient(parts[1])
72+
return unixClient(arg)
5873
case "unix-server":
59-
if len(parts) < 2 {
74+
if arg == "" {
6075
return nil, errors.New("unix server: no address supplied")
6176
}
62-
return unixServer(parts[1])
77+
return unixServer(arg)
6378
case "ws":
64-
if len(parts) < 2 {
79+
if arg == "" {
6580
return nil, errors.New("ws client: no address supplied")
6681
}
67-
return wsClient("ws://" + parts[1])
82+
return wsClient("ws://" + arg)
6883
case "wss":
69-
if len(parts) < 2 {
84+
if arg == "" {
7085
return nil, errors.New("wss client: no address supplied")
7186
}
72-
return wsClient("wss://" + parts[1])
87+
return wsClient("wss://" + arg)
7388
case "ws-server":
74-
if len(parts) < 2 {
89+
if arg == "" {
7590
return nil, errors.New("ws server: no address supplied")
7691
}
77-
return wsServer(parts[1])
92+
return wsServer(arg)
7893
default:
7994
return nil, errors.New("unknown out protocol: " + proto)
8095
}
8196
}
97+
98+
// Register a new output protocol.
99+
func Register(proto string, fn Handler) {
100+
protocols.Store(proto, fn)
101+
}

0 commit comments

Comments
 (0)