Skip to content

Commit e31dda1

Browse files
authored
Merge pull request #422 from CMogilko/master
Cancelling queries
2 parents d99923c + 3913668 commit e31dda1

File tree

12 files changed

+141
-71
lines changed

12 files changed

+141
-71
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ r.Table("test").Insert(doc, r.InsertOpts{
211211

212212
As shown above in the Between example optional arguments are passed to the function as a struct. Each function that has optional arguments as a related struct. This structs are named in the format FunctionNameOpts, for example BetweenOpts is the related struct for Between.
213213

214+
#### Cancelling queries
215+
216+
For query cancellation use `Context` argument at `RunOpts`. If `Context` is `nil` and `ReadTimeout` or `WriteTimeout` is not 0 from `ConnectionOpts`, `Context` will be formed by summation of these timeouts.
217+
218+
For unlimited timeouts for `Changes()` pass `context.Background()`.
219+
214220
## Results
215221

216222
Different result types are returned depending on what function is used to execute the query.

cluster.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/Sirupsen/logrus"
1111
"github.com/cenkalti/backoff"
1212
"github.com/hailocab/go-hostpool"
13+
"golang.org/x/net/context"
1314
)
1415

1516
// A Cluster represents a connection to a RethinkDB cluster, a cluster is created
@@ -57,7 +58,7 @@ func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
5758
}
5859

