Skip to content

Commit 01daf1c

Browse files
authored
Merge pull request #17 from digitalocean/mdl-client-stats
ovsdb: add ClientStats, fix callback leak
2 parents 3907097 + eacb484 commit 01daf1c

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

ovsdb/client.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,28 @@ func (c *Client) Close() error {
109109
return err
110110
}
111111

112+
// Stats returns a ClientStats with current statistics for the Cient.
113+
func (c *Client) Stats() ClientStats {
114+
var s ClientStats
115+
116+
c.cbMu.RLock()
117+
defer c.cbMu.RUnlock()
118+
119+
s.Callbacks.Current = len(c.callbacks)
120+
121+
return s
122+
}
123+
124+
// ClientStats contains statistics about a Client.
125+
type ClientStats struct {
126+
// Statistics about the Client's internal callbacks.
127+
Callbacks struct {
128+
// The number of callback hooks currently registered and waiting
129+
// for RPC responses.
130+
Current int
131+
}
132+
}
133+
112134
// rpc performs a single RPC request, and checks the response for errors.
113135
func (c *Client) rpc(ctx context.Context, method string, out interface{}, args ...interface{}) error {
114136
// Was the context canceled before sending the RPC?
@@ -139,14 +161,28 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
139161
Response: ch,
140162
})
141163

164+
// Ensure that the callback is always cleaned up on return from this function.
165+
// Note that this will result in the callback being deleted twice if the RPC
166+
// returns successfully, but that's okay; it's a no-op.
167+
//
168+
// TODO(mdlayher): a more robust solution around callback map modifications.
169+
defer func() {
170+
c.cbMu.Lock()
171+
defer c.cbMu.Unlock()
172+
173+
delete(c.callbacks, req.ID)
174+
}()
175+
142176
if err := c.c.Send(req); err != nil {
143177
return err
144178
}
145179

146180
// Await RPC completion or cancelation.
147181
select {
148182
case <-ctx.Done():
149-
// RPC canceled. Producer cleans up the callback.
183+
// RPC canceled. The callback is cleaned up by deferred function in
184+
// case no message ever arrives with its request ID, so we don't leak
185+
// callbacks.
150186
return ctx.Err()
151187
case res, ok := <-ch:
152188
if !ok {

ovsdb/client_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,44 @@ func TestClientContextCancelDuringRPC(t *testing.T) {
144144
}
145145
}
146146

147+
func TestClientLeakCallbacks(t *testing.T) {
148+
if testing.Short() {
149+
t.Skip("skipping during short test run")
150+
}
151+
152+
c, _, done := testClient(t, func(_ jsonrpc.Request) jsonrpc.Response {
153+
// Only respond with messages that don't match an incoming request.
154+
return jsonrpc.Response{
155+
ID: intPtr(100),
156+
Result: mustMarshalJSON(t, []string{"foo"}),
157+
}
158+
})
159+
defer done()
160+
161+
// Expect no callbacks registered before RPCs, and none after RPCs time out.
162+
var want ovsdb.ClientStats
163+
want.Callbacks.Current = 0
164+
165+
if diff := cmp.Diff(want, c.Stats()); diff != "" {
166+
t.Fatalf("unexpected starting client stats (-want +got):\n%s", diff)
167+
}
168+
169+
for i := 0; i < 5; i++ {
170+
// Give enough time for an RPC to be sent so we don't immediately time out.
171+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
172+
defer cancel()
173+
174+
_, err := c.ListDatabases(ctx)
175+
if err != context.DeadlineExceeded {
176+
t.Fatalf("expected context deadline exceeded error: %v", err)
177+
}
178+
}
179+
180+
if diff := cmp.Diff(want, c.Stats()); diff != "" {
181+
t.Fatalf("unexpected ending client stats (-want +got):\n%s", diff)
182+
}
183+
}
184+
147185
func testClient(t *testing.T, fn jsonrpc.TestFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
148186
t.Helper()
149187

0 commit comments

Comments
 (0)