Skip to content

Commit b415164

Browse files
authored
Merge pull request #11 from digitalocean/mdl-ovsdb-jsonrpc
ovsdb/internal/jsonrpc: initial commit
2 parents c910586 + 7958400 commit b415164

File tree

6 files changed

+430
-85
lines changed

6 files changed

+430
-85
lines changed

ovsdb/client.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,50 +18,82 @@ import (
1818
"bytes"
1919
"encoding/json"
2020
"fmt"
21-
"net/rpc"
22-
"net/rpc/jsonrpc"
21+
"log"
22+
"net"
23+
24+
"github.com/digitalocean/go-openvswitch/ovsdb/internal/jsonrpc"
2325
)
2426

2527
// A Client is an OVSDB client.
2628
type Client struct {
27-
rc *rpc.Client
29+
c *jsonrpc.Conn
30+
ll *log.Logger
31+
}
32+
33+
// An OptionFunc is a function which can configure a Client.
34+
type OptionFunc func(c *Client) error
35+
36+
// Debug enables debug logging for a Client.
37+
func Debug(ll *log.Logger) OptionFunc {
38+
return func(c *Client) error {
39+
c.ll = ll
40+
return nil
41+
}
2842
}
2943

3044
// Dial dials a connection to an OVSDB server and returns a Client.
31-
func Dial(network, addr string) (*Client, error) {
32-
c, err := jsonrpc.Dial(network, addr)
45+
func Dial(network, addr string, options ...OptionFunc) (*Client, error) {
46+
conn, err := net.Dial(network, addr)
3347
if err != nil {
3448
return nil, err
3549
}
3650

37-
return &Client{
38-
rc: c,
39-
}, nil
51+
return New(conn, options...)
52+
}
53+
54+
// New wraps an existing connection to an OVSDB server and returns a Client.
55+
func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
56+
client := &Client{}
57+
for _, o := range options {
58+
if err := o(client); err != nil {
59+
return nil, err
60+
}
61+
}
62+
63+
client.c = jsonrpc.NewConn(conn, client.ll)
64+
65+
return client, nil
4066
}
4167

4268
// Close closes a Client's connection.
4369
func (c *Client) Close() error {
44-
return c.rc.Close()
70+
return c.c.Close()
4571
}
4672

