Skip to content

Commit 1aa633e

Browse files
committed
ovsdb/internal/jsonrpc: break Execute into Send and Receive, add notification ability
1 parent b415164 commit 1aa633e

File tree

3 files changed

+236
-80
lines changed

3 files changed

+236
-80
lines changed

ovsdb/internal/jsonrpc/conn.go

Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package jsonrpc
1616

1717
import (
1818
"encoding/json"
19+
"errors"
1920
"fmt"
2021
"io"
2122
"log"
@@ -29,11 +30,28 @@ type Request struct {
2930
Params []interface{} `json:"params"`
3031
}
3132

32-
// A Response is a JSON-RPC response.
33+
// A Response is either a JSON-RPC response, or a JSON-RPC request notification.
3334
type Response struct {
34-
ID int `json:"id"`
35-
Result interface{} `json:"result"`
36-
Error interface{} `json:"error"`
35+
// Non-null for response; null for request notification.
36+
ID *int `json:"id"`
37+
38+
// Response fields.
39+
Result json.RawMessage `json:"result,omitempty"`
40+
Error interface{} `json:"error"`
41+
42+
// Request notification fields.
43+
Method string `json:"method,omitempty"`
44+
Params json.RawMessage `json:"params,omitempty"`
45+
}
46+
47+
// Err returns an error, if one occurred in a Response.
48+
func (r *Response) Err() error {
49+
// TODO(mdlayher): better errors.
50+
if r.Error == nil {
51+
return nil
52+
}
53+
54+
return fmt.Errorf("received JSON-RPC error: %#v", r.Error)
3755
}
3856

3957
// NewConn creates a new Conn with the input io.ReadWriteCloser.
@@ -47,67 +65,67 @@ func NewConn(rwc io.ReadWriteCloser, ll *log.Logger) *Conn {
4765
}
4866

4967
return &Conn{
50-
enc: json.NewEncoder(rwc),
51-
dec: json.NewDecoder(rwc),
52-
closer: rwc,
68+
c: rwc,
69+
enc: json.NewEncoder(rwc),
70+
dec: json.NewDecoder(rwc),
5371
}
5472
}
5573

5674
// A Conn is a JSON-RPC connection.
5775
type Conn struct {
58-
mu sync.RWMutex
59-
enc *json.Encoder
60-
dec *json.Decoder
61-
closer io.Closer
62-
seq int
76+
c io.Closer
77+
78+
encMu sync.Mutex
79+
enc *json.Encoder
80+
81+
decMu sync.Mutex
82+
dec *json.Decoder
6383
}
6484

6585
// Close closes the connection.
6686
func (c *Conn) Close() error {
67-
return c.closer.Close()
87+
// TODO(mdlayher): acquiring mutex will block forever if receive loop
88+
// is happening elsewhere. Any way to avoid this?
89+
return c.c.Close()
6890
}
6991

