Skip to content

Commit 435e7f3

Browse files
committed
Merge branch 'feature/simple-pool-again' into develop
2 parents 565dcf8 + 1b4e3eb commit 435e7f3

File tree

10 files changed

+164
-559
lines changed

10 files changed

+164
-559
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ This project adheres to [Semantic Versioning](http://semver.org/).
1313
### Changed
1414
- Timezones from `time.Time` are now stored in the database, before all times were stored as UTC. To convert a go `time.Time` back to UTC you can call `t.In(time.UTC)`.
1515
- Improved host selection to use `hailocab/go-hostpool` to select nodes based on recent responses and timings.
16+
- Changed connection pool to use `fatih/pool` instead of a custom connection pool, this has caused some internal API changes and the behaviour of `MaxIdle` and `MaxOpen` has slightly changed. This change was made mostly to make driver maintenance easier.
17+
+ `MaxIdle` now configures the initial size of the pool, the name of this field will likely change in the future.
18+
+ Not setting `MaxOpen` no longer creates an unbounded connection pool per host but instead creates a pool with a maximum capacity of 2 per host.
1619

1720
### Deprecated
1821
- Deprecated the option `NodeRefreshInterval` in `ConnectOpts`
22+
- Deprecated `SetMaxIdleConns` and `SetMaxOpenConns`, these options should now only be set when creating the session.
1923

2024
## v1.1.4
2125
### Added

cluster.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (c *Cluster) discover() {
138138

139139
return c.listenForNodeChanges()
140140
}, b, func(err error, wait time.Duration) {
141-
Log.Debugf("Error discovering hosts %s, waiting %s", err, wait)
141+
Log.Debugf("Error discovering hosts %s, waiting: %s", err, wait)
142142
})
143143
}
144144
}
@@ -158,7 +158,7 @@ func (c *Cluster) listenForNodeChanges() error {
158158
c.opts,
159159
)
160160
if err != nil {
161-
return fmt.Errorf("Error building query %s", err)
161+
return fmt.Errorf("Error building query: %s", err)
162162
}
163163

164164
cursor, err := node.Query(q)
@@ -217,7 +217,7 @@ func (c *Cluster) connectNodes(hosts []Host) {
217217
for _, host := range hosts {
218218
conn, err := NewConnection(host.String(), c.opts)
219219
if err != nil {
220-
Log.Warnf("Error creating connection %s", err.Error())
220+
Log.Warnf("Error creating connection: %s", err.Error())
221221
continue
222222
}
223223
defer conn.Close()
@@ -228,13 +228,13 @@ func (c *Cluster) connectNodes(hosts []Host) {
228228
c.opts,
229229
)
230230
if err != nil {
231-
Log.Warnf("Error building query %s", err)
231+
Log.Warnf("Error building query: %s", err)
232232
continue
233233
}
234234

235235
_, cursor, err := conn.Query(q)
236236
if err != nil {
237-
Log.Warnf("Error fetching cluster status %s", err)
237+
Log.Warnf("Error fetching cluster status: %s", err)
238238
continue
239239
}
240240

@@ -260,6 +260,8 @@ func (c *Cluster) connectNodes(hosts []Host) {
260260
}).Debug("Connected to node")
261261
nodeSet[node.ID] = node
262262
}
263+
} else {
264+
Log.Warnf("Error connecting to node: %s", err)
263265
}
264266
}
265267
} else {
@@ -272,6 +274,8 @@ func (c *Cluster) connectNodes(hosts []Host) {
272274
}).Debug("Connected to node")
273275
nodeSet[node.ID] = node
274276
}
277+
} else {
278+
Log.Warnf("Error connecting to node: %s", err)
275279
}
276280
}
277281
}

