Skip to content

Commit 56cae4b

Browse files
committed
ovsdb: fix callback leak when RPCs receive no response
1 parent 32040e0 commit 56cae4b

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

ovsdb/client.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,15 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
168168
// Await RPC completion or cancelation.
169169
select {
170170
case <-ctx.Done():
171-
// RPC canceled. Producer cleans up the callback.
171+
// RPC canceled. Clean up the callback in case no message ever arrives
172+
// with its request ID, so we don't leak callbacks. If the other case
173+
// in the select fires (meaning we got a matching RPC response), it's
174+
// the producer's responsibility to clean up the callback.
175+
c.cbMu.Lock()
176+
defer c.cbMu.Unlock()
177+
178+
delete(c.callbacks, req.ID)
179+
172180
return ctx.Err()
173181
case res, ok := <-ch:
174182
if !ok {

ovsdb/client_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,44 @@ func TestClientContextCancelDuringRPC(t *testing.T) {
144144
}
145145
}
146146

147+
func TestClientLeakCallbacks(t *testing.T) {
148+
if testing.Short() {
149+
t.Skip("skipping during short test run")
150+
}
151+
152+
c, _, done := testClient(t, func(_ jsonrpc.Request) jsonrpc.Response {
153+
// Only respond with messages that don't match an incoming request.
154+
return jsonrpc.Response{
155+
ID: intPtr(100),
156+
Result: mustMarshalJSON(t, []string{"foo"}),
157+
}
158+
})
159+
defer done()
160+
161+
// Expect no callbacks registered before RPCs, and none after RPCs time out.
162+
var want ovsdb.ClientStats
163+
want.Callbacks.Current = 0
164+
165+
if diff := cmp.Diff(want, c.Stats()); diff != "" {
166+
t.Fatalf("unexpected starting client stats (-want +got):\n%s", diff)
167+
}
168+
169+
for i := 0; i < 5; i++ {
170+
// Give enough time for an RPC to be sent so we don't immediately time out.
171+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
172+
defer cancel()
173+
174+
_, err := c.ListDatabases(ctx)
175+
if err != context.DeadlineExceeded {
176+
t.Fatalf("expected context deadline exceeded error: %v", err)
177+
}
178+
}
179+
180+
if diff := cmp.Diff(want, c.Stats()); diff != "" {
181+
t.Fatalf("unexpected ending client stats (-want +got):\n%s", diff)
182+
}
183+
}
184+
147185
func testClient(t *testing.T, fn jsonrpc.TestFunc) (*ovsdb.Client, chan<- *jsonrpc.Response, func()) {
148186
t.Helper()
149187

0 commit comments

Comments
 (0)