Skip to content

Commit 3907097

Browse files
authored
Merge pull request #16 from digitalocean/mdl-ovsdb-rpc-ctx
ovsdb: thread context through callbacks
2 parents 83192a7 + 24c679f commit 3907097

File tree

1 file changed

+36
-24
lines changed

1 file changed

+36
-24
lines changed

ovsdb/client.go

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type Client struct {
3737

3838
// Callbacks for RPC responses.
3939
cbMu sync.RWMutex
40-
callbacks map[int]chan rpcResponse
40+
callbacks map[int]callback
4141

4242
wg *sync.WaitGroup
4343
}
@@ -80,7 +80,7 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
8080
client.c = jsonrpc.NewConn(conn, client.ll)
8181

8282
// Set up callbacks.
83-
client.callbacks = make(map[int]chan rpcResponse)
83+
client.callbacks = make(map[int]callback)
8484

8585
// Start up any background routines.
8686
var wg sync.WaitGroup
@@ -134,8 +134,10 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
134134

135135
// Add callback for this RPC ID to return results via channel.
136136
ch := make(chan rpcResponse, 1)
137-
defer close(ch)
138-
c.addCallback(req.ID, ch)
137+
c.addCallback(req.ID, callback{
138+
Ctx: ctx,
139+
Response: ch,
140+
})
139141

140142
if err := c.c.Send(req); err != nil {
141143
return err
@@ -144,9 +146,16 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
144146
// Await RPC completion or cancelation.
145147
select {
146148
case <-ctx.Done():
147-
// RPC canceled; clean up callback.
148-
return c.cancelCallback(ctx, req.ID)
149-
case res := <-ch:
149+
// RPC canceled. Producer cleans up the callback.
150+
return ctx.Err()
151+
case res, ok := <-ch:
152+
if !ok {
153+
// Channel was closed by producer after a context cancelation,
154+
// and woke up this consumer. The select statement happened
155+
// to pick this case even though the context was canceled.
156+
return ctx.Err()
157+
}
158+
150159
// RPC complete.
151160
return rpcResult(res, &r)
152161
}
@@ -184,9 +193,15 @@ func (c *Client) listen() {
184193
}
185194
}
186195

187-
// addCallback registers a callback for an RPC response for the specified ID,
188-
// and accepts a channel to return the results on.
189-
func (c *Client) addCallback(id int, ch chan rpcResponse) {
196+
// A callback can be used to send a message back to a caller, or
197+
// allow the caller to cancel waiting for a message.
198+
type callback struct {
199+
Ctx context.Context
200+
Response chan rpcResponse
201+
}
202+
203+
// addCallback registers a callback for an RPC response for the specified ID.
204+
func (c *Client) addCallback(id int, cb callback) {
190205
c.cbMu.Lock()
191206
defer c.cbMu.Unlock()
192207

@@ -195,7 +210,7 @@ func (c *Client) addCallback(id int, ch chan rpcResponse) {
195210
panicf("OVSDB callback with ID %d already registered", id)
196211
}
197212

198-
c.callbacks[id] = ch
213+
c.callbacks[id] = cb
199214
}
200215

201216
// doCallback performs a callback for an RPC response and clears the
@@ -204,27 +219,24 @@ func (c *Client) doCallback(id int, res rpcResponse) {
204219
c.cbMu.Lock()
205220
defer c.cbMu.Unlock()
206221

207-
ch, ok := c.callbacks[id]
222+
cb, ok := c.callbacks[id]
208223
if !ok {
209224
// Nobody is listening to this callback.
210225
return
211226
}
212227

213-
// Return result and remove this callback.
214-
ch <- res
215-
delete(c.callbacks, id)
216-
}
228+
// Producer can safely close channel on return.
229+
defer close(cb.Response)
217230

218-
// cancelCallback is invoked when an RPC is canceled by its context.
219-
func (c *Client) cancelCallback(ctx context.Context, id int) error {
220-
// RPC canceled; acquire the callback mutex and clean up the callback
221-
// for this RPC.
222-
c.cbMu.Lock()
223-
defer c.cbMu.Unlock()
231+
// Wait for send or cancelation.
232+
select {
233+
case <-cb.Ctx.Done():
234+
// Request's context was canceled.
235+
case cb.Response <- res:
236+
// Message was successfully sent.
237+
}
224238

225239
delete(c.callbacks, id)
226-
227-
return ctx.Err()
228240
}
229241

230242
func panicf(format string, a ...interface{}) {

0 commit comments

Comments
 (0)