Skip to content

Commit 8957cde

Browse files
authored
Merge pull request #20 from digitalocean/mdl-ovsdb-echoloop
ovsdb: implement background echo loop in Client
2 parents d310314 + 1d288a8 commit 8957cde

File tree

2 files changed

+161
-9
lines changed

2 files changed

+161
-9
lines changed

ovsdb/client.go

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ import (
2323
"strings"
2424
"sync"
2525
"sync/atomic"
26+
"time"
2627

2728
"github.com/digitalocean/go-openvswitch/ovsdb/internal/jsonrpc"
2829
)
2930

30-
// A Client is an OVSDB client.
31+
// A Client is an OVSDB client. Clients can be customized by using OptionFuncs
32+
// in the Dial and New functions.
3133
type Client struct {
34+
// The RPC connection, and its logger.
3235
c *jsonrpc.Conn
3336
ll *log.Logger
3437

@@ -39,7 +42,14 @@ type Client struct {
3942
cbMu sync.RWMutex
4043
callbacks map[int]callback
4144

42-
wg *sync.WaitGroup
45+
// Interval at which echo RPCs should occur in the background, and statistics
46+
// about the echo loop.
47+
echoInterval time.Duration
48+
echoOK, echoFail *int64
49+
50+
// Track and clean up background goroutines.
51+
cancel func()
52+
wg *sync.WaitGroup
4353
}
4454

4555
// An OptionFunc is a function which can configure a Client.
@@ -53,6 +63,18 @@ func Debug(ll *log.Logger) OptionFunc {
5363
}
5464
}
5565

