Skip to content

Commit 2ee46ab

Browse files
committed
ovsdb: implement background echo loop in Client
1 parent d310314 commit 2ee46ab

File tree

2 files changed

+108
-6
lines changed

2 files changed

+108
-6
lines changed

ovsdb/client.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ import (
2323
"strings"
2424
"sync"
2525
"sync/atomic"
26+
"time"
2627

2728
"github.com/digitalocean/go-openvswitch/ovsdb/internal/jsonrpc"
2829
)
2930

30-
// A Client is an OVSDB client.
31+
// A Client is an OVSDB client. Clients can be customized by using OptionFuncs
32+
// in the Dial and New functions.
3133
type Client struct {
34+
// The RPC connection, and its logger.
3235
c *jsonrpc.Conn
3336
ll *log.Logger
3437

@@ -39,7 +42,12 @@ type Client struct {
3942
cbMu sync.RWMutex
4043
callbacks map[int]callback
4144

42-
wg *sync.WaitGroup
45+
// Interval at which echo RPCs should occur in the background.
46+
echoInterval time.Duration
47+
48+
// Track and clean up background goroutines.
49+
cancel func()
50+
wg *sync.WaitGroup
4351
}
4452

4553
// An OptionFunc is a function which can configure a Client.
@@ -53,6 +61,18 @@ func Debug(ll *log.Logger) OptionFunc {
5361
}
5462
}
5563

64+
// EchoInterval specifies an interval at which the Client will send
65+
// echo RPCs to an OVSDB server to keep the connection alive. If this
66+
// option is not used, the Client will not send any echo RPCs on its own.
67+
//
68+
// Specify a duration of 0 to disable sending background echo RPCs.
69+
func EchoInterval(d time.Duration) OptionFunc {
70+
return func(c *Client) error {
71+
c.echoInterval = d
72+
return nil
73+
}
74+
}
75+
5676
// Dial dials a connection to an OVSDB server and returns a Client.
5777
func Dial(network, addr string, options ...OptionFunc) (*Client, error) {
5878
conn, err := net.Dial(network, addr)
@@ -82,10 +102,22 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
82102
// Set up callbacks.
83103
client.callbacks = make(map[int]callback)
84104

85-
// Start up any background routines.
105+
// Start up any background routines, and enable canceling them via context.
106+
ctx, cancel := context.WithCancel(context.Background())
107+
client.cancel = cancel
108+
86109
var wg sync.WaitGroup
87110
wg.Add(1)
88111

112+
// If configured, send echo RPCs in the background at a fixed interval.
113+
if d := client.echoInterval; d != 0 {
114+
wg.Add(1)
115+
go func() {
116+
defer wg.Done()
117+
client.echoLoop(ctx, d)
118+
}()
119+
}
120+
89121
// Handle all incoming RPC responses and notifications.
90122
go func() {
91123
defer wg.Done()
@@ -102,9 +134,10 @@ func (c *Client) requestID() int {
102134
return int(atomic.AddInt64(c.rpcID, 1))
103135
}
104136

105-
// Close closes a Client's connection.
137+
// Close closes a Client's connection and cleans up its resources.
106138
func (c *Client) Close() error {
107139
err := c.c.Close()
140+
c.cancel()
108141
c.wg.Wait()
109142
return err
110143
}
@@ -229,6 +262,25 @@ func (c *Client) listen() {
229262
}
230263
}
231264

265+
// echoLoop starts a loop that sends echo RPCs at the interval defined by d.
266+
func (c *Client) echoLoop(ctx context.Context, d time.Duration) {
267+
t := time.NewTicker(d)
268+
defer t.Stop()
269+
270+
for {
271+
select {
272+
case <-ctx.Done():
273+
return
274+
case <-t.C:
275+
}
276+
277+
// No feasible way to handle errors here. In the future, it may be
278+
// possible to do something like re-establishing the connection.
279+
// TOOD(mdlayher): improve error handling for echo loop.
280+
_ = c.Echo(ctx)
281+
}
282+
}
283+
232284
// A callback can be used to send a message back to a caller, or
233285
// allow the caller to cancel waiting for a message.
234286
type callback struct {

ovsdb/client_test.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"log"
2222
"os"
23+
"sync/atomic"
2324
"testing"
2425
"time"
2526

@@ -182,12 +183,61 @@ func TestClientLeakCallbacks(t *testing.T) {
182183
}
183184
}
184185

185-
func testClient(t *testing.T, fn jsonrpc.TestFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
186+
func TestClientEchoLoop(t *testing.T) {
187+
if testing.Short() {
188+
t.Skip("skipping during short test run")
189+
}
190+
191+
// Count the number of requests sent to the server.
192+
echo := ovsdb.EchoInterval(50 * time.Millisecond)
193+
var reqID int64
194+
195+
_, _, done := testClient(t, func(req jsonrpc.Request) jsonrpc.Response {
196+
if diff := cmp.Diff("echo", req.Method); diff != "" {
197+
panicf("unexpected RPC method (-want +got):\n%s", diff)
198+
}
199+
200+
// Keep incrementing the request ID to match the client.
201+
id := int(atomic.AddInt64(&reqID, 1))
202+
return jsonrpc.Response{
203+
ID: &id,
204+
Result: mustMarshalJSON(t, req.Params),
205+
}
206+
}, echo)
207+
defer done()
208+
209+
// Fail the test if the RPCs don't fire.
210+
timer := time.AfterFunc(2*time.Second, func() {
211+
panicf("took too long to wait for echo RPCs")
212+
})
213+
defer timer.Stop()
214+
215+
// Ensure that background echo RPCs are being sent.
216+
tick := time.NewTicker(100 * time.Millisecond)
217+
defer tick.Stop()
218+
219+
for {
220+
// Just wait for a handful of RPCs to be sent before success.
221+
<-tick.C
222+
if atomic.LoadInt64(&reqID) > 5 {
223+
break
224+
}
225+
}
226+
}
227+
228+
func testClient(t *testing.T, fn jsonrpc.TestFunc, options ...ovsdb.OptionFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
186229
t.Helper()
187230

231+
// Prepend a verbose logger so the caller can override it easily.
232+
if testing.Verbose() {
233+
options = append([]ovsdb.OptionFunc{
234+
ovsdb.Debug(log.New(os.Stderr, "", 0)),
235+
}, options...)
236+
}
237+
188238
conn, notifC, done := jsonrpc.TestNetConn(t, fn)
189239

190-
c, err := ovsdb.New(conn, ovsdb.Debug(log.New(os.Stderr, "", 0)))
240+
c, err := ovsdb.New(conn, options...)
191241
if err != nil {
192242
t.Fatalf("failed to dial: %v", err)
193243
}

0 commit comments

Comments
 (0)