Skip to content

Commit 1d288a8

Browse files
committed
ovsdb: increment counters for background echo RPC status
1 parent 2ee46ab commit 1d288a8

File tree

2 files changed

+62
-12
lines changed

2 files changed

+62
-12
lines changed

ovsdb/client.go

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ type Client struct {
4242
cbMu sync.RWMutex
4343
callbacks map[int]callback
4444

45-
// Interval at which echo RPCs should occur in the background.
46-
echoInterval time.Duration
45+
// Interval at which echo RPCs should occur in the background, and statistics
46+
// about the echo loop.
47+
echoInterval time.Duration
48+
echoOK, echoFail *int64
4749

4850
// Track and clean up background goroutines.
4951
cancel func()
@@ -110,6 +112,10 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
110112
wg.Add(1)
111113

112114
// If configured, send echo RPCs in the background at a fixed interval.
115+
var echoOK, echoFail int64
116+
client.echoOK = &echoOK
117+
client.echoFail = &echoFail
118+
113119
if d := client.echoInterval; d != 0 {
114120
wg.Add(1)
115121
go func() {
@@ -136,8 +142,8 @@ func (c *Client) requestID() int {
136142

137143
// Close closes a Client's connection and cleans up its resources.
138144
func (c *Client) Close() error {
139-
err := c.c.Close()
140145
c.cancel()
146+
err := c.c.Close()
141147
c.wg.Wait()
142148
return err
143149
}
@@ -147,9 +153,11 @@ func (c *Client) Stats() ClientStats {
147153
var s ClientStats
148154

149155
c.cbMu.RLock()
150-
defer c.cbMu.RUnlock()
151-
152156
s.Callbacks.Current = len(c.callbacks)
157+
c.cbMu.RUnlock()
158+
159+
s.EchoLoop.Success = int(atomic.LoadInt64(c.echoOK))
160+
s.EchoLoop.Failure = int(atomic.LoadInt64(c.echoFail))
153161

154162
return s
155163
}
@@ -162,6 +170,13 @@ type ClientStats struct {
162170
// for RPC responses.
163171
Current int
164172
}
173+
174+
// Statistics about the Client's internal echo RPC loop.
175+
// Note that all counters will be zero if the echo loop is disabled.
176+
EchoLoop struct {
177+
// The number of successful and failed echo RPCs in the loop.
178+
Success, Failure int
179+
}
165180
}
166181

167182
// rpc performs a single RPC request, and checks the response for errors.
@@ -237,7 +252,7 @@ func (c *Client) listen() {
237252
res, err := c.c.Receive()
238253
if err != nil {
239254
// EOF or closed connection means time to stop serving.
240-
if err == io.EOF || strings.Contains(err.Error(), "use of closed network") {
255+
if err == io.EOF || isClosedNetwork(err) {
241256
return
242257
}
243258

@@ -268,16 +283,34 @@ func (c *Client) echoLoop(ctx context.Context, d time.Duration) {
268283
defer t.Stop()
269284

270285
for {
286+
// If context is canceled, we should exit this loop. If a tick is fired
287+
// and the context was already canceled, we exit there as well.
271288
select {
272289
case <-ctx.Done():
273290
return
274291
case <-t.C:
292+
if err := ctx.Err(); err != nil {
293+
return
294+
}
275295
}
276296

277-
// No feasible way to handle errors here. In the future, it may be
278-
// possible to do something like re-establishing the connection.
279-
// TOOD(mdlayher): improve error handling for echo loop.
280-
_ = c.Echo(ctx)
297+
// For the time being, we will track metrics about the number of successes
298+
// and failures while sending echo RPCs.
299+
// TODO(mdlayher): improve error handling for echo loop.
300+
if err := c.Echo(ctx); err != nil {
301+
if isClosedNetwork(err) {
302+
// Our socket was closed, which means the context should be canceled
303+
// and we should terminate on the next loop. No need to increment
304+
// errors counter.
305+
continue
306+
}
307+
308+
// Count other errors as failures.
309+
atomic.AddInt64(c.echoFail, 1)
310+
continue
311+
}
312+
313+
atomic.AddInt64(c.echoOK, 1)
281314
}
282315
}
283316

@@ -327,6 +360,16 @@ func (c *Client) doCallback(id int, res rpcResponse) {
327360
delete(c.callbacks, id)
328361
}
329362

363+
// isClosedNetwork checks for errors caused by a closed network connection.
364+
func isClosedNetwork(err error) bool {
365+
if err == nil {
366+
return false
367+
}
368+
369+
// Not an awesome solution, but see: https://github.com/golang/go/issues/4373.
370+
return strings.Contains(err.Error(), "use of closed network connection")
371+
}
372+
330373
func panicf(format string, a ...interface{}) {
331374
panic(fmt.Sprintf(format, a...))
332375
}

ovsdb/client_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func TestClientEchoLoop(t *testing.T) {
192192
echo := ovsdb.EchoInterval(50 * time.Millisecond)
193193
var reqID int64
194194

195-
_, _, done := testClient(t, func(req jsonrpc.Request) jsonrpc.Response {
195+
c, _, done := testClient(t, func(req jsonrpc.Request) jsonrpc.Response {
196196
if diff := cmp.Diff("echo", req.Method); diff != "" {
197197
panicf("unexpected RPC method (-want +got):\n%s", diff)
198198
}
@@ -219,7 +219,14 @@ func TestClientEchoLoop(t *testing.T) {
219219
for {
220220
// Just wait for a handful of RPCs to be sent before success.
221221
<-tick.C
222-
if atomic.LoadInt64(&reqID) > 5 {
222+
223+
stats := c.Stats()
224+
225+
if n := stats.EchoLoop.Failure; n > 0 {
226+
t.Fatalf("echo loop RPC failed %d times", n)
227+
}
228+
229+
if n := stats.EchoLoop.Success; n > 5 {
223230
break
224231
}
225232
}

0 commit comments

Comments
 (0)