Skip to content

Commit 67c7cf5

Browse files
committed
utils: close redis connection immediately if client is not recyclable (#1419,#1345)
1 parent d525223 commit 67c7cf5

File tree

2 files changed

+59
-19
lines changed

2 files changed

+59
-19
lines changed

pkg/utils/redis/client.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ type Client struct {
2626

2727
LastUse time.Time
2828
Timeout time.Duration
29+
30+
Pipeline struct {
31+
Send, Recv uint64
32+
}
2933
}
3034

3135
func NewClientNoAuth(addr string, timeout time.Duration) (*Client, error) {
@@ -51,6 +55,18 @@ func (c *Client) Close() error {
5155
return c.conn.Close()
5256
}
5357

58+
func (c *Client) isRecyclable() bool {
59+
switch {
60+
case c.conn.Err() != nil:
61+
return false
62+
case c.Pipeline.Send != c.Pipeline.Recv:
63+
return false
64+
case c.Timeout != 0 && c.Timeout <= time.Since(c.LastUse):
65+
return false
66+
}
67+
return true
68+
}
69+
5470
func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
5571
r, err := c.conn.Do(cmd, args...)
5672
if err != nil {
@@ -70,6 +86,7 @@ func (c *Client) Send(cmd string, args ...interface{}) error {
7086
c.Close()
7187
return errors.Trace(err)
7288
}
89+
c.Pipeline.Send++
7390
return nil
7491
}
7592

@@ -87,6 +104,8 @@ func (c *Client) Receive() (interface{}, error) {
87104
c.Close()
88105
return nil, errors.Trace(err)
89106
}
107+
c.Pipeline.Recv++
108+
90109
c.LastUse = time.Now()
91110

92111
if err, ok := r.(redigo.Error); ok {
@@ -327,18 +346,6 @@ func NewPool(auth string, timeout time.Duration) *Pool {
327346
return p
328347
}
329348

330-
func (p *Pool) isRecyclable(c *Client) bool {
331-
switch {
332-
case c.conn.Err() != nil:
333-
return false
334-
case p.timeout == 0:
335-
return true
336-
case p.timeout >= time.Since(c.LastUse):
337-
return true
338-
}
339-
return false
340-
}
341-
342349
func (p *Pool) Close() error {
343350
p.mu.Lock()
344351
defer p.mu.Unlock()
@@ -368,10 +375,10 @@ func (p *Pool) Cleanup() error {
368375
for addr, list := range p.pool {
369376
for i := list.Len(); i != 0; i-- {
370377
c := list.Remove(list.Front()).(*Client)
371-
if p.isRecyclable(c) {
372-
list.PushBack(c)
373-
} else {
378+
if !c.isRecyclable() {
374379
c.Close()
380+
} else {
381+
list.PushBack(c)
375382
}
376383
}
377384
if list.Len() == 0 {
@@ -398,10 +405,10 @@ func (p *Pool) getClientFromCache(addr string) (*Client, error) {
398405
if list := p.pool[addr]; list != nil {
399406
for i := list.Len(); i != 0; i-- {
400407
c := list.Remove(list.Front()).(*Client)
401-
if p.isRecyclable(c) {
402-
return c, nil
403-
} else {
408+
if !c.isRecyclable() {
404409
c.Close()
410+
} else {
411+
return c, nil
405412
}
406413
}
407414
}
@@ -411,7 +418,7 @@ func (p *Pool) getClientFromCache(addr string) (*Client, error) {
411418
func (p *Pool) PutClient(c *Client) {
412419
p.mu.Lock()
413420
defer p.mu.Unlock()
414-
if p.closed || !p.isRecyclable(c) {
421+
if !c.isRecyclable() || p.closed {
415422
c.Close()
416423
} else {
417424
cache := p.pool[c.Addr]

pkg/utils/redis/sentinel.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ func (s *Sentinel) dispatch(ctx context.Context, sentinel string, timeout time.D
108108

109109
func (s *Sentinel) subscribeCommand(client *Client, sentinel string,
110110
onSubscribed func()) error {
111+
defer func() {
112+
client.Close()
113+
}()
111114
var channels = []interface{}{"+switch-master"}
112115
go func() {
113116
client.Send("SUBSCRIBE", channels...)
@@ -219,6 +222,11 @@ func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajori
219222
}
220223

221224
func (s *Sentinel) existsCommand(client *Client, names []string) (map[string]bool, error) {
225+
defer func() {
226+
if !client.isRecyclable() {
227+
client.Close()
228+
}
229+
}()
222230
go func() {
223231
for _, name := range names {
224232
client.Send("SENTINEL", "get-master-addr-by-name", name)
@@ -239,6 +247,11 @@ func (s *Sentinel) existsCommand(client *Client, names []string) (map[string]boo
239247
}
240248

241249
func (s *Sentinel) slavesCommand(client *Client, names []string) (map[string][]map[string]string, error) {
250+
defer func() {
251+
if !client.isRecyclable() {
252+
client.Close()
253+
}
254+
}()
242255
exists, err := s.existsCommand(client, names)
243256
if err != nil {
244257
return nil, err
@@ -279,6 +292,11 @@ func (s *Sentinel) slavesCommand(client *Client, names []string) (map[string][]m
279292
}
280293

281294
func (s *Sentinel) mastersCommand(client *Client) (map[int]map[string]string, error) {
295+
defer func() {
296+
if !client.isRecyclable() {
297+
client.Close()
298+
}
299+
}()
282300
values, err := redigo.Values(client.Do("SENTINEL", "masters"))
283301
if err != nil {
284302
return nil, errors.Trace(err)
@@ -417,6 +435,11 @@ type MonitorConfig struct {
417435
}
418436

419437
func (s *Sentinel) monitorGroupsCommand(client *Client, sentniel string, config *MonitorConfig, groups map[int]*net.TCPAddr) error {
438+
defer func() {
439+
if !client.isRecyclable() {
440+
client.Close()
441+
}
442+
}()
420443
var names []string
421444
for gid := range groups {
422445
names = append(names, s.NodeName(gid))
@@ -562,6 +585,11 @@ func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, conf
562585
}
563586

564587
func (s *Sentinel) removeCommand(client *Client, names []string) error {
588+
defer func() {
589+
if !client.isRecyclable() {
590+
client.Close()
591+
}
592+
}()
565593
exists, err := s.existsCommand(client, names)
566594
if err != nil {
567595
return err
@@ -708,6 +736,11 @@ type SentinelGroup struct {
708736
}
709737

710738
func (s *Sentinel) MastersAndSlavesClient(client *Client) (map[string]*SentinelGroup, error) {
739+
defer func() {
740+
if !client.isRecyclable() {
741+
client.Close()
742+
}
743+
}()
711744
masters, err := s.mastersCommand(client)
712745
if err != nil {
713746
return nil, err

0 commit comments

Comments
 (0)