connection.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ type Response struct {
3030
// Connection is a connection to a rethinkdb database. Connection is not thread
3131
// safe and should only be accessed be a single goroutine
3232
type Connection struct {
33+
net.Conn
34+
3335
address string
3436
opts *ConnectOpts
35-
conn net.Conn
3637

3738
_ [4]byte
3839
mu sync.Mutex
@@ -52,19 +53,19 @@ func NewConnection(address string, opts *ConnectOpts) (*Connection, error) {
5253
// Connect to Server
5354
nd := net.Dialer{Timeout: c.opts.Timeout}
5455
if c.opts.TLSConfig == nil {
55-
c.conn, err = nd.Dial("tcp", address)
56+
c.Conn, err = nd.Dial("tcp", address)
5657
} else {
57-
c.conn, err = tls.DialWithDialer(&nd, "tcp", address, c.opts.TLSConfig)
58+
c.Conn, err = tls.DialWithDialer(&nd, "tcp", address, c.opts.TLSConfig)
5859
}
5960
if err != nil {
6061
return nil, err
6162
}
6263
// Enable TCP Keepalives on TCP connections
63-
if tc, ok := c.conn.(*net.TCPConn); ok {
64+
if tc, ok := c.Conn.(*net.TCPConn); ok {
6465
if err := tc.SetKeepAlive(true); err != nil {
6566
// Don't send COM_QUIT before handshake.
66-
c.conn.Close()
67-
c.conn = nil
67+
c.Conn.Close()
68+
c.Conn = nil
6869
return nil, err
6970
}
7071
}
@@ -88,9 +89,9 @@ func (c *Connection) Close() error {
8889
c.mu.Lock()
8990
defer c.mu.Unlock()
9091

91-
if c.conn != nil {
92-
c.conn.Close()
93-
c.conn = nil
92+
if c.Conn != nil {
93+
c.Conn.Close()
94+
c.Conn = nil
9495
}
9596

9697
c.cursors = nil
@@ -108,7 +109,7 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
108109
c.mu.Unlock()
109110
return nil, nil, ErrConnectionClosed
110111
}
111-
if c.conn == nil {
112+
if c.Conn == nil {
112113
c.bad = true
113114
c.mu.Unlock()
114115
return nil, nil, ErrConnectionClosed
@@ -191,9 +192,9 @@ func (c *Connection) sendQuery(q Query) error {
191192

192193
// Set timeout
193194
if c.opts.WriteTimeout == 0 {
194-
c.conn.SetWriteDeadline(time.Time{})
195+
c.Conn.SetWriteDeadline(time.Time{})
195196
} else {
196-
c.conn.SetWriteDeadline(time.Now().Add(c.opts.WriteTimeout))
197+
c.Conn.SetWriteDeadline(time.Now().Add(c.opts.WriteTimeout))
197198
}
198199

199200
// Send the JSON encoding of the query itself.
@@ -217,9 +218,9 @@ func (c *Connection) nextToken() int64 {
217218
func (c *Connection) readResponse() (*Response, error) {
218219
// Set timeout
219220
if c.opts.ReadTimeout == 0 {
220-
c.conn.SetReadDeadline(time.Time{})
221+
c.Conn.SetReadDeadline(time.Time{})
221222
} else {
222-
c.conn.SetReadDeadline(time.Now().Add(c.opts.ReadTimeout))
223+
c.Conn.SetReadDeadline(time.Now().Add(c.opts.ReadTimeout))
223224
}
224225

225226
// Read response header (token+length)
@@ -350,6 +351,13 @@ func (c *Connection) processWaitResponse(q Query, response *Response) (*Response
350351
return response, nil, nil
351352
}
352353

354+
func (c *Connection) isBad() bool {
355+
c.mu.Lock()
356+
defer c.mu.Unlock()
357+
358+
return c.bad
359+
}
360+
353361
var responseCache = make(chan *Response, 16)
354362

355363
func newCachedResponse() *Response {

connection_helper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// Write 'data' to conn
1414
func (c *Connection) writeData(data []byte) error {
15-
_, err := c.conn.Write(data[:])
15+
_, err := c.Conn.Write(data[:])
1616
if err != nil {
1717
return RQLConnectionError{err.Error()}
1818
}
@@ -46,7 +46,7 @@ func (c *Connection) writeHandshakeReq() error {
4646
}
4747

4848
func (c *Connection) readHandshakeSuccess() error {
49-
reader := bufio.NewReader(c.conn)
49+
reader := bufio.NewReader(c.Conn)
5050
line, err := reader.ReadBytes('\x00')
5151
if err != nil {
5252
if err == io.EOF {
@@ -68,7 +68,7 @@ func (c *Connection) readHandshakeSuccess() error {
6868
func (c *Connection) read(buf []byte, length int) (total int, err error) {
6969
var n int
7070
for total < length {
71-
if n, err = c.conn.Read(buf[total:length]); err != nil {
71+
if n, err = c.Conn.Read(buf[total:length]); err != nil {
7272
break
7373
}
7474
total += n

cursor.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func newCursor(conn *Connection, cursorType string, token int64, term *Term, opt
4747
// err = cursor.Err() // get any error encountered during iteration
4848
// ...
4949
type Cursor struct {
50-
releaseConn func(error)
50+
releaseConn func() error
5151

5252
conn *Connection
5353
token int64
@@ -111,7 +111,7 @@ func (c *Cursor) Close() error {
111111
if conn == nil {
112112
return nil
113113
}
114-
if conn.conn == nil {
114+
if conn.Conn == nil {
115115
return nil
116116
}
117117

@@ -129,7 +129,9 @@ func (c *Cursor) Close() error {
129129
}
130130

131131
if c.releaseConn != nil {
132-
c.releaseConn(err)
132+
if err := c.releaseConn(); err != nil {
133+
return err
134+
}
133135
}
134136

135137
c.closed = true

cursor_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ func (s *RethinkSuite) TestCursorChangesClose(c *test.C) {
355355
// res, err := DB("test").Table("Table3").Changes().Run(session)
356356
res, err := DB("test").Table("Table3").Changes().Run(session)
357357
c.Assert(err, test.IsNil)
358+
c.Assert(res, test.NotNil)
358359

359360
// Ensure that the cursor can be closed
360361
err = res.Close()

0 commit comments

Comments
 (0)