5960
// Query executes a ReQL query using the cluster to connect to the database
60-
func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
61+
func (c *Cluster) Query(ctx context.Context, q Query) (cursor *Cursor, err error) {
6162
for i := 0; i < c.numRetries(); i++ {
6263
var node *Node
6364
var hpr hostpool.HostPoolResponse
@@ -67,7 +68,7 @@ func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
6768
return nil, err
6869
}
6970

70-
cursor, err = node.Query(q)
71+
cursor, err = node.Query(ctx, q)
7172
hpr.Mark(err)
7273

7374
if !shouldRetryQuery(q, err) {
@@ -79,7 +80,7 @@ func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
7980
}
8081

8182
// Exec executes a ReQL query using the cluster to connect to the database
82-
func (c *Cluster) Exec(q Query) (err error) {
83+
func (c *Cluster) Exec(ctx context.Context, q Query) (err error) {
8384
for i := 0; i < c.numRetries(); i++ {
8485
var node *Node
8586
var hpr hostpool.HostPoolResponse
@@ -89,7 +90,7 @@ func (c *Cluster) Exec(q Query) (err error) {
8990
return err
9091
}
9192

92-
err = node.Exec(q)
93+
err = node.Exec(ctx, q)
9394
hpr.Mark(err)
9495

9596
if !shouldRetryQuery(q, err) {
@@ -204,7 +205,7 @@ func (c *Cluster) listenForNodeChanges() error {
204205
return fmt.Errorf("Error building query: %s", err)
205206
}
206207

207-
cursor, err := node.Query(q)
208+
cursor, err := node.Query(context.Background(), q) // no need for timeout due to Changes()
208209
if err != nil {
209210
hpr.Mark(err)
210211
return err
@@ -279,7 +280,7 @@ func (c *Cluster) connectNodes(hosts []Host) error {
279280
continue
280281
}
281282

282-
_, cursor, err := conn.Query(q)
283+
_, cursor, err := conn.Query(nil, q) // nil = connection opts' timeout
283284
if err != nil {
284285
attemptErr = err
285286
Log.Warnf("Error fetching cluster status: %s", err)

connection.go

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync/atomic"
1111
"time"
1212

13+
"golang.org/x/net/context"
1314
p "gopkg.in/gorethink/gorethink.v3/ql2"
1415
)
1516

@@ -104,7 +105,11 @@ func (c *Connection) Close() error {
104105
// Cursor which should be used to view the query's response.
105106
//
106107
// This function is used internally by Run which should be used for most queries.
107-
func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
108+
func (c *Connection) Query(ctx context.Context, q Query) (*Response, *Cursor, error) {
109+
if ctx == nil {
110+
ctx = c.contextFromConnectionOpts()
111+
}
112+
108113
if c == nil {
109114
return nil, nil, ErrConnectionClosed
110115
}
@@ -131,30 +136,51 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
131136
}
132137
c.mu.Unlock()
133138

134-
err := c.sendQuery(q)
135-
if err != nil {
136-
return nil, nil, err
137-
}
139+
var response *Response
140+
var cursor *Cursor
141+
var errchan chan error = make(chan error, 1)
142+
go func() {
143+
err := c.sendQuery(q)
144+
if err != nil {
145+
errchan <- err
146+
return
147+
}
138148

139-
if noreply, ok := q.Opts["noreply"]; ok && noreply.(bool) {
140-
return nil, nil, nil
141-
}
149+
if noreply, ok := q.Opts["noreply"]; ok && noreply.(bool) {
150+
errchan <- nil
151+
return
152+
}
142153

143-
for {
144-
response, err := c.readResponse()
145-
if err != nil {
146-
return nil, nil, err
154+
for {
155+
response, err := c.readResponse()
156+
if err != nil {
157+
errchan <- err
158+
return
159+
}
160+
161+
if response.Token == q.Token {
162+
// If this was the requested response process and return
163+
response, cursor, err = c.processResponse(ctx, q, response)
164+
errchan <- err
165+
return
166+
} else if _, ok := c.cursors[response.Token]; ok {
167+
// If the token is in the cursor cache then process the response
168+
c.processResponse(ctx, q, response)
169+
} else {
170+
putResponse(response)
171+
}
147172
}
173+
}()
148174

149-
if response.Token == q.Token {
150-
// If this was the requested response process and return
151-
return c.processResponse(q, response)
152-
} else if _, ok := c.cursors[response.Token]; ok {
153-
// If the token is in the cursor cache then process the response
154-
c.processResponse(q, response)
155-
} else {
156-
putResponse(response)
175+
select {
176+
case err := <-errchan:
177+
return response, cursor, err
178+
case <-ctx.Done():
179+
if q.Type != p.Query_STOP {
180+
stopQuery := newStopQuery(q.Token)
181+
c.Query(c.contextFromConnectionOpts(), stopQuery)
157182
}
183+
return nil, nil, ErrQueryTimeout
158184
}
159185
}
160186

@@ -167,7 +193,7 @@ type ServerResponse struct {
167193
func (c *Connection) Server() (ServerResponse, error) {
168194
var response ServerResponse
169195

170-
_, cur, err := c.Query(Query{
196+
_, cur, err := c.Query(c.contextFromConnectionOpts(), Query{
171197
Type: p.Query_SERVER_INFO,
172198
})
173199
if err != nil {
@@ -255,7 +281,7 @@ func (c *Connection) readResponse() (*Response, error) {
255281
return response, nil
256282
}
257283

258-
func (c *Connection) processResponse(q Query, response *Response) (*Response, *Cursor, error) {
284+
func (c *Connection) processResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
259285
switch response.Type {
260286
case p.Response_CLIENT_ERROR:
261287
return c.processErrorResponse(q, response, RQLClientError{rqlServerError{response, q.Term}})
@@ -264,11 +290,11 @@ func (c *Connection) processResponse(q Query, response *Response) (*Response, *C
264290
case p.Response_RUNTIME_ERROR:
265291
return c.processErrorResponse(q, response, createRuntimeError(response.ErrorType, response, q.Term))
266292
case p.Response_SUCCESS_ATOM, p.Response_SERVER_INFO:
267-
return c.processAtomResponse(q, response)
293+
return c.processAtomResponse(ctx, q, response)
268294
case p.Response_SUCCESS_PARTIAL:
269-
return c.processPartialResponse(q, response)
295+
return c.processPartialResponse(ctx, q, response)
270296
case p.Response_SUCCESS_SEQUENCE:
271-
return c.processSequenceResponse(q, response)
297+
return c.processSequenceResponse(ctx, q, response)
272298
case p.Response_WAIT_COMPLETE:
273299
return c.processWaitResponse(q, response)
274300
default:
@@ -287,17 +313,17 @@ func (c *Connection) processErrorResponse(q Query, response *Response, err error
287313
return response, cursor, err
288314
}
289315

290-
func (c *Connection) processAtomResponse(q Query, response *Response) (*Response, *Cursor, error) {
316+
func (c *Connection) processAtomResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
291317
// Create cursor
292-
cursor := newCursor(c, "Cursor", response.Token, q.Term, q.Opts)
318+
cursor := newCursor(ctx, c, "Cursor", response.Token, q.Term, q.Opts)
293319
cursor.profile = response.Profile
294320

295321
cursor.extend(response)
296322

297323
return response, cursor, nil
298324
}
299325

300-
func (c *Connection) processPartialResponse(q Query, response *Response) (*Response, *Cursor, error) {
326+
func (c *Connection) processPartialResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
301327
cursorType := "Cursor"
302328
if len(response.Notes) > 0 {
303329
switch response.Notes[0] {
@@ -318,7 +344,7 @@ func (c *Connection) processPartialResponse(q Query, response *Response) (*Respo
318344
cursor, ok := c.cursors[response.Token]
319345
if !ok {
320346
// Create a new cursor if needed
321-
cursor = newCursor(c, cursorType, response.Token, q.Term, q.Opts)
347+
cursor = newCursor(ctx, c, cursorType, response.Token, q.Term, q.Opts)
322348
cursor.profile = response.Profile
323349

324350
c.cursors[response.Token] = cursor
@@ -330,12 +356,12 @@ func (c *Connection) processPartialResponse(q Query, response *Response) (*Respo
330356
return response, cursor, nil
331357
}
332358

333-
func (c *Connection) processSequenceResponse(q Query, response *Response) (*Response, *Cursor, error) {
359+
func (c *Connection) processSequenceResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
334360
c.mu.Lock()
335361
cursor, ok := c.cursors[response.Token]
336362
if !ok {
337363
// Create a new cursor if needed
338-
cursor = newCursor(c, "Cursor", response.Token, q.Term, q.Opts)
364+
cursor = newCursor(ctx, c, "Cursor", response.Token, q.Term, q.Opts)
339365
cursor.profile = response.Profile
340366
}
341367

connection_helper.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package gorethink
22

3-
import "encoding/binary"
3+
import (
4+
"encoding/binary"
5+
"golang.org/x/net/context"
6+
)
47

58
// Write 'data' to conn
69
func (c *Connection) writeData(data []byte) error {
@@ -39,3 +42,12 @@ func (c *Connection) writeQuery(token int64, q []byte) error {
3942

4043
return c.writeData(data)
4144
}
45+
46+
func (c *Connection) contextFromConnectionOpts() context.Context {
47+
sum := c.opts.ReadTimeout + c.opts.WriteTimeout
48+
if sum == 0 {
49+
return context.Background()
50+
}
51+
ctx, _ := context.WithTimeout(context.Background(), sum)
52+
return ctx
53+
}

cursor.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"reflect"
88
"sync"
99

10+
"golang.org/x/net/context"
1011
"gopkg.in/gorethink/gorethink.v3/encoding"
1112
p "gopkg.in/gorethink/gorethink.v3/ql2"
1213
)
@@ -16,7 +17,7 @@ var (
1617
errCursorClosed = errors.New("connection closed, cannot read cursor")
1718
)
1819

19-
func newCursor(conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
20+
func newCursor(ctx context.Context, conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
2021
if cursorType == "" {
2122
cursorType = "Cursor"
2223
}
@@ -35,6 +36,7 @@ func newCursor(conn *Connection, cursorType string, token int64, term *Term, opt
3536
opts: opts,
3637
buffer: make([]interface{}, 0),
3738
responses: make([]json.RawMessage, 0),
39+
ctx: ctx,
3840
}
3941

4042
return cursor
@@ -64,6 +66,7 @@ type Cursor struct {
6466
cursorType string
6567
term *Term
6668
opts map[string]interface{}
69+
ctx context.Context
6770

6871
mu sync.RWMutex
6972
lastErr error
@@ -145,15 +148,7 @@ func (c *Cursor) Close() error {
145148

146149
// Stop any unfinished queries
147150
if !c.finished {
148-
q := Query{
149-
Type: p.Query_STOP,
150-
Token: c.token,
151-
Opts: map[string]interface{}{
152-
"noreply": true,
153-
},
154-
}
155-
156-
_, _, err = conn.Query(q)
151+
_, _, err = conn.Query(c.ctx, newStopQuery(c.token))
157152
}
158153

159154
if c.releaseConn != nil {
@@ -552,7 +547,7 @@ func (c *Cursor) fetchMore() error {
552547
}
553548

554549
c.mu.Unlock()
555-
_, _, err = c.conn.Query(q)
550+
_, _, err = c.conn.Query(c.ctx, q)
556551
c.mu.Lock()
557552
}
558553

errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ var (
2525
// ErrConnectionClosed is returned when trying to send a query with a closed
2626
// connection.
2727
ErrConnectionClosed = errors.New("gorethink: the connection is closed")
28+
// ErrQueryTimeout is returned when query context deadline exceeded.
29+
ErrQueryTimeout = errors.New("gorethink: query timeout")
2830
)
2931

3032
func printCarrots(t Term, frames []*p.Frame) string {

mock.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync"
88
"time"
99

10+
"golang.org/x/net/context"
1011
p "gopkg.in/gorethink/gorethink.v3/ql2"
1112
)
1213

@@ -290,7 +291,7 @@ func (m *Mock) IsConnected() bool {
290291
return true
291292
}
292293

293-
func (m *Mock) Query(q Query) (*Cursor, error) {
294+
func (m *Mock) Query(ctx context.Context, q Query) (*Cursor, error) {
294295
found, query := m.findExpectedQuery(q)
295296

296297
if found < 0 {
@@ -328,7 +329,7 @@ func (m *Mock) Query(q Query) (*Cursor, error) {
328329
}
329330

330331
// Build cursor and return
331-
c := newCursor(nil, "", query.Query.Token, query.Query.Term, query.Query.Opts)
332+
c := newCursor(ctx, nil, "", query.Query.Token, query.Query.Term, query.Query.Opts)
332333
c.finished = true
333334
c.fetching = false
334335
c.isAtom = true
@@ -345,8 +346,8 @@ func (m *Mock) Query(q Query) (*Cursor, error) {
345346
return c, nil
346347
}
347348

348-
func (m *Mock) Exec(q Query) error {
349-
_, err := m.Query(q)
349+
func (m *Mock) Exec(ctx context.Context, q Query) error {
350+
_, err := m.Query(ctx, q)
350351

351352
return err
352353
}

0 commit comments

Comments
 (0)