Skip to content

Commit 117ecc7

Browse files
committed
Merge branch 'develop' of https://github.com/rethinkdb/rethinkdb-go into develop
2 parents d19e335 + c201bf5 commit 117ecc7

File tree

4 files changed

+137
-16
lines changed

4 files changed

+137
-16
lines changed

README.md

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,39 @@ func TestSomething(t *testing.T) {
456456
}
457457
```
458458

459+
If you want the cursor to block on some of the response values, you can pass in
460+
a value of type `chan interface{}` and the cursor will block until a value is
461+
available to read on the channel. Or you can pass in a function with signature
462+
`func() interface{}`: the cursor will call the function (which may block). Here
463+
is the example above adapted to use a channel.
464+
465+
```go
466+
func TestSomething(t *testing.T) {
467+
mock := r.NewMock()
468+
ch := make(chan interface{})
469+
mock.On(r.Table("people")).Return([]interface{}{ch, ch}, nil)
470+
go func() {
471+
ch <- map[string]interface{}{"id": 1, "name": "John Smith"}
472+
ch <- map[string]interface{}{"id": 2, "name": "Jane Smith"}
473+
}()
474+
cursor, err := r.Table("people").Run(mock)
475+
if err != nil {
476+
t.Errorf("err is: %v", err)
477+
}
478+
479+
var rows []interface{}
480+
err = cursor.All(&rows)
481+
if err != nil {
482+
t.Errorf("err is: %v", err)
483+
}
484+
485+
// Test result of rows
486+
487+
mock.AssertExpectations(t)
488+
}
489+
490+
```
491+
459492
The mocking implementation is based on amazing https://github.com/stretchr/testify library, thanks to @stretchr for their awesome work!
460493

461494
## Benchmarks
@@ -464,17 +497,17 @@ Everyone wants their project's benchmarks to be speedy. And while we know that R
464497

465498
Thanks to @jaredfolkins for the contribution.
466499

467-
| Type | Value |
468-
| --- | --- |
469-
| **Model Name** | MacBook Pro |
470-
| **Model Identifier** | MacBookPro11,3 |
471-
| **Processor Name** | Intel Core i7 |
472-
| **Processor Speed** | 2.3 GHz |
473-
| **Number of Processors** | 1 |
474-
| **Total Number of Cores** | 4 |
475-
| **L2 Cache (per Core)** | 256 KB |
476-
| **L3 Cache** | 6 MB |
477-
| **Memory** | 16 GB |
500+
| Type | Value |
501+
| ------------------------- | -------------- |
502+
| **Model Name** | MacBook Pro |
503+
| **Model Identifier** | MacBookPro11,3 |
504+
| **Processor Name** | Intel Core i7 |
505+
| **Processor Speed** | 2.3 GHz |
506+
| **Number of Processors** | 1 |
507+
| **Total Number of Cores** | 4 |
508+
| **L2 Cache (per Core)** | 256 KB |
509+
| **L3 Cache** | 6 MB |
510+
| **Memory** | 16 GB |
478511

479512
```bash
480513
BenchmarkBatch200RandomWrites 20 557227775 ns/op

cursor.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,13 @@ func (c *Cursor) Next(dest interface{}) bool {
208208
return hasMore
209209
}
210210

