Skip to content

Commit bd07137

Browse files
authored
Merge pull request #22 from digitalocean/mdl-ovsdb-echo-notif
ovsdb: handle echo RPCs from server
2 parents 4965ba7 + 4f0c24a commit bd07137

File tree

2 files changed

+118
-17
lines changed

2 files changed

+118
-17
lines changed

ovsdb/client.go

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,15 @@ func Debug(ll *log.Logger) OptionFunc {
6565
}
6666

6767
// EchoInterval specifies an interval at which the Client will send
68-
// echo RPCs to an OVSDB server to keep the connection alive. If this
69-
// option is not used, the Client will not send any echo RPCs on its own.
68+
// echo RPCs to an OVSDB server to keep the connection alive. Note that the
69+
// OVSDB server may also send its own echo RPCs to the Client, and the Client
70+
// will always reply to those on behalf of the user.
7071
//
71-
// Specify a duration of 0 to disable sending background echo RPCs.
72+
// If this option is not used, the Client will only send echo RPCs when the
73+
// server sends an echo RPC to it.
74+
//
75+
// Specify a duration of 0 to disable sending background echo RPCs at a
76+
// fixed interval.
7277
func EchoInterval(d time.Duration) OptionFunc {
7378
return func(c *Client) error {
7479
c.echoInterval = d
@@ -105,30 +110,40 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
105110
// Set up callbacks.
106111
client.callbacks = make(map[string]callback)
107112

113+
// Set up echo loop statistics.
114+
var echoOK, echoFail int64
115+
client.echoOK = &echoOK
116+
client.echoFail = &echoFail
117+
118+
// Coordinates the sending of echo messages among multiple goroutines.
119+
echoC := make(chan struct{})
120+
108121
// Start up any background routines, and enable canceling them via context.
109122
ctx, cancel := context.WithCancel(context.Background())
110123
client.cancel = cancel
111124

112125
var wg sync.WaitGroup
113-
wg.Add(1)
114-
115-
// If configured, send echo RPCs in the background at a fixed interval.
116-
var echoOK, echoFail int64
117-
client.echoOK = &echoOK
118-
client.echoFail = &echoFail
126+
wg.Add(2)
119127

128+
// If configured, trigger echo RPCs in the background at a fixed interval.
120129
if d := client.echoInterval; d != 0 {
121130
wg.Add(1)
122131
go func() {
123132
defer wg.Done()
124-
client.echoLoop(ctx, d)
133+
client.echoTicker(ctx, d, echoC)
125134
}()
126135
}
127136

137+
// Send echo RPCs when triggered by channel.
138+
go func() {
139+
defer wg.Done()
140+
client.echoLoop(ctx, echoC)
141+
}()
142+
128143
// Handle all incoming RPC responses and notifications.
129144
go func() {
130145
defer wg.Done()
131-
client.listen()
146+
client.listen(ctx, echoC)
132147
}()
133148

134149
client.wg = &wg
@@ -175,7 +190,6 @@ type ClientStats struct {
175190
}
176191

177192
// Statistics about the Client's internal echo RPC loop.
178-
// Note that all counters will be zero if the echo loop is disabled.
179193
EchoLoop struct {
180194
// The number of successful and failed echo RPCs in the loop.
181195
Success, Failure int
@@ -250,7 +264,7 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
250264

251265
// listen starts an RPC receive loop that can return RPC results to
252266
// clients via a callback.
253-
func (c *Client) listen() {
267+
func (c *Client) listen(ctx context.Context, echoC chan<- struct{}) {
254268
for {
255269
res, err := c.c.Receive()
256270
if err != nil {
@@ -263,7 +277,20 @@ func (c *Client) listen() {
263277
continue
264278
}
265279

266-
// TODO(mdlayher): deal with RPC notifications.
280+
// Handle any JSON-RPC notifications.
281+
// TODO(mdlayher): deal with other RPC notifications.
282+
switch res.Method {
283+
case "echo":
284+
// OVSDB server wants us to send an echo to it, but will also send
285+
// us a response to that echo. Since this goroutine is the one that
286+
// needs to receive that response and issue the callback for it, we
287+
// ask the echo loop goroutine to send an echo on our behalf.
288+
select {
289+
case <-ctx.Done():
290+
case echoC <- struct{}{}:
291+
}
292+
continue
293+
}
267294

268295
// Handle any JSON-RPC top-level errors.
269296
if err := res.Err(); err != nil {
@@ -280,13 +307,14 @@ func (c *Client) listen() {
280307
}
281308
}
282309

283-
// echoLoop starts a loop that sends echo RPCs at the interval defined by d.
284-
func (c *Client) echoLoop(ctx context.Context, d time.Duration) {
310+
// echoTicker starts a loop that triggers echo RPCs via channel at a fixed
311+
// time interval.
312+
func (c *Client) echoTicker(ctx context.Context, d time.Duration, echoC chan<- struct{}) {
285313
t := time.NewTicker(d)
286314
defer t.Stop()
287315

288316
for {
289-
// If context is canceled, we should exit this loop. If a tick is fired
317+
// If context is canceled, we should exit this loop. If a request is fired
290318
// and the context was already canceled, we exit there as well.
291319
select {
292320
case <-ctx.Done():
@@ -297,6 +325,30 @@ func (c *Client) echoLoop(ctx context.Context, d time.Duration) {
297325
}
298326
}
299327

328+
// Allow producer to stop if context closed instead of blocking if
329+
// the consumer is stopped.
330+
select {
331+
case <-ctx.Done():
332+
return
333+
case echoC <- struct{}{}:
334+
}
335+
}
336+
}
337+
338+
// echoLoop starts a loop that sends echo RPCs when requested via channel.
339+
func (c *Client) echoLoop(ctx context.Context, echoC <-chan struct{}) {
340+
for {
341+
// If context is canceled, we should exit this loop. If a request is fired
342+
// and the context was already canceled, we exit there as well.
343+
select {
344+
case <-ctx.Done():
345+
return
346+
case <-echoC:
347+
if err := ctx.Err(); err != nil {
348+
return
349+
}
350+
}
351+
300352
// For the time being, we will track metrics about the number of successes
301353
// and failures while sending echo RPCs.
302354
// TODO(mdlayher): improve error handling for echo loop.

ovsdb/client_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,55 @@ func TestClientEchoLoop(t *testing.T) {
233233
}
234234
}
235235

236+
func TestClientEchoNotification(t *testing.T) {
237+
if testing.Short() {
238+
t.Skip("skipping during short test run")
239+
}
240+
241+
c, notifC, done := testClient(t, func(req jsonrpc.Request) jsonrpc.Response {
242+
if diff := cmp.Diff("echo", req.Method); diff != "" {
243+
panicf("unexpected RPC method (-want +got):\n%s", diff)
244+
}
245+
246+
return jsonrpc.Response{
247+
ID: &req.ID,
248+
Result: mustMarshalJSON(t, req.Params),
249+
}
250+
})
251+
defer done()
252+
253+
// Prompt the client to send an echo in the same way ovsdb-server does.
254+
notifC <- &jsonrpc.Response{
255+
ID: strPtr("echo"),
256+
Method: "echo",
257+
}
258+
259+
// Fail the test if the RPCs don't fire.
260+
timer := time.AfterFunc(2*time.Second, func() {
261+
panicf("took too long to wait for echo RPCs")
262+
})
263+
defer timer.Stop()
264+
265+
// Ensure that background echo RPCs are being sent.
266+
tick := time.NewTicker(100 * time.Millisecond)
267+
defer tick.Stop()
268+
269+
for {
270+
// Just wait for a single echo RPC cycle before success.
271+
<-tick.C
272+
273+
stats := c.Stats()
274+
275+
if n := stats.EchoLoop.Failure; n > 0 {
276+
t.Fatalf("echo loop RPC failed %d times", n)
277+
}
278+
279+
if n := stats.EchoLoop.Success; n > 0 {
280+
break
281+
}
282+
}
283+
}
284+
236285
func testClient(t *testing.T, fn jsonrpc.TestFunc, options ...ovsdb.OptionFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
237286
t.Helper()
238287

0 commit comments

Comments
 (0)