Skip to content

Commit 380652f

Browse files
committed
Fixed issue causing infinite blocking by using a simple ping query instead
1 parent ac4cdaf commit 380652f

File tree

1 file changed

+55
-33
lines changed

1 file changed

+55
-33
lines changed

node.go

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,17 @@ func newNode(id string, aliases []Host, cluster *Cluster, pool *Pool) *Node {
4444
refreshInterval = time.Second * 30
4545
}
4646

47-
if cluster.opts.DiscoverHosts {
48-
go func() {
49-
refreshTicker := time.NewTicker(refreshInterval)
50-
for {
51-
select {
52-
case <-refreshTicker.C:
53-
node.Refresh()
54-
case <-node.refreshDoneChan:
55-
return
56-
}
47+
go func() {
48+
refreshTicker := time.NewTicker(refreshInterval)
49+
for {
50+
select {
51+
case <-refreshTicker.C:
52+
node.Refresh()
53+
case <-node.refreshDoneChan:
54+
return
5755
}
58-
}()
59-
}
56+
}
57+
}()
6058

6159
return node
6260
}
@@ -149,26 +147,52 @@ func (n *Node) Exec(q Query) (err error) {
149147
// the nodes health is decrease, if there were no issues then the node is marked
150148
// as being healthy.
151149
func (n *Node) Refresh() {
152-
cursor, err := n.pool.Query(newQuery(
153-
DB("rethinkdb").Table("server_status").Get(n.ID),
154-
map[string]interface{}{},
155-
n.cluster.opts,
156-
))
157-
if err != nil {
158-
n.DecrementHealth()
159-
return
160-
}
161-
defer cursor.Close()
150+
if n.cluster.opts.DiscoverHosts {
151+
// If host discovery is enabled then check the servers status
152+
cursor, err := n.pool.Query(newQuery(
153+
DB("rethinkdb").Table("server_status").Get(n.ID),
154+
map[string]interface{}{},
155+
n.cluster.opts,
156+
))
157+
if err != nil {
158+
n.DecrementHealth()
159+
return
160+
}
161+
defer cursor.Close()
162162

163-
var status nodeStatus
164-
err = cursor.One(&status)
165-
if err != nil {
166-
return
167-
}
163+
var status nodeStatus
164+
err = cursor.One(&status)
165+
if err != nil {
166+
return
167+
}
168168

169-
if status.Status != "connected" {
170-
n.DecrementHealth()
171-
return
169+
if status.Status != "connected" {
170+
n.DecrementHealth()
171+
return
172+
}
173+
} else {
174+
// If host discovery is disabled just execute a simple ping query
175+
cursor, err := n.pool.Query(newQuery(
176+
Expr("OK"),
177+
map[string]interface{}{},
178+
n.cluster.opts,
179+
))
180+
if err != nil {
181+
n.DecrementHealth()
182+
return
183+
}
184+
defer cursor.Close()
185+
186+
var status string
187+
err = cursor.One(&status)
188+
if err != nil {
189+
return
190+
}
191+
192+
if status != "OK" {
193+
n.DecrementHealth()
194+
return
195+
}
172196
}
173197

174198
// If status check was successful reset health
@@ -177,9 +201,7 @@ func (n *Node) Refresh() {
177201

178202
// DecrementHealth decreases the nodes health by 1 (the nodes health starts at maxNodeHealth)
179203
func (n *Node) DecrementHealth() {
180-
if n.cluster.opts.DiscoverHosts {
181-
atomic.AddInt64(&n.health, -1)
182-
}
204+
atomic.AddInt64(&n.health, -1)
183205
}
184206

185207
// ResetHealth sets the nodes health back to maxNodeHealth (fully healthy)

0 commit comments

Comments
 (0)