Skip to content

Commit 34ae336

Browse files
committed
Merge branch 'release/v0.3.1'
2 parents 916b031 + c90be4d commit 34ae336

File tree

9 files changed

+225
-154
lines changed

9 files changed

+225
-154
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## v0.3.1 - 14 June 2014
4+
5+
- Fixed "Token ## not in stream cache" error (#103)
6+
- Changed Exec to no longer use NoReply. It now waits for the server to respond.
7+
38
## v0.3 (RethinkDB v1.13) - 26 June 2014
49

510
- Replaced `ResultRows`/`ResultRow` with `Cursor`, `Cursor` has the `Next`, `All` and `One` methods which stores the relevant value in the value pointed at by result. For more information check the examples.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ GoRethink - RethinkDB Driver for Go [![wercker status](https://app.wercker.com/s
44
[Go](http://golang.org/) driver for [RethinkDB](http://www.rethinkdb.com/) made by [Daniel Cannon](http://github.com/dancannon) and based off of Christopher Hesse's [RethinkGo](https://github.com/christopherhesse/rethinkgo) driver.
55

66

7-
Current version: v0.3 (RethinkDB v1.13)
7+
Current version: v0.3.1 (RethinkDB v1.13)
88

99
**Version 0.3 introduced some API changes, for more information check the [change log](CHANGELOG.md)**
1010

connection.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net"
9+
"sync"
910
"time"
1011

1112
"code.google.com/p/goprotobuf/proto"
@@ -23,7 +24,9 @@ type Connection struct {
2324
// embed the net.Conn type, so that we can effectively define new methods on
2425
// it (interfaces do not allow that)
2526
net.Conn
27+
s *Session
2628

29+
sync.Mutex
2730
closed bool
2831
}
2932

@@ -73,7 +76,10 @@ func Dial(s *Session) (*Connection, error) {
7376
return nil, RqlDriverError{fmt.Sprintf("Server dropped connection with message: \"%s\"", response)}
7477
}
7578

76-
return &Connection{conn, false}, nil
79+
return &Connection{
80+
s: s,
81+
Conn: conn,
82+
}, nil
7783
}
7884

7985
func TestOnBorrow(c *Connection, t time.Time) error {
@@ -112,9 +118,9 @@ func (c *Connection) ReadResponse(s *Session, token int64) (*p.Response, error)
112118

113119
if response.GetToken() == token {
114120
return response, nil
115-
} else if _, ok := s.cache[response.GetToken()]; ok {
121+
} else if cursor, ok := s.checkCache(token); ok {
116122
// Handle batch response
117-
s.handleBatchResponse(response)
123+
s.handleBatchResponse(cursor, response)
118124
} else {
119125
return nil, RqlDriverError{"Unexpected response received"}
120126
}
@@ -153,6 +159,7 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
153159

154160
// Return immediately if the noreply option was set
155161
if noreply, ok := opts["noreply"]; ok && noreply.(bool) {
162+
c.Close()
156163
return nil, nil
157164
} else if async {
158165
return nil, nil
@@ -185,17 +192,14 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
185192
case p.Response_SUCCESS_PARTIAL, p.Response_SUCCESS_SEQUENCE, p.Response_SUCCESS_FEED:
186193
cursor := &Cursor{
187194
session: s,
195+
conn: c,
188196
query: q,
189197
term: t,
190198
opts: opts,
191199
profile: profile,
192-
timeout: -1,
193200
}
194-
cursor.gotReply.L = &cursor.mu
195201

196-
s.Lock()
197-
s.cache[*q.Token] = cursor
198-
s.Unlock()
202+
s.setCache(*q.Token, cursor)
199203

200204
cursor.extend(response)
201205

@@ -230,15 +234,14 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
230234

231235
cursor := &Cursor{
232236
session: s,
237+
conn: c,
233238
query: q,
234239
term: t,
235240
opts: opts,
236241
profile: profile,
237242
buffer: value,
238243
finished: true,
239-
timeout: -1,
240244
}
241-
cursor.gotReply.L = &cursor.mu
242245

243246
return cursor, nil
244247
case p.Response_WAIT_COMPLETE:
@@ -249,7 +252,19 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
249252
}
250253

251254
func (c *Connection) Close() error {
255+
err := c.s.noreplyWaitQuery()
256+
if err != nil {
257+
return err
258+
}
259+
260+
return c.CloseNoWait()
261+
}
262+
263+
func (c *Connection) CloseNoWait() error {
264+
c.Lock()
252265
c.closed = true
266+
c.Unlock()
267+
253268
return c.Conn.Close()
254269
}
255270

doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Go driver for RethinkDB
22
//
3-
// Current version: v0.3 (RethinkDB v1.13)
3+
// Current version: v0.3.1 (RethinkDB v1.13)
44
// For more in depth information on how to use RethinkDB check out the API docs
55
// at http://rethinkdb.com/api
66
package gorethink

query.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -202,18 +202,16 @@ func (t Term) RunWrite(s *Session, optArgs ...RunOpts) (WriteResponse, error) {
202202
return response, err
203203
}
204204

205-
// Exec runs the query but does not return the result (It also automatically sets
206-
// the noreply option).
205+
// Exec runs the query but does not return the result.
207206
func (t Term) Exec(s *Session, optArgs ...RunOpts) error {
208-
// Ensure that noreply is set to true
209-
if len(optArgs) >= 1 {
210-
optArgs[0].NoReply = true
211-
} else {
212-
optArgs = append(optArgs, RunOpts{
213-
NoReply: true,
214-
})
207+
res, err := t.Run(s, optArgs...)
208+
if err != nil {
209+
return err
210+
}
211+
err = res.Close()
212+
if err != nil {
213+
return err
215214
}
216215

217-
_, err := t.Run(s, optArgs...)
218-
return err
216+
return nil
219217
}

query_control_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package gorethink
22

33
import (
4+
"testing"
45
"time"
56

67
test "launchpad.net/gocheck"
@@ -152,6 +153,10 @@ func (s *RethinkSuite) TestControlJs(c *test.C) {
152153
}
153154

154155
func (s *RethinkSuite) TestControlHttp(c *test.C) {
156+
if testing.Short() {
157+
c.Skip("-short set")
158+
}
159+
155160
var response map[string]interface{}
156161
query := Http("httpbin.org/get?data=1")
157162
res, err := query.Run(sess)
@@ -179,7 +184,7 @@ func (s *RethinkSuite) TestControlJson(c *test.C) {
179184

180185
func (s *RethinkSuite) TestControlError(c *test.C) {
181186
query := Error("An error occurred")
182-
_, err := query.Run(sess)
187+
err := query.Exec(sess)
183188
c.Assert(err, test.NotNil)
184189

185190
c.Assert(err, test.NotNil)

query_select_test.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package gorethink
22

3-
import test "launchpad.net/gocheck"
3+
import (
4+
"fmt"
5+
6+
test "launchpad.net/gocheck"
7+
)
48

59
func (s *RethinkSuite) TestSelectGet(c *test.C) {
610
// Ensure table + database exist
@@ -300,3 +304,58 @@ func (s *RethinkSuite) TestSelectMany(c *test.C) {
300304
c.Assert(res.Err(), test.IsNil)
301305
c.Assert(n, test.Equals, 100)
302306
}
307+
308+
func (s *RethinkSuite) TestSelectManyConcurrent(c *test.C) {
309+
// Ensure table + database exist
310+
DbCreate("test").RunWrite(sess)
311+
Db("test").TableCreate("TestMany").RunWrite(sess)
312+
Db("test").Table("TestMany").Delete().RunWrite(sess)
313+
314+
// Insert rows
315+
for i := 0; i < 1; i++ {
316+
data := []interface{}{}
317+
318+
for j := 0; j < 100; j++ {
319+
data = append(data, map[string]interface{}{
320+
"i": i,
321+
"j": j,
322+
})
323+
}
324+
325+
Db("test").Table("TestMany").Insert(data).Run(sess)
326+
}
327+
328+
// Test queries concurrently
329+
attempts := 1
330+
waitChannel := make(chan error, attempts)
331+
332+
for i := 0; i < attempts; i++ {
333+
go func(i int, c chan error) {
334+
res, err := Db("test").Table("TestMany").Run(sess, RunOpts{
335+
BatchConf: map[string]interface{}{"max_els": 5, "max_size": 20},
336+
})
337+
if err != nil {
338+
c <- err
339+
}
340+
341+
var response []map[string]interface{}
342+
err = res.All(&response)
343+
if err != nil {
344+
c <- err
345+
}
346+
347+
if len(response) != 100 {
348+
c <- fmt.Errorf("expected response length 100, received %d", len(response))
349+
}
350+
351+
c <- nil
352+
}(i, waitChannel)
353+
}
354+
355+
for i := 0; i < attempts; i++ {
356+
ret := <-waitChannel
357+
if ret != nil {
358+
c.Fatal("non-nil error returned (%s)", ret)
359+
}
360+
}
361+
}

0 commit comments

Comments
 (0)