Skip to content

Commit 68427d9

Browse files
committed
Fixed some issues around closed connections
1 parent 9c04c10 commit 68427d9

File tree

6 files changed

+67
-9
lines changed

6 files changed

+67
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
1717
### Changed
1818
- Changed `Timeout` connect argument to only configure the connection timeout.
1919
- Replaced `Db` with `DB` in `RunOpts` and `ExecOpts` (`Db` still works for now)
20-
- Made `Cursor` safe for concurrent use (`Session` was already safe).
20+
- Made `Cursor` and `Session` safe for concurrent use
21+
- Replaced `ErrClusterClosed` with `ErrConnectionClosed`
2122

2223
## Deprecated
2324
- Deprecated `UseOutdated` optional argument

cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (c *Cluster) getSeeds() []Host {
326326
// TODO(dancannon) replace with hostpool
327327
func (c *Cluster) GetRandomNode() (*Node, error) {
328328
if !c.IsConnected() {
329-
return nil, ErrClusterClosed
329+
return nil, ErrNoConnections
330330
}
331331
// Must copy array reference for copy on write semantics to work.
332332
nodeArray := c.GetNodes()

connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,12 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
106106
c.mu.Lock()
107107
if c == nil {
108108
c.mu.Unlock()
109-
return nil, nil, nil
109+
return nil, nil, ErrConnectionClosed
110110
}
111111
if c.conn == nil {
112112
c.bad = true
113113
c.mu.Unlock()
114-
return nil, nil, nil
114+
return nil, nil, ErrConnectionClosed
115115
}
116116

117117
// Add token if query is a START/NOREPLY_WAIT

errors.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ var (
1818
// ErrInvalidNode is returned when attempting to connect to a node which
1919
// returns an invalid response.
2020
ErrInvalidNode = errors.New("invalid node")
21-
// ErrClusterClosed is returned when a query is executed after the connection
22-
// to the cluster has been closed.
23-
ErrClusterClosed = errors.New("cluster closed")
2421
// ErrNoConnections is returned when there are no active connections in the
2522
// clusters connection pool.
2623
ErrNoConnections = errors.New("gorethink: no connections were available")

session.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package gorethink
22

33
import (
44
"crypto/tls"
5+
"sync"
56
"time"
67

78
p "github.com/dancannon/gorethink/ql2"
@@ -10,8 +11,10 @@ import (
1011
// A Session represents a connection to a RethinkDB cluster and should be used
1112
// when executing queries.
1213
type Session struct {
13-
hosts []Host
14-
opts *ConnectOpts
14+
hosts []Host
15+
opts *ConnectOpts
16+
17+
mu sync.RWMutex
1518
cluster *Cluster
1619
closed bool
1720
}
@@ -114,25 +117,32 @@ func (s *Session) Reconnect(optArgs ...CloseOpts) error {
114117
return err
115118
}
116119

120+
s.mu.Lock()
117121
s.cluster, err = NewCluster(s.hosts, s.opts)
118122
if err != nil {
119123
return err
120124
}
121125

122126
s.closed = false
127+
s.mu.Unlock()
123128

124129
return nil
125130
}
126131

127132
// Close closes the session
128133
func (s *Session) Close(optArgs ...CloseOpts) error {
134+
s.mu.Lock()
135+
defer s.mu.Unlock()
136+
129137
if s.closed {
130138
return nil
131139
}
132140

133141
if len(optArgs) >= 1 {
134142
if optArgs[0].NoReplyWait {
143+
s.mu.Unlock()
135144
s.NoReplyWait()
145+
s.mu.Lock()
136146
}
137147
}
138148

@@ -148,12 +158,18 @@ func (s *Session) Close(optArgs ...CloseOpts) error {
148158
// SetMaxIdleConns sets the maximum number of connections in the idle
149159
// connection pool.
150160
func (s *Session) SetMaxIdleConns(n int) {
161+
s.mu.Lock()
162+
defer s.mu.Unlock()
163+
151164
s.opts.MaxIdle = n
152165
s.cluster.SetMaxIdleConns(n)
153166
}
154167

155168
// SetMaxOpenConns sets the maximum number of open connections to the database.
156169
func (s *Session) SetMaxOpenConns(n int) {
170+
s.mu.Lock()
171+
defer s.mu.Unlock()
172+
157173
s.opts.MaxOpen = n
158174
s.cluster.SetMaxOpenConns(n)
159175
}
@@ -162,28 +178,55 @@ func (s *Session) SetMaxOpenConns(n int) {
162178
// processed by the server. Note that this guarantee only applies to queries
163179
// run on the given connection
164180
func (s *Session) NoReplyWait() error {
181+
s.mu.RLock()
182+
defer s.mu.RUnlock()
183+
184+
if s.closed {
185+
return ErrConnectionClosed
186+
}
187+
165188
return s.cluster.Exec(Query{
166189
Type: p.Query_NOREPLY_WAIT,
167190
})
168191
}
169192

170193
// Use changes the default database used
171194
func (s *Session) Use(database string) {
195+
s.mu.RLock()
196+
defer s.mu.RUnlock()
197+
172198
s.opts.Database = database
173199
}
174200

175201
// Query executes a ReQL query using the session to connect to the database
176202
func (s *Session) Query(q Query) (*Cursor, error) {
203+
s.mu.RLock()
204+
defer s.mu.RUnlock()
205+
206+
if s.closed {
207+
return nil, ErrConnectionClosed
208+
}
209+
177210
return s.cluster.Query(q)
178211
}
179212

180213
// Exec executes a ReQL query using the session to connect to the database
181214
func (s *Session) Exec(q Query) error {
215+
s.mu.RLock()
216+
defer s.mu.RUnlock()
217+
218+
if s.closed {
219+
return ErrConnectionClosed
220+
}
221+
182222
return s.cluster.Exec(q)
183223
}
184224

185225
// SetHosts resets the hosts used when connecting to the RethinkDB cluster
186226
func (s *Session) SetHosts(hosts []Host) {
227+
s.mu.Lock()
228+
defer s.mu.Unlock()
229+
187230
s.hosts = hosts
188231
}
189232

session_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,23 @@ func (s *RethinkSuite) TestSessionConnectError(c *test.C) {
5858
c.Assert(err, test.NotNil)
5959
}
6060

61+
func (s *RethinkSuite) TestSessionClose(c *test.C) {
62+
session, err := Connect(ConnectOpts{
63+
Address: url,
64+
AuthKey: os.Getenv("RETHINKDB_AUTHKEY"),
65+
})
66+
c.Assert(err, test.IsNil)
67+
68+
_, err = Expr("Hello World").Run(session)
69+
c.Assert(err, test.IsNil)
70+
71+
err = session.Close()
72+
c.Assert(err, test.IsNil)
73+
74+
_, err = Expr("Hello World").Run(session)
75+
c.Assert(err, test.NotNil)
76+
}
77+
6178
func (s *RethinkSuite) TestSessionConnectDatabase(c *test.C) {
6279
session, err := Connect(ConnectOpts{
6380
Address: url,

0 commit comments

Comments
 (0)