Skip to content

Commit b9272a9

Browse files
authored
Release3.2 fix 1345 (#1419)
* redis: don't recycle redis connection if error happens * redis: close client immediately if error occurs
1 parent 4eba3e1 commit b9272a9

File tree

4 files changed

+53
-27
lines changed

4 files changed

+53
-27
lines changed

pkg/topom/topom_slots.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func (s *Topom) newSlotActionExecutor(sid int) (func(db int) (remains int, nextd
396396
if err != nil {
397397
return 0, -1, err
398398
}
399-
defer s.action.redisp.PutClient(c)
399+
defer s.action.redisp.PutClient(c, err)
400400

401401
if err := c.Select(db); err != nil {
402402
return 0, -1, err

pkg/topom/topom_stats.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (s *Topom) RefreshRedisStats(timeout time.Duration) (*sync2.Future, error)
7979
if err != nil {
8080
return nil, err
8181
}
82-
defer s.ha.redisp.PutClient(c)
82+
defer s.ha.redisp.PutClient(c, err)
8383
m, err := c.Info()
8484
if err != nil {
8585
return nil, err

pkg/utils/redis/client.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func (c *Client) Close() error {
5454
func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
5555
r, err := c.conn.Do(cmd, args...)
5656
if err != nil {
57+
c.Close()
5758
return nil, errors.Trace(err)
5859
}
5960
c.LastUse = time.Now()
@@ -64,9 +65,26 @@ func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
6465
return r, nil
6566
}
6667

68+
func (c *Client) Send(cmd string, args ...interface{}) error {
69+
if err := c.conn.Send(cmd, args...); err != nil {
70+
c.Close()
71+
return errors.Trace(err)
72+
}
73+
return nil
74+
}
75+
76+
func (c *Client) Flush() error {
77+
if err := c.conn.Flush(); err != nil {
78+
c.Close()
79+
return errors.Trace(err)
80+
}
81+
return nil
82+
}
83+
6784
func (c *Client) Receive() (interface{}, error) {
6885
r, err := c.conn.Receive()
6986
if err != nil {
87+
c.Close()
7088
return nil, errors.Trace(err)
7189
}
7290
c.LastUse = time.Now()
@@ -170,11 +188,11 @@ func (c *Client) SetMaster(master string) error {
170188
if err != nil {
171189
return errors.Trace(err)
172190
}
173-
c.conn.Send("MULTI")
174-
c.conn.Send("CONFIG", "SET", "masterauth", c.Auth)
175-
c.conn.Send("SLAVEOF", host, port)
176-
c.conn.Send("CONFIG", "REWRITE")
177-
c.conn.Send("CLIENT", "KILL", "TYPE", "normal")
191+
c.Send("MULTI")
192+
c.Send("CONFIG", "SET", "masterauth", c.Auth)
193+
c.Send("SLAVEOF", host, port)
194+
c.Send("CONFIG", "REWRITE")
195+
c.Send("CLIENT", "KILL", "TYPE", "normal")
178196
values, err := redigo.Values(c.Do("EXEC"))
179197
if err != nil {
180198
return errors.Trace(err)
@@ -385,10 +403,10 @@ func (p *Pool) getClientFromCache(addr string) (*Client, error) {
385403
return nil, nil
386404
}
387405

388-
func (p *Pool) PutClient(c *Client) {
406+
func (p *Pool) PutClient(c *Client, err error) {
389407
p.mu.Lock()
390408
defer p.mu.Unlock()
391-
if p.closed || !p.isRecyclable(c) {
409+
if err != nil || p.closed || !p.isRecyclable(c) {
392410
c.Close()
393411
} else {
394412
cache := p.pool[c.Addr]
@@ -400,22 +418,30 @@ func (p *Pool) PutClient(c *Client) {
400418
}
401419
}
402420

403-
func (p *Pool) Info(addr string) (map[string]string, error) {
421+
func (p *Pool) Info(addr string) (_ map[string]string, err error) {
404422
c, err := p.GetClient(addr)
405423
if err != nil {
406424
return nil, err
407425
}
408-
defer p.PutClient(c)
409-
return c.Info()
426+
defer p.PutClient(c, err)
427+
m, err := c.Info()
428+
if err != nil {
429+
return nil, err
430+
}
431+
return m, nil
410432
}
411433

412-
func (p *Pool) InfoFull(addr string) (map[string]string, error) {
434+
func (p *Pool) InfoFull(addr string) (_ map[string]string, err error) {
413435
c, err := p.GetClient(addr)
414436
if err != nil {
415437
return nil, err
416438
}
417-
defer p.PutClient(c)
418-
return c.InfoFull()
439+
defer p.PutClient(c, err)
440+
m, err := c.InfoFull()
441+
if err != nil {
442+
return nil, err
443+
}
444+
return m, nil
419445
}
420446

421447
type InfoCache struct {

pkg/utils/redis/sentinel.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ func (s *Sentinel) subscribeCommand(client *Client, sentinel string,
110110
onSubscribed func()) error {
111111
var channels = []interface{}{"+switch-master"}
112112
go func() {
113-
client.conn.Send("SUBSCRIBE", channels...)
114-
client.conn.Flush()
113+
client.Send("SUBSCRIBE", channels...)
114+
client.Flush()
115115
}()
116116
for _, sub := range channels {
117117
values, err := redigo.Values(client.Receive())
@@ -221,10 +221,10 @@ func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajori
221221
func (s *Sentinel) existsCommand(client *Client, names []string) (map[string]bool, error) {
222222
go func() {
223223
for _, name := range names {
224-
client.conn.Send("SENTINEL", "get-master-addr-by-name", name)
224+
client.Send("SENTINEL", "get-master-addr-by-name", name)
225225
}
226226
if len(names) != 0 {
227-
client.conn.Flush()
227+
client.Flush()
228228
}
229229
}()
230230
exists := make(map[string]bool, len(names))
@@ -250,10 +250,10 @@ func (s *Sentinel) slavesCommand(client *Client, names []string) (map[string][]m
250250
continue
251251
}
252252
pending++
253-
client.conn.Send("SENTINEL", "slaves", name)
253+
client.Send("SENTINEL", "slaves", name)
254254
}
255255
if pending != 0 {
256-
client.conn.Flush()
256+
client.Flush()
257257
}
258258
}()
259259
results := make(map[string][]map[string]string, len(names))
@@ -427,10 +427,10 @@ func (s *Sentinel) monitorGroupsCommand(client *Client, sentniel string, config
427427
go func() {
428428
for gid, tcpAddr := range groups {
429429
var ip, port = tcpAddr.IP.String(), tcpAddr.Port
430-
client.conn.Send("SENTINEL", "monitor", s.NodeName(gid), ip, port, config.Quorum)
430+
client.Send("SENTINEL", "monitor", s.NodeName(gid), ip, port, config.Quorum)
431431
}
432432
if len(groups) != 0 {
433-
client.conn.Flush()
433+
client.Flush()
434434
}
435435
}()
436436
for range groups {
@@ -460,10 +460,10 @@ func (s *Sentinel) monitorGroupsCommand(client *Client, sentniel string, config
460460
if config.ClientReconfigScript != "" {
461461
args = append(args, "client-reconfig-script", config.ClientReconfigScript)
462462
}
463-
client.conn.Send("SENTINEL", args...)
463+
client.Send("SENTINEL", args...)
464464
}
465465
if len(groups) != 0 {
466-
client.conn.Flush()
466+
client.Flush()
467467
}
468468
}()
469469
for range groups {
@@ -573,10 +573,10 @@ func (s *Sentinel) removeCommand(client *Client, names []string) error {
573573
continue
574574
}
575575
pending++
576-
client.conn.Send("SENTINEL", "remove", name)
576+
client.Send("SENTINEL", "remove", name)
577577
}
578578
if pending != 0 {
579-
client.conn.Flush()
579+
client.Flush()
580580
}
581581
}()
582582
for _, name := range names {

0 commit comments

Comments
 (0)