Skip to content

Commit 5a752cb

Browse files
committed
changes offset
1 parent 212dee2 commit 5a752cb

File tree

4 files changed

+43
-20
lines changed

4 files changed

+43
-20
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
77
- Reworked and tested new connection pools with multiple queries per connection
88
- Socket Read- and WriteTimeout replaced with context timeout
99
- Mock assert fix
10+
- Connection pool fixed initial size
11+
- Changes added offsets
1012

1113
## v6.0.0 - 2019-12-22
1214

pool.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,21 @@ var (
1212
errPoolClosed = errors.New("rethinkdb: pool is closed")
1313
)
1414

15+
const (
16+
poolIsNotClosed int32 = 0
17+
poolIsClosed int32 = 1
18+
)
19+
1520
// A Pool is used to store a pool of connections to a single RethinkDB server
1621
type Pool struct {
1722
host Host
1823
opts *ConnectOpts
1924

2025
conns []*Connection
2126
pointer int32
27+
closed int32
2228

23-
mu sync.RWMutex // protects following fields
24-
closed bool
29+
mu sync.Mutex // protects lazy creating connections
2530
}
2631

2732
// NewPool creates a new connection pool for the given host
@@ -40,7 +45,7 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) {
4045

4146
conns := make([]*Connection, maxOpen)
4247
var err error
43-
for i := range conns {
48+
for i := 0; i < opts.InitialCap; i++ {
4449
conns[i], err = NewConnection(host.String(), opts)
4550
if err != nil {
4651
return nil, err
@@ -52,6 +57,7 @@ func NewPool(host Host, opts *ConnectOpts) (*Pool, error) {
5257
pointer: -1,
5358
host: host,
5459
opts: opts,
60+
closed: poolIsNotClosed,
5561
}, nil
5662
}
5763

@@ -67,11 +73,17 @@ func (p *Pool) Ping() error {
6773
// It is rare to Close a Pool, as the Pool handle is meant to be
6874
// long-lived and shared between many goroutines.
6975
func (p *Pool) Close() error {
70-
p.mu.RLock()
71-
defer p.mu.RUnlock()
72-
if p.closed {
76+
if atomic.LoadInt32(&p.closed) == poolIsClosed {
77+
return nil
78+
}
79+
80+
p.mu.Lock()
81+
defer p.mu.Unlock()
82+
83+
if p.closed == poolIsClosed {
7384
return nil
7485
}
86+
p.closed = poolIsClosed
7587

7688
for _, c := range p.conns {
7789
err := c.Close()
@@ -84,20 +96,29 @@ func (p *Pool) Close() error {
8496
}
8597

8698
func (p *Pool) conn() (*Connection, error) {
87-
p.mu.RLock()
88-
89-
if p.closed {
90-
p.mu.RUnlock()
99+
if atomic.LoadInt32(&p.closed) == poolIsClosed {
91100
return nil, errPoolClosed
92101
}
93-
p.mu.RUnlock()
94102

95103
pos := atomic.AddInt32(&p.pointer, 1)
96104
if pos == int32(len(p.conns)) {
97105
atomic.StoreInt32(&p.pointer, 0)
98106
}
99107
pos = pos % int32(len(p.conns))
100108

109+
if p.conns[pos] == nil {
110+
p.mu.Lock()
111+
defer p.mu.Unlock()
112+
113+
if p.conns[pos] == nil {
114+
var err error
115+
p.conns[pos], err = NewConnection(p.host.String(), p.opts)
116+
if err != nil {
117+
return nil, err
118+
}
119+
}
120+
}
121+
101122
return p.conns[pos], nil
102123
}
103124

query.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -289,11 +289,13 @@ type WriteResponse struct {
289289
// ChangeResponse is a helper type used when dealing with changefeeds. The type
290290
// contains both the value before the query and the new value.
291291
type ChangeResponse struct {
292-
NewValue interface{} `rethinkdb:"new_val,omitempty"`
293-
OldValue interface{} `rethinkdb:"old_val,omitempty"`
294-
State string `rethinkdb:"state,omitempty"`
295-
Error string `rethinkdb:"error,omitempty"`
296-
Type string `rethinkdb:"type,omitempty"`
292+
NewValue interface{} `rethinkdb:"new_val,omitempty"`
293+
OldValue interface{} `rethinkdb:"old_val,omitempty"`
294+
State string `rethinkdb:"state,omitempty"`
295+
Error string `rethinkdb:"error,omitempty"`
296+
Type string `rethinkdb:"type,omitempty"`
297+
OldOffset int `rethinkdb:"old_offset,omitempty"`
298+
NewOffset int `rethinkdb:"new_offset,omitempty"`
297299
}
298300

299301
// RunOpts contains the optional arguments for the Run function.

session.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,8 @@ type ConnectOpts struct {
7878
// the first query is executed.
7979
InitialCap int `rethinkdb:"initial_cap,omitempty" json:"initial_cap,omitempty"`
8080
// MaxOpen is used by the internal connection pool and is used to configure
81-
// the maximum number of connections held in the pool. If all available
82-
// connections are being used then the driver will open new connections as
83-
// needed however they will not be returned to the pool. By default the
84-
// maximum number of connections is 2
81+
// the maximum number of connections held in the pool. By default the
82+
// maximum number of connections is 1
8583
MaxOpen int `rethinkdb:"max_open,omitempty" json:"max_open,omitempty"`
8684

8785
// Below options are for cluster discovery, please note there is a high

0 commit comments

Comments
 (0)