Skip to content

Commit 04e70e6

Browse files
committed
ovsdb: thread context through callbacks
1 parent 83192a7 commit 04e70e6

File tree

1 file changed

+28
-11
lines changed

1 file changed

+28
-11
lines changed

ovsdb/client.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type Client struct {
3737

3838
// Callbacks for RPC responses.
3939
cbMu sync.RWMutex
40-
callbacks map[int]chan rpcResponse
40+
callbacks map[int]callback
4141

4242
wg *sync.WaitGroup
4343
}
@@ -80,7 +80,7 @@ func New(conn net.Conn, options ...OptionFunc) (*Client, error) {
8080
client.c = jsonrpc.NewConn(conn, client.ll)
8181

8282
// Set up callbacks.
83-
client.callbacks = make(map[int]chan rpcResponse)
83+
client.callbacks = make(map[int]callback)
8484

8585
// Start up any background routines.
8686
var wg sync.WaitGroup
@@ -134,8 +134,10 @@ func (c *Client) rpc(ctx context.Context, method string, out interface{}, args .
134134

135135
// Add callback for this RPC ID to return results via channel.
136136
ch := make(chan rpcResponse, 1)
137-
defer close(ch)
138-
c.addCallback(req.ID, ch)
137+
c.addCallback(req.ID, callback{
138+
Ctx: ctx,
139+
Response: ch,
140+
})
139141

140142
if err := c.c.Send(req); err != nil {
141143
return err
@@ -184,9 +186,15 @@ func (c *Client) listen() {
184186
}
185187
}
186188

187-
// addCallback registers a callback for an RPC response for the specified ID,
188-
// and accepts a channel to return the results on.
189-
func (c *Client) addCallback(id int, ch chan rpcResponse) {
189+
// A callback can be used to send a message back to a caller, or
190+
// allow the caller to cancel waiting for a message.
191+
type callback struct {
192+
Ctx context.Context
193+
Response chan rpcResponse
194+
}
195+
196+
// addCallback registers a callback for an RPC response for the specified ID.
197+
func (c *Client) addCallback(id int, cb callback) {
190198
c.cbMu.Lock()
191199
defer c.cbMu.Unlock()
192200

@@ -195,7 +203,7 @@ func (c *Client) addCallback(id int, ch chan rpcResponse) {
195203
panicf("OVSDB callback with ID %d already registered", id)
196204
}
197205

198-
c.callbacks[id] = ch
206+
c.callbacks[id] = cb
199207
}
200208

201209
// doCallback performs a callback for an RPC response and clears the
@@ -204,14 +212,23 @@ func (c *Client) doCallback(id int, res rpcResponse) {
204212
c.cbMu.Lock()
205213
defer c.cbMu.Unlock()
206214

207-
ch, ok := c.callbacks[id]
215+
cb, ok := c.callbacks[id]
208216
if !ok {
209217
// Nobody is listening to this callback.
210218
return
211219
}
212220

213-
// Return result and remove this callback.
214-
ch <- res
221+
// Producer can safely close channel on return.
222+
defer close(cb.Response)
223+
224+
// Wait for send or cancelation.
225+
select {
226+
case <-cb.Ctx.Done():
227+
// Request's context was canceled.
228+
case cb.Response <- res:
229+
// Message was successfully sent.
230+
}
231+
215232
delete(c.callbacks, id)
216233
}
217234

0 commit comments

Comments
 (0)