Skip to content

Commit 1e9f5ee

Browse files
committed
Merge branch 'feature/server-info' into develop
2 parents afd9813 + 0f18188 commit 1e9f5ee

File tree

9 files changed

+177
-175
lines changed

9 files changed

+177
-175
lines changed

cluster.go

Lines changed: 74 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package gorethink
22

33
import (
44
"fmt"
5-
"math"
65
"strings"
76
"sync"
87
"sync/atomic"
98
"time"
109

1110
"github.com/Sirupsen/logrus"
1211
"github.com/cenkalti/backoff"
12+
"github.com/hailocab/go-hostpool"
1313
)
1414

1515
// A Cluster represents a connection to a RethinkDB cluster, a cluster is created
@@ -23,8 +23,9 @@ type Cluster struct {
2323
opts *ConnectOpts
2424

2525
mu sync.RWMutex
26-
seeds []Host // Initial host nodes specified by user.
27-
nodes []*Node // Active nodes in cluster.
26+
seeds []Host // Initial host nodes specified by user.
27+
hp hostpool.HostPool
28+
nodes map[string]*Node // Active nodes in cluster.
2829
closed bool
2930

3031
nodeIndex int64
@@ -33,6 +34,8 @@ type Cluster struct {
3334
// NewCluster creates a new cluster by connecting to the given hosts.
3435
func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
3536
c := &Cluster{
37+
// TODO: Make this configurable
38+
hp: hostpool.NewEpsilonGreedy([]string{}, 0, &hostpool.LinearEpsilonValueCalculator{}),
3639
seeds: hosts,
3740
opts: opts,
3841
}
@@ -52,22 +55,38 @@ func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
5255

5356
// Query executes a ReQL query using the cluster to connect to the database
5457
func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
55-
node, err := c.GetRandomNode()
58+
node, hpr, err := c.GetNextNode()
5659
if err != nil {
5760
return nil, err
5861
}
5962

60-
return node.Query(q)
63+
cursor, err = node.Query(q)
64+
hpr.Mark(err)
65+
return cursor, err
6166
}
6267

6368
// Exec executes a ReQL query using the cluster to connect to the database
6469
func (c *Cluster) Exec(q Query) (err error) {
65-
node, err := c.GetRandomNode()
70+
node, hpr, err := c.GetNextNode()
6671
if err != nil {
6772
return err
6873
}
6974

70-
return node.Exec(q)
75+
err = node.Exec(q)
76+
hpr.Mark(err)
77+
return err
78+
}
79+
80+
// Server returns the server name and server UUID being used by a connection.
81+
func (c *Cluster) Server() (response ServerResponse, err error) {
82+
node, hpr, err := c.GetNextNode()
83+
if err != nil {
84+
return ServerResponse{}, err
85+
}
86+
87+
response, err = node.Server()
88+
hpr.Mark(err)
89+
return response, err
7190
}
7291

7392
// SetMaxIdleConns sets the maximum number of connections in the idle
@@ -129,7 +148,7 @@ func (c *Cluster) discover() {
129148
// This function will block until the query fails
130149
func (c *Cluster) listenForNodeChanges() error {
131150
// Start listening to changes from a random active node
132-
node, err := c.GetRandomNode()
151+
node, hpr, err := c.GetNextNode()
133152
if err != nil {
134153
return err
135154
}
@@ -145,6 +164,7 @@ func (c *Cluster) listenForNodeChanges() error {
145164

146165
cursor, err := node.Query(q)
147166
if err != nil {
167+
hpr.Mark(err)
148168
return err
149169
}
150170

@@ -182,7 +202,9 @@ func (c *Cluster) listenForNodeChanges() error {
182202
}
183203
}
184204

185-
return cursor.Err()
205+
err = cursor.Err()
206+
hpr.Mark(err)
207+
return err
186208
}
187209

188210
func (c *Cluster) connectNodes(hosts []Host) {
@@ -217,6 +239,11 @@ func (c *Cluster) connectNodes(hosts []Host) {
217239
continue
218240
}
219241

242+
// TODO: connect to seed hosts using `.Server()` to get server ID. Need
243+
// some way of making this backwards compatible
244+
245+
// TODO: AFTER try to discover hosts
246+
220247
if c.opts.DiscoverHosts {
221248
var results []nodeStatus
222249
err = cursor.All(&results)
@@ -322,44 +349,32 @@ func (c *Cluster) getSeeds() []Host {
322349
return seeds
323350
}
324351

325-
// GetRandomNode returns a random node on the cluster
326-
// TODO(dancannon) replace with hostpool
327-
func (c *Cluster) GetRandomNode() (*Node, error) {
352+
// GetNextNode returns a random node on the cluster
353+
func (c *Cluster) GetNextNode() (*Node, hostpool.HostPoolResponse, error) {
328354
if !c.IsConnected() {
329-
return nil, ErrNoConnections
355+
return nil, nil, ErrNoConnections
330356
}
331-
// Must copy array reference for copy on write semantics to work.
332-
nodeArray := c.GetNodes()
333-
length := len(nodeArray)
334-
for i := 0; i < length; i++ {
335-
// Must handle concurrency with other non-tending goroutines, so nodeIndex is consistent.
336-
index := int(math.Abs(float64(c.nextNodeIndex() % int64(length))))
337-
node := nodeArray[index]
338-
339-
if !node.Closed() && node.IsHealthy() {
340-
return node, nil
357+
c.mu.RLock()
358+
defer c.mu.RUnlock()
359+
360+
nodes := c.nodes
361+
362+
hpr := c.hp.Get()
363+
if n, ok := nodes[hpr.Host()]; ok {
364+
if !n.Closed() {
365+
return n, hpr, nil
341366
}
342367
}
343-
return nil, ErrNoConnections
368+
369+
return nil, nil, ErrNoConnections
344370
}
345371

346372
// GetNodes returns a list of all nodes in the cluster
347373
func (c *Cluster) GetNodes() []*Node {
348374
c.mu.RLock()
349-
nodes := c.nodes
350-
c.mu.RUnlock()
351-
352-
return nodes
353-
}
354-
355-
// GetHealthyNodes returns a list of all healthy nodes in the cluster
356-
func (c *Cluster) GetHealthyNodes() []*Node {
357-
c.mu.RLock()
358-
nodes := []*Node{}
359-
for _, node := range c.nodes {
360-
if node.IsHealthy() {
361-
nodes = append(nodes, node)
362-
}
375+
nodes := make([]*Node, 0, len(c.nodes))
376+
for _, n := range c.nodes {
377+
nodes = append(nodes, n)
363378
}
364379
c.mu.RUnlock()
365380

@@ -376,20 +391,34 @@ func (c *Cluster) nodeExists(search *Node) bool {
376391
}
377392

378393
func (c *Cluster) addNode(node *Node) {
379-
c.mu.Lock()
380-
c.nodes = append(c.nodes, node)
381-
c.mu.Unlock()
394+
c.mu.RLock()
395+
nodes := append(c.GetNodes(), node)
396+
c.mu.RUnlock()
397+
398+
c.setNodes(nodes)
382399
}
383400

384401
func (c *Cluster) addNodes(nodesToAdd []*Node) {
385-
c.mu.Lock()
386-
c.nodes = append(c.nodes, nodesToAdd...)
387-
c.mu.Unlock()
402+
c.mu.RLock()
403+
nodes := append(c.GetNodes(), nodesToAdd...)
404+
c.mu.RUnlock()
405+
406+
c.setNodes(nodes)
388407
}
389408

390409
func (c *Cluster) setNodes(nodes []*Node) {
410+
nodesMap := make(map[string]*Node, len(nodes))
411+
hosts := make([]string, len(nodes))
412+
for i, node := range nodes {
413+
host := node.Host.String()
414+
415+
nodesMap[host] = node
416+
hosts[i] = host
417+
}
418+
391419
c.mu.Lock()
392-
c.nodes = nodes
420+
c.nodes = nodesMap
421+
c.hp.SetHosts(hosts)
393422
c.mu.Unlock()
394423
}
395424

connection.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
115115
}
116116

117117
// Add token if query is a START/NOREPLY_WAIT
118-
if q.Type == p.Query_START || q.Type == p.Query_NOREPLY_WAIT {
118+
if q.Type == p.Query_START || q.Type == p.Query_NOREPLY_WAIT || q.Type == p.Query_SERVER_INFO {
119119
q.Token = c.nextToken()
120120
if c.opts.Database != "" {
121121
var err error
@@ -154,6 +154,33 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
154154
}
155155
}
156156

157+
type ServerResponse struct {
158+
ID string `gorethink:"id"`
159+
Name string `gorethink:"name"`
160+
}
161+
162+
// Server returns the server name and server UUID being used by a connection.
163+
func (c *Connection) Server() (ServerResponse, error) {
164+
var response ServerResponse
165+
166+
_, cur, err := c.Query(Query{
167+
Type: p.Query_SERVER_INFO,
168+
})
169+
if err != nil {
170+
return response, err
171+
}
172+
173+
if err = cur.One(&response); err != nil {
174+
return response, err
175+
}
176+
177+
if err = cur.Close(); err != nil {
178+
return response, err
179+
}
180+
181+
return response, nil
182+
}
183+
157184
// sendQuery marshals the Query and sends the JSON to the server.
158185
func (c *Connection) sendQuery(q Query) error {
159186
// Build query
@@ -231,7 +258,7 @@ func (c *Connection) processResponse(q Query, response *Response) (*Response, *C
231258
return c.processErrorResponse(q, response, RQLCompileError{rqlResponseError{response, q.Term}})
232259
case p.Response_RUNTIME_ERROR:
233260
return c.processErrorResponse(q, response, RQLRuntimeError{rqlResponseError{response, q.Term}})
234-
case p.Response_SUCCESS_ATOM:
261+
case p.Response_SUCCESS_ATOM, p.Response_SERVER_INFO:
235262
return c.processAtomResponse(q, response)
236263
case p.Response_SUCCESS_PARTIAL:
237264
return c.processPartialResponse(q, response)

0 commit comments

Comments
 (0)