Skip to content

Commit 6b993cf

Browse files
committed
Added hostpool in Cluster
1 parent 58cd5f1 commit 6b993cf

File tree

3 files changed

+80
-186
lines changed

3 files changed

+80
-186
lines changed

cluster.go

Lines changed: 67 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package gorethink
22

33
import (
44
"fmt"
5-
"math"
65
"strings"
76
"sync"
87
"sync/atomic"
98
"time"
109

1110
"github.com/Sirupsen/logrus"
1211
"github.com/cenkalti/backoff"
12+
"github.com/hailocab/go-hostpool"
1313
)
1414

1515
// A Cluster represents a connection to a RethinkDB cluster, a cluster is created
@@ -23,8 +23,9 @@ type Cluster struct {
2323
opts *ConnectOpts
2424

2525
mu sync.RWMutex
26-
seeds []Host // Initial host nodes specified by user.
27-
nodes []*Node // Active nodes in cluster.
26+
seeds []Host // Initial host nodes specified by user.
27+
hp hostpool.HostPool
28+
nodes map[string]*Node // Active nodes in cluster.
2829
closed bool
2930

3031
nodeIndex int64
@@ -33,6 +34,8 @@ type Cluster struct {
3334
// NewCluster creates a new cluster by connecting to the given hosts.
3435
func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
3536
c := &Cluster{
37+
// TODO: Make this configurable
38+
hp: hostpool.NewEpsilonGreedy([]string{}, 0, &hostpool.LinearEpsilonValueCalculator{}),
3639
seeds: hosts,
3740
opts: opts,
3841
}
@@ -52,32 +55,38 @@ func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
5255

5356
// Query executes a ReQL query using the cluster to connect to the database
5457
func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
55-
node, err := c.GetRandomNode()
58+
node, hpr, err := c.GetNextNode()
5659
if err != nil {
5760
return nil, err
5861
}
5962

60-
return node.Query(q)
63+
cursor, err = node.Query(q)
64+
hpr.Mark(err)
65+
return cursor, err
6166
}
6267

6368
// Exec executes a ReQL query using the cluster to connect to the database
6469
func (c *Cluster) Exec(q Query) (err error) {
65-
node, err := c.GetRandomNode()
70+
node, hpr, err := c.GetNextNode()
6671
if err != nil {
6772
return err
6873
}
6974

70-
return node.Exec(q)
75+
err = node.Exec(q)
76+
hpr.Mark(err)
77+
return err
7178
}
7279

7380
// Server returns the server name and server UUID being used by a connection.
74-
func (c *Cluster) Server() (ServerResponse, error) {
75-
node, err := c.GetRandomNode()
81+
func (c *Cluster) Server() (response ServerResponse, err error) {
82+
node, hpr, err := c.GetNextNode()
7683
if err != nil {
7784
return ServerResponse{}, err
7885
}
7986

80-
return node.Server()
87+
response, err = node.Server()
88+
hpr.Mark(err)
89+
return response, err
8190
}
8291

8392
// SetMaxIdleConns sets the maximum number of connections in the idle
@@ -139,7 +148,7 @@ func (c *Cluster) discover() {
139148
// This function will block until the query fails
140149
func (c *Cluster) listenForNodeChanges() error {
141150
// Start listening to changes from a random active node
142-
node, err := c.GetRandomNode()
151+
node, hpr, err := c.GetNextNode()
143152
if err != nil {
144153
return err
145154
}
@@ -155,6 +164,7 @@ func (c *Cluster) listenForNodeChanges() error {
155164

156165
cursor, err := node.Query(q)
157166
if err != nil {
167+
hpr.Mark(err)
158168
return err
159169
}
160170

@@ -192,7 +202,9 @@ func (c *Cluster) listenForNodeChanges() error {
192202
}
193203
}
194204

195-
return cursor.Err()
205+
err = cursor.Err()
206+
hpr.Mark(err)
207+
return err
196208
}
197209

198210
func (c *Cluster) connectNodes(hosts []Host) {
@@ -227,6 +239,11 @@ func (c *Cluster) connectNodes(hosts []Host) {
227239
continue
228240
}
229241

242+
// TODO: connect to seed hosts using `.Server()` to get server ID. Need
243+
// some way of making this backwards compatible
244+
245+
// TODO: AFTER try to discover hosts
246+
230247
if c.opts.DiscoverHosts {
231248
var results []nodeStatus
232249
err = cursor.All(&results)
@@ -332,44 +349,32 @@ func (c *Cluster) getSeeds() []Host {
332349
return seeds
333350
}
334351

335-
// GetRandomNode returns a random node on the cluster
336-
// TODO(dancannon) replace with hostpool
337-
func (c *Cluster) GetRandomNode() (*Node, error) {
352+
// GetNextNode returns a random node on the cluster
353+
func (c *Cluster) GetNextNode() (*Node, hostpool.HostPoolResponse, error) {
338354
if !c.IsConnected() {
339-
return nil, ErrNoConnections
355+
return nil, nil, ErrNoConnections
340356
}
341-
// Must copy array reference for copy on write semantics to work.
342-
nodeArray := c.GetNodes()
343-
length := len(nodeArray)
344-
for i := 0; i < length; i++ {
345-
// Must handle concurrency with other non-tending goroutines, so nodeIndex is consistent.
346-
index := int(math.Abs(float64(c.nextNodeIndex() % int64(length))))
347-
node := nodeArray[index]
348-
349-
if !node.Closed() && node.IsHealthy() {
350-
return node, nil
357+
c.mu.RLock()
358+
defer c.mu.RUnlock()
359+
360+
nodes := c.nodes
361+
362+
hpr := c.hp.Get()
363+
if n, ok := nodes[hpr.Host()]; ok {
364+
if !n.Closed() {
365+
return n, hpr, nil
351366
}
352367
}
353-
return nil, ErrNoConnections
368+
369+
return nil, nil, ErrNoConnections
354370
}
355371

356372
// GetNodes returns a list of all nodes in the cluster
357373
func (c *Cluster) GetNodes() []*Node {
358374
c.mu.RLock()
359-
nodes := c.nodes
360-
c.mu.RUnlock()
361-
362-
return nodes
363-
}
364-
365-
// GetHealthyNodes returns a list of all healthy nodes in the cluster
366-
func (c *Cluster) GetHealthyNodes() []*Node {
367-
c.mu.RLock()
368-
nodes := []*Node{}
369-
for _, node := range c.nodes {
370-
if node.IsHealthy() {
371-
nodes = append(nodes, node)
372-
}
375+
nodes := make([]*Node, 0, len(c.nodes))
376+
for _, n := range c.nodes {
377+
nodes = append(nodes, n)
373378
}
374379
c.mu.RUnlock()
375380

@@ -386,20 +391,34 @@ func (c *Cluster) nodeExists(search *Node) bool {
386391
}
387392

388393
func (c *Cluster) addNode(node *Node) {
389-
c.mu.Lock()
390-
c.nodes = append(c.nodes, node)
391-
c.mu.Unlock()
394+
c.mu.RLock()
395+
nodes := append(c.GetNodes(), node)
396+
c.mu.RUnlock()
397+
398+
c.setNodes(nodes)
392399
}
393400

394401
func (c *Cluster) addNodes(nodesToAdd []*Node) {
395-
c.mu.Lock()
396-
c.nodes = append(c.nodes, nodesToAdd...)
397-
c.mu.Unlock()
402+
c.mu.RLock()
403+
nodes := append(c.GetNodes(), nodesToAdd...)
404+
c.mu.RUnlock()
405+
406+
c.setNodes(nodes)
398407
}
399408

400409
func (c *Cluster) setNodes(nodes []*Node) {
410+
nodesMap := make(map[string]*Node, len(nodes))
411+
hosts := make([]string, len(nodes))
412+
for i, node := range nodes {
413+
host := node.Host.String()
414+
415+
nodesMap[host] = node
416+
hosts[i] = host
417+
}
418+
401419
c.mu.Lock()
402-
c.nodes = nodes
420+
c.nodes = nodesMap
421+
c.hp.SetHosts(hosts)
403422
c.mu.Unlock()
404423
}
405424

0 commit comments

Comments
 (0)