211+
// This can be used for testing. Typically the function would block to simulate
212+
// network lag and test concurrency.
213+
type delayedData struct {
214+
f func() interface{} // This function produces the data
215+
data interface{} // This is initially nil, then equal to f()
216+
}
217+
211218
func (c *Cursor) nextLocked(dest interface{}, progressCursor bool) (bool, error) {
212219
for {
213220
if err := c.seekCursor(true); err != nil {
@@ -227,7 +234,17 @@ func (c *Cursor) nextLocked(dest interface{}, progressCursor bool) (bool, error)
227234
if progressCursor {
228235
c.buffer = c.buffer[1:]
229236
}
230-
237+
if dd, ok := data.(delayedData); ok {
238+
if dd.data == nil {
239+
240+
// Here we remove the lock as in c.fetchMore() because the
241+
// function is likely to block.
242+
c.mu.Unlock()
243+
dd.data = dd.f()
244+
c.mu.Lock()
245+
}
246+
data = dd.data
247+
}
231248
err := encoding.Decode(dest, data)
232249
if err != nil {
233250
return false, err
@@ -494,7 +511,6 @@ func (c *Cursor) Listen(channel interface{}) {
494511
if !c.Next(elemp.Interface()) {
495512
break
496513
}
497-
498514
channelv.Send(elemp.Elem())
499515
}
500516

mock.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ func (mq *MockQuery) unlock() {
103103
// Return specifies the return arguments for the expectation.
104104
//
105105
// mock.On(r.Table("test")).Return(nil, errors.New("failed"))
106+
//
107+
// values of "chan interface{}" type will turn to delayed data that produce data
108+
// when there is an element available on the channel. Values of "func()
109+
// interface{}" type will produce data by calling the function. E.g.
110+
//
111+
// f := func() interface{} { return 2 }
112+
// ch := make(chan interface{})
113+
// mock.On(r.Table("test")).Return([]interface{}{ch, f, 3})
114+
//
115+
// Running the query above will block until a value is pushed onto ch.
106116
func (mq *MockQuery) Return(response interface{}, err error) *MockQuery {
107117
mq.lock()
108118
defer mq.unlock()
@@ -337,15 +347,34 @@ func (m *Mock) Query(ctx context.Context, q Query) (*Cursor, error) {
337347
responseVal := reflect.ValueOf(query.Response)
338348
if responseVal.Kind() == reflect.Slice || responseVal.Kind() == reflect.Array {
339349
for i := 0; i < responseVal.Len(); i++ {
340-
c.buffer = append(c.buffer, responseVal.Index(i).Interface())
350+
c.buffer = append(c.buffer, getMockValue(responseVal.Index(i).Interface()))
341351
}
342352
} else {
343-
c.buffer = append(c.buffer, query.Response)
353+
c.buffer = append(c.buffer, getMockValue(query.Response))
344354
}
345355

346356
return c, nil
347357
}
348358

359+
// getMockValue turns some responses to delayedData: values of "chan
360+
// interface{}" type will turn to delayed data that produce data when there is
361+
// an element available on the channel. Values of "func() interface{}" type
362+
// will produce data by calling the function.
363+
func getMockValue(val interface{}) interface{} {
364+
switch v := val.(type) {
365+
case chan interface{}:
366+
return delayedData{
367+
f: func() interface{} { return <-v },
368+
}
369+
case func() interface{}:
370+
return delayedData{
371+
f: v,
372+
}
373+
default:
374+
return val
375+
}
376+
}
377+
349378
func (m *Mock) Exec(ctx context.Context, q Query) error {
350379
_, err := m.Query(ctx, q)
351380

mock_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package rethinkdb
33
import (
44
"fmt"
55

6+
"testing"
7+
68
test "gopkg.in/check.v1"
79
"gopkg.in/rethinkdb/rethinkdb-go.v5/internal/integration/tests"
8-
"testing"
910
)
1011

1112
// Hook up gocheck into the gotest runner.
@@ -79,6 +80,48 @@ func (s *MockSuite) TestMockRunSuccessMultipleResults(c *test.C) {
7980
res.Close()
8081
}
8182

83+
func (s *MockSuite) TestMockRunSuccessChannel(c *test.C) {
84+
mock := NewMock()
85+
ch := make(chan interface{})
86+
mock.On(DB("test").Table("test")).Return([]interface{}{ch, ch}, nil)
87+
go func() {
88+
ch <- 1
89+
ch <- 2
90+
}()
91+
res, err := DB("test").Table("test").Run(mock)
92+
c.Assert(err, test.IsNil)
93+
94+
var response []interface{}
95+
err = res.All(&response)
96+
97+
c.Assert(err, test.IsNil)
98+
c.Assert(response, tests.JsonEquals, []interface{}{1, 2})
99+
mock.AssertExpectations(c)
100+
101+
res.Close()
102+
}
103+
104+
func (s *MockSuite) TestMockRunSuccessFunction(c *test.C) {
105+
mock := NewMock()
106+
n := 0
107+
f := func() interface{} {
108+
n++
109+
return n
110+
}
111+
mock.On(DB("test").Table("test")).Return([]interface{}{f, f, 3}, nil)
112+
res, err := DB("test").Table("test").Run(mock)
113+
c.Assert(err, test.IsNil)
114+
115+
var response []interface{}
116+
err = res.All(&response)
117+
118+
c.Assert(err, test.IsNil)
119+
c.Assert(response, tests.JsonEquals, []interface{}{1, 2, 3})
120+
mock.AssertExpectations(c)
121+
122+
res.Close()
123+
}
124+
82125
func (s *MockSuite) TestMockRunSuccessMultipleResults_type(c *test.C) {
83126
type document struct {
84127
Id string

0 commit comments

Comments
 (0)