Skip to content

Commit c537668

Browse files
committed
remove socket timeouts
1 parent 006e226 commit c537668

File tree

9 files changed

+115
-55
lines changed

9 files changed

+115
-55
lines changed

connection.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,6 @@ func (c *Connection) sendQuery(q Query) error {
378378
binary.LittleEndian.PutUint64(b, uint64(q.Token))
379379
binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-respHeaderLen))
380380

381-
// Set timeout
382-
if c.opts.WriteTimeout != 0 {
383-
c.Conn.SetWriteDeadline(time.Now().Add(c.opts.WriteTimeout))
384-
}
385-
386381
// Send the JSON encoding of the query itself.
387382
if err = c.writeData(b); err != nil {
388383
c.setBad()
@@ -402,10 +397,8 @@ func (c *Connection) nextToken() int64 {
402397
// readResponse attempts to read a Response from the server, if no response
403398
// could be read then an error is returned.
404399
func (c *Connection) readResponse() (*Response, error) {
405-
// Set timeout
406-
if c.opts.ReadTimeout != 0 {
407-
c.Conn.SetReadDeadline(time.Now().Add(c.opts.ReadTimeout))
408-
}
400+
// due to this is pooled connection, it always reads from socket even if idle
401+
// timeouts should be only on query-level with context
409402

410403
// Read response header (token+length)
411404
headerBuf := [respHeaderLen]byte{}

connection_helper.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ func (c *Connection) read(buf []byte) (total int, err error) {
1717
}
1818

1919
func (c *Connection) contextFromConnectionOpts() context.Context {
20-
sum := c.opts.ReadTimeout + c.opts.WriteTimeout
21-
if c.opts.ReadTimeout == 0 || c.opts.WriteTimeout == 0 {
20+
// back compatibility
21+
min := c.opts.ReadTimeout
22+
if c.opts.WriteTimeout < min {
23+
min = c.opts.WriteTimeout
24+
}
25+
if min == 0 {
2226
return context.Background()
2327
}
24-
ctx, _ := context.WithTimeout(context.Background(), sum)
28+
ctx, _ := context.WithTimeout(context.Background(), min)
2529
return ctx
2630
}

connection_test.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ func (s *ConnectionSuite) TestConnection_Query_Ok(c *test.C) {
2525
header := respHeader(token, respData)
2626

2727
conn := &connMock{}
28-
conn.On("Write", writeData).Return(len(writeData), nil)
29-
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil)
30-
conn.On("Read", len(respData)).Return(respData, len(respData), nil)
28+
conn.On("Write", writeData).Return(len(writeData), nil, nil)
29+
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil)
30+
conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil)
3131
conn.On("Close").Return(nil)
3232

3333
connection := newConnection(conn, "addr", &ConnectOpts{})
@@ -60,9 +60,9 @@ func (s *ConnectionSuite) TestConnection_Query_DefaultDBOk(c *test.C) {
6060
header := respHeader(token, respData)
6161

6262
conn := &connMock{}
63-
conn.On("Write", writeData).Return(len(writeData), nil)
64-
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil)
65-
conn.On("Read", len(respData)).Return(respData, len(respData), nil)
63+
conn.On("Write", writeData).Return(len(writeData), nil, nil)
64+
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil)
65+
conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil)
6666
conn.On("Close").Return(nil)
6767