70-
// Execute executes a single request and unmarshals its results into out.
71-
func (c *Conn) Execute(req Request, out interface{}) error {
72-
c.mu.Lock()
73-
defer c.mu.Unlock()
74-
c.seq++
75-
76-
// Use auto-increment sequence, or user-defined if requested.
77-
seq := c.seq
78-
if req.ID != 0 {
79-
seq = req.ID
80-
} else {
81-
req.ID = seq
92+
// Send sends a single JSON-RPC request.
93+
func (c *Conn) Send(req Request) error {
94+
if req.ID == 0 {
95+
return errors.New("JSON-RPC request ID must be non-zero")
8296
}
8397

8498
// Non-nil array required for ovsdb-server to reply.
8599
if req.Params == nil {
86100
req.Params = []interface{}{}
87101
}
88102

103+
c.encMu.Lock()
104+
defer c.encMu.Unlock()
105+
89106
if err := c.enc.Encode(req); err != nil {
90107
return fmt.Errorf("failed to encode JSON-RPC request: %v", err)
91108
}
92109

93-
res := Response{
94-
Result: out,
95-
}
110+
return nil
111+
}
96112

97-
if err := c.dec.Decode(&res); err != nil {
98-
return fmt.Errorf("failed to decode JSON-RPC request: %v", err)
99-
}
113+
// Receive receives a single JSON-RPC response.
114+
func (c *Conn) Receive() (*Response, error) {
115+
c.decMu.Lock()
116+
defer c.decMu.Unlock()
100117

101-
if res.ID != seq {
102-
return fmt.Errorf("bad JSON-RPC sequence: %d, want: %d", res.ID, seq)
103-
}
118+
var res Response
119+
if err := c.dec.Decode(&res); err != nil {
120+
// Don't mask EOF errors with added detail.
121+
if err == io.EOF {
122+
return nil, err
123+
}
104124

105-
// TODO(mdlayher): better errors.
106-
if res.Error != nil {
107-
return fmt.Errorf("received JSON-RPC error: %#v", res.Error)
125+
return nil, fmt.Errorf("failed to decode JSON-RPC response: %v", err)
108126
}
109127

110-
return nil
128+
return &res, nil
111129
}
112130

113131
type debugReadWriteCloser struct {

ovsdb/internal/jsonrpc/conn_test.go

Lines changed: 135 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,57 +15,69 @@
1515
package jsonrpc_test
1616

1717
import (
18+
"encoding/json"
1819
"fmt"
20+
"io"
1921
"testing"
2022

2123
"github.com/digitalocean/go-openvswitch/ovsdb/internal/jsonrpc"
2224
"github.com/google/go-cmp/cmp"
2325
)
2426

25-
func TestConnExecuteBadSequence(t *testing.T) {
26-
req := jsonrpc.Request{
27-
ID: 10,
28-
Method: "test",
29-
}
30-
31-
c, done := jsonrpc.TestConn(t, func(_ jsonrpc.Request) jsonrpc.Response {
32-
return jsonrpc.Response{
33-
// Bad sequence.
34-
ID: 1,
35-
}
36-
})
27+
func TestConnSendNoRequestID(t *testing.T) {
28+
c, _, done := jsonrpc.TestConn(t, nil)
3729
defer done()
3830

39-
if err := c.Execute(req, nil); err == nil {
31+
if err := c.Send(jsonrpc.Request{}); err == nil {
4032
t.Fatal("expected an error, but none occurred")
4133
}
4234
}
4335

44-
func TestConnExecuteError(t *testing.T) {
36+
func TestConnReceiveEOF(t *testing.T) {
37+
c := jsonrpc.NewConn(&eofReadWriteCloser{}, nil)
38+
39+
// Conn should not mask io.EOF.
40+
_, err := c.Receive()
41+
if err != io.EOF {
42+
t.Fatalf("unexpected error: %v", err)
43+
}
44+
}
45+
46+
func TestConnSendReceiveError(t *testing.T) {
4547
// TODO(mdlayher): what does this actually look like?
4648
type rpcError struct {
4749
Details string
4850
}
4951

50-
c, done := jsonrpc.TestConn(t, func(_ jsonrpc.Request) jsonrpc.Response {
52+
c, _, done := jsonrpc.TestConn(t, func(_ jsonrpc.Request) jsonrpc.Response {
5153
return jsonrpc.Response{
52-
ID: 10,
54+
ID: intPtr(10),
5355
Error: rpcError{
5456
Details: "some error",
5557
},
5658
}
5759
})
5860
defer done()
5961

60-
if err := c.Execute(jsonrpc.Request{ID: 10}, nil); err == nil {
62+
if err := c.Send(jsonrpc.Request{ID: 10}); err != nil {
63+
t.Fatalf("failed to send request: %v", err)
64+
}
65+
66+
res, err := c.Receive()
67+
if err != nil {
68+
t.Fatalf("failed to receive response: %v", err)
69+
}
70+
71+
if err := res.Err(); err == nil {
6172
t.Fatal("expected an error, but none occurred")
6273
}
6374
}
6475

65-
func TestConnExecuteOK(t *testing.T) {
76+
func TestConnSendReceiveOK(t *testing.T) {
6677
req := jsonrpc.Request{
6778
Method: "hello",
6879
Params: []interface{}{"world"},
80+
ID: 1,
6981
}
7082

7183
type message struct {
@@ -76,30 +88,128 @@ func TestConnExecuteOK(t *testing.T) {
7688
Message: "hello world",
7789
}
7890

79-
c, done := jsonrpc.TestConn(t, func(got jsonrpc.Request) jsonrpc.Response {
80-
req.ID = 1
81-
91+
c, _, done := jsonrpc.TestConn(t, func(got jsonrpc.Request) jsonrpc.Response {
8292
if diff := cmp.Diff(req, got); diff != "" {
8393
panicf("unexpected request (-want +got):\n%s", diff)
8494
}
8595

8696
return jsonrpc.Response{
87-
ID: 1,
88-
Result: want,
97+
ID: intPtr(1),
98+
Result: mustMarshalJSON(t, want),
8999
}
90100
})
91101
defer done()
92102

103+
if err := c.Send(req); err != nil {
104+
t.Fatalf("failed to send request: %v", err)
105+
}
106+
107+
res, err := c.Receive()
108+
if err != nil {
109+
t.Fatalf("failed to receive response: %v", err)
110+
}
111+
112+
if err := res.Err(); err != nil {
113+
t.Fatalf("request failed: %v", err)
114+
}
115+
93116
var out message
94-
if err := c.Execute(req, &out); err != nil {
95-
t.Fatalf("failed to execute: %v", err)
117+
if err := json.Unmarshal(res.Result, &out); err != nil {
118+
t.Fatalf("failed to unmarshal JSON: %v", err)
96119
}
97120

98121
if diff := cmp.Diff(want, out); diff != "" {
99122
t.Fatalf("unexpected response (-want +got):\n%s", diff)
100123
}
101124
}
102125

126+
func TestConnSendReceiveNotificationsOK(t *testing.T) {
127+
const id = 10
128+
129+
req := jsonrpc.Request{
130+
ID: id,
131+
Method: "monitor",
132+
Params: []interface{}{"Open_vSwitch"},
133+
}
134+
135+
res := jsonrpc.Response{
136+
ID: intPtr(id),
137+
Result: mustMarshalJSON(t, "some bytes"),
138+
}
139+
140+
c, notifC, done := jsonrpc.TestConn(t, func(got jsonrpc.Request) jsonrpc.Response {
141+
if diff := cmp.Diff(req, got); diff != "" {
142+
panicf("unexpected request (-want +got):\n%s", diff)
143+
}
144+
145+
return res
146+
})
147+
defer done()
148+
149+
note := &jsonrpc.Response{
150+
Method: "notify",
151+
}
152+
notifC <- note
153+
notifC <- note
154+
155+
if err := c.Send(req); err != nil {
156+
t.Fatalf("failed to send request: %v", err)
157+
}
158+
159+
var responses, notes int
160+
for i := 0; i < 3; i++ {
161+
res, err := c.Receive()
162+
if err != nil {
163+
t.Fatalf("failed to receive response: %v", err)
164+
}
165+
166+
if res.ID != nil {
167+
responses++
168+
if diff := cmp.Diff(req.ID, *res.ID); diff != "" {
169+
t.Fatalf("unexpected response request ID (-want +got):\n%s", diff)
170+
}
171+
172+
continue
173+
}
174+
175+
notes++
176+
if diff := cmp.Diff(note.Method, res.Method); diff != "" {
177+
t.Fatalf("unexpected notification method (-want +got):\n%s", diff)
178+
}
179+
}
180+
181+
if diff := cmp.Diff(1, responses); diff != "" {
182+
t.Fatalf("unexpected number of responses (-want +got):\n%s", diff)
183+
}
184+
185+
if diff := cmp.Diff(2, notes); diff != "" {
186+
t.Fatalf("unexpected number of notifications (-want +got):\n%s", diff)
187+
}
188+
}
189+
190+
func mustMarshalJSON(t *testing.T, v interface{}) []byte {
191+
t.Helper()
192+
193+
b, err := json.Marshal(v)
194+
if err != nil {
195+
t.Fatalf("failed to marshal JSON: %v", err)
196+
}
197+
198+
return b
199+
}
200+
201+
func intPtr(i int) *int {
202+
return &i
203+
}
204+
103205
func panicf(format string, a ...interface{}) {
104206
panic(fmt.Sprintf(format, a...))
105207
}
208+
209+
type eofReadWriteCloser struct {
210+
io.ReadWriteCloser
211+
}
212+
213+
func (rwc *eofReadWriteCloser) Read(b []byte) (int, error) {
214+
return 0, io.EOF
215+
}

0 commit comments

Comments
 (0)