4773
// ListDatabases returns the name of all databases known to the OVSDB server.
4874
func (c *Client) ListDatabases() ([]string, error) {
4975
var dbs []string
50-
if err := c.rpc("list_dbs", nil, &dbs); err != nil {
76+
if err := c.rpc("list_dbs", &dbs); err != nil {
5177
return nil, err
5278
}
5379

5480
return dbs, nil
5581
}
5682

5783
// rpc performs a single RPC request, and checks the response for errors.
58-
func (c *Client) rpc(method string, args, reply interface{}) error {
59-
// Captures any JSON-RPC errors.
84+
func (c *Client) rpc(method string, out interface{}, args ...interface{}) error {
85+
// Captures any OVSDB errors.
6086
r := result{
61-
Reply: reply,
87+
Reply: out,
88+
}
89+
90+
req := jsonrpc.Request{
91+
Method: method,
92+
Params: args,
93+
// Let the client handle the request ID.
6294
}
6395

64-
if err := c.rc.Call(method, args, &r); err != nil {
96+
if err := c.c.Execute(req, &r); err != nil {
6597
return err
6698
}
6799

ovsdb/client_test.go

Lines changed: 23 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,25 @@
1515
package ovsdb_test
1616

1717
import (
18-
"encoding/json"
1918
"fmt"
20-
"net"
21-
"sync"
2219
"testing"
2320

2421
"github.com/digitalocean/go-openvswitch/ovsdb"
22+
"github.com/digitalocean/go-openvswitch/ovsdb/internal/jsonrpc"
2523
"github.com/google/go-cmp/cmp"
2624
)
2725

2826
func TestClientError(t *testing.T) {
2927
const str = "some error"
3028

31-
c, done := testClient(t, func(_ string, _ []interface{}) interface{} {
32-
return &ovsdb.Error{
33-
Err: str,
34-
Details: "malformed",
35-
Syntax: "{}",
29+
c, done := testClient(t, func(_ jsonrpc.Request) jsonrpc.Response {
30+
return jsonrpc.Response{
31+
ID: 1,
32+
Result: &ovsdb.Error{
33+
Err: str,
34+
Details: "malformed",
35+
Syntax: "{}",
36+
},
3637
}
3738
})
3839
defer done()
@@ -51,19 +52,23 @@ func TestClientError(t *testing.T) {
5152
t.Fatalf("unexpected error (-want +got):\n%s", diff)
5253
}
5354
}
55+
5456
func TestClientListDatabases(t *testing.T) {
5557
want := []string{"Open_vSwitch", "test"}
5658

57-
c, done := testClient(t, func(method string, params []interface{}) interface{} {
58-
if diff := cmp.Diff("list_dbs", method); diff != "" {
59-
t.Fatalf("unexpected RPC method (-want +got):\n%s", diff)
59+
c, done := testClient(t, func(req jsonrpc.Request) jsonrpc.Response {
60+
if diff := cmp.Diff("list_dbs", req.Method); diff != "" {
61+
panicf("unexpected RPC method (-want +got):\n%s", diff)
6062
}
6163

62-
if diff := cmp.Diff(1, len(params)); diff != "" {
63-
t.Fatalf("unexpected number of RPC parameters (-want +got):\n%s", diff)
64+
if diff := cmp.Diff(0, len(req.Params)); diff != "" {
65+
panicf("unexpected number of RPC parameters (-want +got):\n%s", diff)
6466
}
6567

66-
return want
68+
return jsonrpc.Response{
69+
ID: 1,
70+
Result: want,
71+
}
6772
})
6873
defer done()
6974

@@ -77,70 +82,17 @@ func TestClientListDatabases(t *testing.T) {
7782
}
7883
}
7984

80-
type rpcFunc func(method string, params []interface{}) interface{}
81-
82-
func testClient(t *testing.T, fn rpcFunc) (*ovsdb.Client, func()) {
85+
func testClient(t *testing.T, fn jsonrpc.TestFunc) (*ovsdb.Client, func()) {
8386
t.Helper()
8487

85-
l, err := net.Listen("tcp", ":0")
86-
if err != nil {
87-
t.Fatalf("failed to listen: %v", err)
88-
}
89-
90-
var wg sync.WaitGroup
91-
wg.Add(1)
92-
93-
go func() {
94-
defer wg.Done()
95-
96-
// Accept a single connection.
97-
c, err := l.Accept()
98-
if err != nil {
99-
panicf("failed to accept: %v", err)
100-
}
101-
defer c.Close()
102-
_ = l.Close()
103-
104-
if err := handleConn(c, fn); err != nil {
105-
panicf("failed to handle connection: %v", err)
106-
}
107-
}()
88+
conn, done := jsonrpc.TestNetConn(t, fn)
10889

109-
c, err := ovsdb.Dial("tcp", l.Addr().String())
90+
c, err := ovsdb.New(conn)
11091
if err != nil {
11192
t.Fatalf("failed to dial: %v", err)
11293
}
11394

114-
return c, func() {
115-
// Ensure types are cleaned up, and ensure goroutine stops.
116-
_ = l.Close()
117-
_ = c.Close()
118-
wg.Wait()
119-
}
120-
}
121-
122-
func handleConn(c net.Conn, fn rpcFunc) error {
123-
var req struct {
124-
Method string `json:"method"`
125-
Params []interface{} `json:"params"`
126-
ID int `json:"id"`
127-
}
128-
129-
var res struct {
130-
Result interface{} `json:"result"`
131-
ID int `json:"id"`
132-
}
133-
134-
if err := json.NewDecoder(c).Decode(&req); err != nil {
135-
return err
136-
}
137-
138-
result := fn(req.Method, req.Params)
139-
140-
res.ID = req.ID
141-
res.Result = result
142-
143-
return json.NewEncoder(c).Encode(res)
95+
return c, done
14496
}
14597

14698
func panicf(format string, a ...interface{}) {

ovsdb/internal/jsonrpc/conn.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2017 DigitalOcean.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jsonrpc
16+
17+
import (
18+
"encoding/json"
19+
"fmt"
20+
"io"
21+
"log"
22+
"sync"
23+
)
24+
25+
// A Request is a JSON-RPC request.
26+
type Request struct {
27+
ID int `json:"id"`
28+
Method string `json:"method"`
29+
Params []interface{} `json:"params"`
30+
}
31+
32+
// A Response is a JSON-RPC response.
33+
type Response struct {
34+
ID int `json:"id"`
35+
Result interface{} `json:"result"`
36+
Error interface{} `json:"error"`
37+
}
38+
39+
// NewConn creates a new Conn with the input io.ReadWriteCloser.
40+
// If a logger is specified, it is used for debug logs.
41+
func NewConn(rwc io.ReadWriteCloser, ll *log.Logger) *Conn {
42+
if ll != nil {
43+
rwc = &debugReadWriteCloser{
44+
rwc: rwc,
45+
ll: ll,
46+
}
47+
}
48+
49+
return &Conn{
50+
enc: json.NewEncoder(rwc),
51+
dec: json.NewDecoder(rwc),
52+
closer: rwc,
53+
}
54+
}
55+
56+
// A Conn is a JSON-RPC connection.
57+
type Conn struct {
58+
mu sync.RWMutex
59+
enc *json.Encoder
60+
dec *json.Decoder
61+
closer io.Closer
62+
seq int
63+
}
64+
65+
// Close closes the connection.
66+
func (c *Conn) Close() error {
67+
return c.closer.Close()
68+
}
69+
70+
// Execute executes a single request and unmarshals its results into out.
71+
func (c *Conn) Execute(req Request, out interface{}) error {
72+
c.mu.Lock()
73+
defer c.mu.Unlock()
74+
c.seq++
75+
76+
// Use auto-increment sequence, or user-defined if requested.
77+
seq := c.seq
78+
if req.ID != 0 {
79+
seq = req.ID
80+
} else {
81+
req.ID = seq
82+
}
83+
84+
// Non-nil array required for ovsdb-server to reply.
85+
if req.Params == nil {
86+
req.Params = []interface{}{}
87+
}
88+
89+
if err := c.enc.Encode(req); err != nil {
90+
return fmt.Errorf("failed to encode JSON-RPC request: %v", err)
91+
}
92+
93+
res := Response{
94+
Result: out,
95+
}
96+
97+
if err := c.dec.Decode(&res); err != nil {
98+
return fmt.Errorf("failed to decode JSON-RPC request: %v", err)
99+
}
100+
101+
if res.ID != seq {
102+
return fmt.Errorf("bad JSON-RPC sequence: %d, want: %d", res.ID, seq)
103+
}
104+
105+
// TODO(mdlayher): better errors.
106+
if res.Error != nil {
107+
return fmt.Errorf("received JSON-RPC error: %#v", res.Error)
108+
}
109+
110+
return nil
111+
}
112+
113+
type debugReadWriteCloser struct {
114+
rwc io.ReadWriteCloser
115+
ll *log.Logger
116+
}
117+
118+
func (rwc *debugReadWriteCloser) Read(b []byte) (int, error) {
119+
n, err := rwc.rwc.Read(b)
120+
if err != nil {
121+
return n, err
122+
}
123+
124+
rwc.ll.Printf(" read: %s", string(b[:n]))
125+
return n, nil
126+
}
127+
128+
func (rwc *debugReadWriteCloser) Write(b []byte) (int, error) {
129+
n, err := rwc.rwc.Write(b)
130+
if err != nil {
131+
return n, err
132+
}
133+
134+
rwc.ll.Printf("write: %s", string(b[:n]))
135+
return n, nil
136+
}
137+
138+
func (rwc *debugReadWriteCloser) Close() error {
139+
err := rwc.rwc.Close()
140+
rwc.ll.Println("close:", err)
141+
return err
142+
}

0 commit comments

Comments
 (0)