|
8 | 8 | "time" |
9 | 9 |
|
10 | 10 | "github.com/Sirupsen/logrus" |
11 | | - "github.com/cenkalti/backoff" |
| 11 | + "github.com/cenk/backoff" |
12 | 12 | "github.com/hailocab/go-hostpool" |
13 | 13 | ) |
14 | 14 |
|
@@ -58,37 +58,68 @@ func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) { |
58 | 58 |
|
59 | 59 | // Query executes a ReQL query using the cluster to connect to the database |
60 | 60 | func (c *Cluster) Query(q Query) (cursor *Cursor, err error) { |
61 | | - node, hpr, err := c.GetNextNode() |
62 | | - if err != nil { |
63 | | - return nil, err |
| 61 | + for i := 0; i < c.numRetries(); i++ { |
| 62 | + var node *Node |
| 63 | + var hpr hostpool.HostPoolResponse |
| 64 | + |
| 65 | + node, hpr, err = c.GetNextNode() |
| 66 | + if err != nil { |
| 67 | + return nil, err |
| 68 | + } |
| 69 | + |
| 70 | + cursor, err = node.Query(q) |
| 71 | + hpr.Mark(err) |
| 72 | + |
| 73 | + if !shouldRetryQuery(q, err) { |
| 74 | + break |
| 75 | + } |
64 | 76 | } |
65 | 77 |
|
66 | | - cursor, err = node.Query(q) |
67 | | - hpr.Mark(err) |
68 | 78 | return cursor, err |
69 | 79 | } |
70 | 80 |
|
71 | 81 | // Exec executes a ReQL query using the cluster to connect to the database |
72 | 82 | func (c *Cluster) Exec(q Query) (err error) { |
73 | | - node, hpr, err := c.GetNextNode() |
74 | | - if err != nil { |
75 | | - return err |
| 83 | + for i := 0; i < c.numRetries(); i++ { |
| 84 | + var node *Node |
| 85 | + var hpr hostpool.HostPoolResponse |
| 86 | + |
| 87 | + node, hpr, err = c.GetNextNode() |
| 88 | + if err != nil { |
| 89 | + return err |
| 90 | + } |
| 91 | + |
| 92 | + err = node.Exec(q) |
| 93 | + hpr.Mark(err) |
| 94 | + |
| 95 | + if !shouldRetryQuery(q, err) { |
| 96 | + break |
| 97 | + } |
76 | 98 | } |
77 | 99 |
|
78 | | - err = node.Exec(q) |
79 | | - hpr.Mark(err) |
80 | 100 | return err |
81 | 101 | } |
82 | 102 |
|
83 | 103 | // Server returns the server name and server UUID being used by a connection. |
84 | 104 | func (c *Cluster) Server() (response ServerResponse, err error) { |
85 | | - node, hpr, err := c.GetNextNode() |
86 | | - if err != nil { |
87 | | - return ServerResponse{}, err |
| 105 | + for i := 0; i < c.numRetries(); i++ { |
| 106 | + var node *Node |
| 107 | + var hpr hostpool.HostPoolResponse |
| 108 | + |
| 109 | + node, hpr, err = c.GetNextNode() |
| 110 | + if err != nil { |
| 111 | + return ServerResponse{}, err |
| 112 | + } |
| 113 | + |
| 114 | + response, err = node.Server() |
| 115 | + hpr.Mark(err) |
| 116 | + |
| 117 | + // This query should not fail so retry if any error is detected |
| 118 | + if err == nil { |
| 119 | + break |
| 120 | + } |
88 | 121 | } |
89 | 122 |
|
90 | | - response, err = node.Server() |
91 | | - hpr.Mark(err) |
92 | 123 | return response, err |
93 | 124 | } |
94 | 125 |
|
@@ -473,3 +504,11 @@ func (c *Cluster) removeNode(nodeID string) { |
473 | 504 | func (c *Cluster) nextNodeIndex() int64 { |
474 | 505 | return atomic.AddInt64(&c.nodeIndex, 1) |
475 | 506 | } |
| 507 | + |
| 508 | +func (c *Cluster) numRetries() int { |
| 509 | + if n := c.opts.NumRetries; n > 0 { |
| 510 | + return n |
| 511 | + } |
| 512 | + |
| 513 | + return 3 |
| 514 | +} |
0 commit comments