6868
connection := newConnection(conn, "addr", &ConnectOpts{Database: "db"})
@@ -106,7 +106,7 @@ func (s *ConnectionSuite) TestConnection_Query_SendFail(c *test.C) {
106106
writeData := serializeQuery(token, q)
107107

108108
conn := &connMock{}
109-
conn.On("Write", writeData).Return(0, io.EOF)
109+
conn.On("Write", writeData).Return(0, io.EOF, nil)
110110

111111
connection := newConnection(conn, "addr", &ConnectOpts{})
112112
response, cursor, err := connection.Query(ctx, q)
@@ -126,9 +126,9 @@ func (s *ConnectionSuite) TestConnection_Query_NoReplyOk(c *test.C) {
126126
header := respHeader(token, respData)
127127

128128
conn := &connMock{}
129-
conn.On("Write", writeData).Return(len(writeData), nil)
130-
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil)
131-
conn.On("Read", len(respData)).Return(respData, len(respData), nil)
129+
conn.On("Write", writeData).Return(len(writeData), nil, nil)
130+
conn.On("Read", respHeaderLen).Return(header, respHeaderLen, nil, nil)
131+
conn.On("Read", len(respData)).Return(respData, len(respData), nil, nil)
132132
conn.On("Close").Return(nil)
133133

134134
connection := newConnection(conn, "addr", &ConnectOpts{})
@@ -151,9 +151,8 @@ func (s *ConnectionSuite) TestConnection_Query_TimeoutWrite(c *test.C) {
151151
stopData := serializeQuery(token, newStopQuery(token))
152152

153153
conn := &connMock{}
154-
conn.On("Write", writeData).Return(len(writeData), nil)
155-
conn.On("Write", stopData).Return(len(stopData), nil)
156-
conn.On("SetWriteDeadline").Return(nil)
154+
conn.On("Write", writeData).Return(len(writeData), nil, nil)
155+
conn.On("Write", stopData).Return(len(stopData), nil, nil)
157156

158157
connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: time.Millisecond, WriteTimeout: time.Millisecond})
159158
connection.readRequestsChan = make(chan tokenAndPromise, 0)
@@ -174,9 +173,8 @@ func (s *ConnectionSuite) TestConnection_Query_TimeoutRead(c *test.C) {
174173
stopData := serializeQuery(token, newStopQuery(token))
175174

176175
conn := &connMock{}
177-
conn.On("Write", writeData).Return(len(writeData), nil)
178-
conn.On("Write", stopData).Return(len(stopData), nil)
179-
conn.On("SetWriteDeadline").Return(nil)
176+
conn.On("Write", writeData).Return(len(writeData), nil, 10*time.Millisecond)
177+
conn.On("Write", stopData).Return(len(stopData), nil, nil)
180178

181179
connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: time.Millisecond, WriteTimeout: time.Millisecond})
182180
response, cursor, err := connection.Query(ctx, q)
@@ -196,7 +194,7 @@ func (s *ConnectionSuite) TestConnection_Query_SendFailTracing(c *test.C) {
196194
writeData := serializeQuery(token, q)
197195

198196
conn := &connMock{}
199-
conn.On("Write", writeData).Return(0, io.EOF)
197+
conn.On("Write", writeData).Return(0, io.EOF, nil)
200198

201199
connection := newConnection(conn, "addr", &ConnectOpts{UseOpentracing: true})
202200
response, cursor, err := connection.Query(ctx, q)
@@ -306,8 +304,7 @@ func (s *ConnectionSuite) TestConnection_readResponse_TimeoutHeader(c *test.C) {
306304
timeout := time.Second
307305

308306
conn := &connMock{}
309-
conn.On("SetReadDeadline").Return(nil)
310-
conn.On("Read", respHeaderLen).Return(nil, 0, io.EOF)
307+
conn.On("Read", respHeaderLen).Return(nil, 0, io.EOF, nil)
311308

312309
connection := newConnection(conn, "addr", &ConnectOpts{ReadTimeout: timeout})
313310

@@ -325,8 +322,8 @@ func (s *ConnectionSuite) TestConnection_readResponse_BodySocketErr(c *test.C) {
325322
header := respHeader(token, respData)
326323

327324
conn := &connMock{}
328-
conn.On("Read", respHeaderLen).Return(header, len(header), nil)
329-
conn.On("Read", len(respData)).Return(nil, 0, io.EOF)
325+
conn.On("Read", respHeaderLen).Return(header, len(header), nil, nil)
326+
conn.On("Read", len(respData)).Return(nil, 0, io.EOF, nil)
330327

331328
connection := newConnection(conn, "addr", &ConnectOpts{})
332329

@@ -344,8 +341,8 @@ func (s *ConnectionSuite) TestConnection_readResponse_BodyUnmarshalErr(c *test.C
344341
header := respHeader(token, respData)
345342

346343
conn := &connMock{}
347-
conn.On("Read", respHeaderLen).Return(header, len(header), nil)
348-
conn.On("Read", len(respData)).Return(make([]byte, len(respData)), len(respData), nil)
344+
conn.On("Read", respHeaderLen).Return(header, len(header), nil, nil)
345+
conn.On("Read", len(respData)).Return(make([]byte, len(respData)), len(respData), nil, nil)
349346

350347
connection := newConnection(conn, "addr", &ConnectOpts{})
351348

go.mod

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
module gopkg.in/rethinkdb/rethinkdb-go.v6
22

33
require (
4-
github.com/cenkalti/backoff v2.0.0+incompatible
4+
github.com/bitly/go-hostpool v0.1.0 // indirect
5+
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
6+
github.com/cenkalti/backoff v2.2.1+incompatible
57
github.com/davecgh/go-spew v1.1.1 // indirect
6-
github.com/golang/protobuf v1.2.0
8+
github.com/golang/protobuf v1.3.4
79
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
810
github.com/kr/pretty v0.1.0 // indirect
9-
github.com/opentracing/opentracing-go v1.0.2
10-
github.com/pmezard/go-difflib v1.0.0 // indirect
11+
github.com/kr/text v0.2.0 // indirect
12+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
13+
github.com/onsi/ginkgo v1.12.0 // indirect
14+
github.com/onsi/gomega v1.9.0 // indirect
15+
github.com/opentracing/opentracing-go v1.1.0
1116
github.com/sirupsen/logrus v1.0.6
12-
github.com/stretchr/objx v0.1.1 // indirect
13-
github.com/stretchr/testify v1.2.2
14-
golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac
15-
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
16-
golang.org/x/sys v0.0.0-20180828065106-d99a578cf41b // indirect
17-
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
18-
gopkg.in/fatih/pool.v2 v2.0.0
17+
github.com/stretchr/objx v0.2.0 // indirect
18+
github.com/stretchr/testify v1.5.1
19+
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
20+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
21+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
22+
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
23+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f
24+
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
25+
gopkg.in/yaml.v2 v2.2.8 // indirect
1926
)
2027

21-
go 1.13
28+
go 1.14

mock_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,54 @@ func (s *MockSuite) TestMockRethinkStructsRunWrite(c *test.C) {
371371
mock.AssertExpectations(c)
372372
}
373373

374+
func (s *MockSuite) TestMockMapSliceResultOk(c *test.C) {
375+
type Some struct {
376+
Id string
377+
}
378+
379+
result := []map[string]interface{}{
380+
{"Id": "test1"},
381+
{"Id": "test2"},
382+
}
383+
384+
mock := NewMock()
385+
q := DB("test").Table("test").GetAll()
386+
mock.On(q).Return(result, nil)
387+
res, err := q.Run(mock)
388+
c.Assert(err, test.IsNil)
389+
390+
var casted []*Some
391+
err = res.All(&casted)
392+
c.Assert(err, test.IsNil)
393+
394+
c.Assert(casted[0].Id, test.Equals, "test1")
395+
c.Assert(casted[1].Id, test.Equals, "test2")
396+
}
397+
398+
func (s *MockSuite) TestMockPointerSliceResultOk(c *test.C) {
399+
type Some struct {
400+
Id string
401+
}
402+
403+
result := []*Some{
404+
{Id: "test1"},
405+
{Id: "test2"},
406+
}
407+
408+
mock := NewMock()
409+
q := DB("test").Table("test").GetAll()
410+
mock.On(q).Return(result, nil)
411+
res, err := q.Run(mock)
412+
c.Assert(err, test.IsNil)
413+
414+
var casted []*Some
415+
err = res.All(&casted)
416+
c.Assert(err, test.IsNil)
417+
418+
c.Assert(casted[0].Id, test.Equals, "test1")
419+
c.Assert(casted[1].Id, test.Equals, "test2")
420+
}
421+
374422
type simpleTestingT struct {
375423
failed bool
376424
}

mocks_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,19 @@ func (m *connMock) Read(b []byte) (n int, err error) {
1616
if ok {
1717
copy(b, rbuf)
1818
}
19+
timeout := args.Get(3)
20+
if timeout != nil {
21+
time.Sleep(timeout.(time.Duration))
22+
}
1923
return args.Int(1), args.Error(2)
2024
}
2125

2226
func (m *connMock) Write(b []byte) (n int, err error) {
2327
args := m.Called(b)
28+
timeout := args.Get(2)
29+
if timeout != nil {
30+
time.Sleep(timeout.(time.Duration))
31+
}
2432
return args.Int(0), args.Error(1)
2533
}
2634

pool.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"sync/atomic"
77

88
"golang.org/x/net/context"
9-
"gopkg.in/fatih/pool.v2"
109
)
1110

1211
var (
@@ -59,7 +58,7 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) {
5958
// Ping verifies a connection to the database is still alive,
6059
// establishing a connection if necessary.
6160
func (p *Pool) Ping() error {
62-
_, _, err := p.conn()
61+
_, err := p.conn()
6362
return err
6463
}
6564

@@ -84,12 +83,12 @@ func (p *Pool) Close() error {
8483
return nil
8584
}
8685

87-
func (p *Pool) conn() (*Connection, *pool.PoolConn, error) {
86+
func (p *Pool) conn() (*Connection, error) {
8887
p.mu.RLock()
8988

9089
if p.closed {
9190
p.mu.RUnlock()
92-
return nil, nil, errPoolClosed
91+
return nil, errPoolClosed
9392
}
9493
p.mu.RUnlock()
9594

@@ -99,7 +98,7 @@ func (p *Pool) conn() (*Connection, *pool.PoolConn, error) {
9998
}
10099
pos = pos % int32(len(p.conns))
101100

102-
return p.conns[pos], nil, nil
101+
return p.conns[pos], nil
103102
}
104103

105104
// SetInitialPoolCap sets the initial capacity of the connection pool.
@@ -128,7 +127,7 @@ func (p *Pool) SetMaxOpenConns(n int) {
128127

129128
// Exec executes a query without waiting for any response.
130129
func (p *Pool) Exec(ctx context.Context, q Query) error {
131-
c, _, err := p.conn()
130+
c, err := p.conn()
132131
if err != nil {
133132
return err
134133
}
@@ -139,7 +138,7 @@ func (p *Pool) Exec(ctx context.Context, q Query) error {
139138

140139
// Query executes a query and waits for the response
141140
func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) {
142-
c, _, err := p.conn()
141+
c, err := p.conn()
143142
if err != nil {
144143
return nil, err
145144
}
@@ -152,7 +151,7 @@ func (p *Pool) Query(ctx context.Context, q Query) (*Cursor, error) {
152151
func (p *Pool) Server() (ServerResponse, error) {
153152
var response ServerResponse
154153

155-
c, _, err := p.conn()
154+
c, err := p.conn()
156155
if err != nil {
157156
return response, err
158157
}

rethinkdb.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
)
1111

1212
var (
13+
// Log is logger for debug purpuses.
14+
// deprecated
1315
Log *logrus.Logger
1416
)
1517

session.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ type ConnectOpts struct {
4646
Timeout time.Duration `rethinkdb:"timeout,omitempty" json:"timeout,omitempty"`
4747
// WriteTimeout is the amount of time the driver will wait when sending the
4848
// query to the server
49+
// Deprecated: use RunOpts.Context instead
4950
WriteTimeout time.Duration `rethinkdb:"write_timeout,omitempty" json:"write_timeout,omitempty"`
5051
// ReadTimeout is the amount of time the driver will wait for a response from
5152
// the server when executing queries.
53+
// Deprecated: use RunOpts.Context instead
5254
ReadTimeout time.Duration `rethinkdb:"read_timeout,omitempty" json:"read_timeout,omitempty"`
5355
// KeepAlivePeriod is the keep alive period used by the connection, by default
5456
// this is 30s. It is not possible to disable keep alive messages

0 commit comments

Comments
 (0)