66+
// EchoInterval specifies an interval at which the Client will send
67+
// echo RPCs to an OVSDB server to keep the connection alive. If this
68+
// option is not used, the Client will not send any echo RPCs on its own.
69+
//
70+
// Specify a duration of 0 to disable sending background echo RPCs.
71+
func EchoInterval(d time.Duration) OptionFunc {
72+
return func(c *Client) error {
73+
c.echoInterval = d
74+
return nil
75+
}
76+
}
77+
5678
// Dial dials a connection to an OVSDB server and returns a Client.
5779
func Dial(network, addr string, options ...OptionFunc) (*Client, error) {
5880
conn, err := net.Dial(network, addr)
@@ -82,10 +104,26 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
82104
// Set up callbacks.
83105
client.callbacks = make(map[int]callback)
84106

85-
// Start up any background routines.
107+
// Start up any background routines, and enable canceling them via context.
108+
ctx, cancel := context.WithCancel(context.Background())
109+
client.cancel = cancel
110+
86111
var wg sync.WaitGroup
87112
wg.Add(1)
88113

114+
// If configured, send echo RPCs in the background at a fixed interval.
115+
var echoOK, echoFail int64
116+
client.echoOK = &echoOK
117+
client.echoFail = &echoFail
118+
119+
if d := client.echoInterval; d != 0 {
120+
wg.Add(1)
121+
go func() {
122+
defer wg.Done()
123+
client.echoLoop(ctx, d)
124+
}()
125+
}
126+
89127
// Handle all incoming RPC responses and notifications.
90128
go func() {
91129
defer wg.Done()
@@ -102,8 +140,9 @@ func (c *Client) requestID() int {
102140
return int(atomic.AddInt64(c.rpcID, 1))
103141
}
104142

105-
// Close closes a Client's connection.
143+
// Close closes a Client's connection and cleans up its resources.
106144
func (c *Client) Close() error {
145+
c.cancel()
107146
err := c.c.Close()
108147
c.wg.Wait()
109148
return err
@@ -114,9 +153,11 @@ func (c *Client) Stats() ClientStats {
114153
var s ClientStats
115154

116155
c.cbMu.RLock()
117-
defer c.cbMu.RUnlock()
118-
119156
s.Callbacks.Current = len(c.callbacks)
157+
c.cbMu.RUnlock()
158+
159+
s.EchoLoop.Success = int(atomic.LoadInt64(c.echoOK))
160+
s.EchoLoop.Failure = int(atomic.LoadInt64(c.echoFail))
120161

121162
return s
122163
}
@@ -129,6 +170,13 @@ type ClientStats struct {
129170
// for RPC responses.
130171
Current int
131172
}
173+
174+
// Statistics about the Client's internal echo RPC loop.
175+
// Note that all counters will be zero if the echo loop is disabled.
176+
EchoLoop struct {
177+
// The number of successful and failed echo RPCs in the loop.
178+
Success, Failure int
179+
}
132180
}
133181

134182
// rpc performs a single RPC request, and checks the response for errors.
@@ -204,7 +252,7 @@ func (c *Client) listen() {
204252
res, err := c.c.Receive()
205253
if err != nil {
206254
// EOF or closed connection means time to stop serving.
207-
if err == io.EOF || strings.Contains(err.Error(), "use of closed network") {
255+
if err == io.EOF || isClosedNetwork(err) {
208256
return
209257
}
210258

@@ -229,6 +277,43 @@ func (c *Client) listen() {
229277
}
230278
}
231279

280+
// echoLoop starts a loop that sends echo RPCs at the interval defined by d.
281+
func (c *Client) echoLoop(ctx context.Context, d time.Duration) {
282+
t := time.NewTicker(d)
283+
defer t.Stop()
284+
285+
for {
286+
// If context is canceled, we should exit this loop. If a tick is fired
287+
// and the context was already canceled, we exit there as well.
288+
select {
289+
case <-ctx.Done():
290+
return
291+
case <-t.C:
292+
if err := ctx.Err(); err != nil {
293+
return
294+
}
295+
}
296+
297+
// For the time being, we will track metrics about the number of successes
298+
// and failures while sending echo RPCs.
299+
// TODO(mdlayher): improve error handling for echo loop.
300+
if err := c.Echo(ctx); err != nil {
301+
if isClosedNetwork(err) {
302+
// Our socket was closed, which means the context should be canceled
303+
// and we should terminate on the next loop. No need to increment
304+
// errors counter.
305+
continue
306+
}
307+
308+
// Count other errors as failures.
309+
atomic.AddInt64(c.echoFail, 1)
310+
continue
311+
}
312+
313+
atomic.AddInt64(c.echoOK, 1)
314+
}
315+
}
316+
232317
// A callback can be used to send a message back to a caller, or
233318
// allow the caller to cancel waiting for a message.
234319
type callback struct {
@@ -275,6 +360,16 @@ func (c *Client) doCallback(id int, res rpcResponse) {
275360
delete(c.callbacks, id)
276361
}
277362

363+
// isClosedNetwork reports whether err looks like it was produced by an
// operation on a network connection that has already been closed.
// A nil error is never considered a closed-connection error.
func isClosedNetwork(err error) bool {
	// Detection relies on matching the error text, which is not an awesome
	// solution; see: https://github.com/golang/go/issues/4373.
	const closedMsg = "use of closed network connection"

	return err != nil && strings.Contains(err.Error(), closedMsg)
}
372+
278373
// panicf builds a message from format and its arguments in the manner of
// fmt.Sprintf and panics with the resulting string.
func panicf(format string, a ...interface{}) {
	msg := fmt.Sprintf(format, a...)
	panic(msg)
}

ovsdb/client_test.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"log"
2222
"os"
23+
"sync/atomic"
2324
"testing"
2425
"time"
2526

@@ -182,12 +183,68 @@ func TestClientLeakCallbacks(t *testing.T) {
182183
}
183184
}
184185

185-
func testClient(t *testing.T, fn jsonrpc.TestFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
186+
func TestClientEchoLoop(t *testing.T) {
187+
if testing.Short() {
188+
t.Skip("skipping during short test run")
189+
}
190+
191+
// Count the number of requests sent to the server.
192+
echo := ovsdb.EchoInterval(50 * time.Millisecond)
193+
var reqID int64
194+
195+
c, _, done := testClient(t, func(req jsonrpc.Request) jsonrpc.Response {
196+
if diff := cmp.Diff("echo", req.Method); diff != "" {
197+
panicf("unexpected RPC method (-want +got):\n%s", diff)
198+
}
199+
200+
// Keep incrementing the request ID to match the client.
201+
id := int(atomic.AddInt64(&reqID, 1))
202+
return jsonrpc.Response{
203+
ID: &id,
204+
Result: mustMarshalJSON(t, req.Params),
205+
}
206+
}, echo)
207+
defer done()
208+
209+
// Fail the test if the RPCs don't fire.
210+
timer := time.AfterFunc(2*time.Second, func() {
211+
panicf("took too long to wait for echo RPCs")
212+
})
213+
defer timer.Stop()
214+
215+
// Ensure that background echo RPCs are being sent.
216+
tick := time.NewTicker(100 * time.Millisecond)
217+
defer tick.Stop()
218+
219+
for {
220+
// Just wait for a handful of RPCs to be sent before success.
221+
<-tick.C
222+
223+
stats := c.Stats()
224+
225+
if n := stats.EchoLoop.Failure; n > 0 {
226+
t.Fatalf("echo loop RPC failed %d times", n)
227+
}
228+
229+
if n := stats.EchoLoop.Success; n > 5 {
230+
break
231+
}
232+
}
233+
}
234+
235+
func testClient(t *testing.T, fn jsonrpc.TestFunc, options ...ovsdb.OptionFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
186236
t.Helper()
187237

238+
// Prepend a verbose logger so the caller can override it easily.
239+
if testing.Verbose() {
240+
options = append([]ovsdb.OptionFunc{
241+
ovsdb.Debug(log.New(os.Stderr, "", 0)),
242+
}, options...)
243+
}
244+
188245
conn, notifC, done := jsonrpc.TestNetConn(t, fn)
189246

190-
c, err := ovsdb.New(conn, ovsdb.Debug(log.New(os.Stderr, "", 0)))
247+
c, err := ovsdb.New(conn, options...)
191248
if err != nil {
192249
t.Fatalf("failed to dial: %v", err)
193250
}

0 commit comments

Comments
 (0)