Skip to content

Commit e0e0a10

Browse files
committed
sentinel: pipelined commands
1 parent 8be73f1 commit e0e0a10

File tree

2 files changed

+141
-88
lines changed

2 files changed

+141
-88
lines changed

pkg/utils/redis/client.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,6 @@ func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
6464
return r, nil
6565
}
6666

67-
func (c *Client) Flush(cmd string, args ...interface{}) error {
68-
if err := c.conn.Send(cmd, args...); err != nil {
69-
return errors.Trace(err)
70-
}
71-
if err := c.conn.Flush(); err != nil {
72-
return errors.Trace(err)
73-
}
74-
c.LastUse = time.Now()
75-
return nil
76-
}
77-
7867
func (c *Client) Receive() (interface{}, error) {
7968
r, err := c.conn.Receive()
8069
if err != nil {

pkg/utils/redis/sentinel.go

Lines changed: 141 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,10 @@ func (s *Sentinel) dispatch(ctx context.Context, sentinel string, timeout time.D
109109
func (s *Sentinel) subscribeCommand(client *Client, sentinel string,
110110
onSubscribed func()) error {
111111
var channels = []interface{}{"+switch-master"}
112-
if err := client.Flush("SUBSCRIBE", channels...); err != nil {
113-
return errors.Trace(err)
114-
}
112+
go func() {
113+
client.conn.Send("SUBSCRIBE", channels...)
114+
client.conn.Flush()
115+
}()
115116
for _, sub := range channels {
116117
values, err := redigo.Values(client.Receive())
117118
if err != nil {
@@ -217,46 +218,66 @@ func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajori
217218
}
218219
}
219220

220-
func (s *Sentinel) existsCommand(client *Client, name string) (bool, error) {
221-
r, err := client.Do("SENTINEL", "get-master-addr-by-name", name)
222-
if err != nil {
223-
return false, errors.Trace(err)
221+
func (s *Sentinel) existsCommand(client *Client, names []string) (map[string]bool, error) {
222+
go func() {
223+
var pending int
224+
for _, name := range names {
225+
pending++
226+
client.conn.Send("SENTINEL", "get-master-addr-by-name", name)
227+
}
228+
if pending != 0 {
229+
client.conn.Flush()
230+
}
231+
}()
232+
exists := make(map[string]bool, len(names))
233+
for _, name := range names {
234+
r, err := client.Receive()
235+
if err != nil {
236+
return nil, errors.Trace(err)
237+
}
238+
exists[name] = (r != nil)
224239
}
225-
return r != nil, nil
240+
return exists, nil
226241
}
227242

228-
func (s *Sentinel) masterCommand(client *Client, name string) (map[string]string, error) {
229-
if exists, err := s.existsCommand(client, name); err != nil {
230-
return nil, err
231-
} else if !exists {
232-
return nil, nil
233-
}
234-
m, err := redigo.StringMap(client.Do("SENTINEL", "master", name))
243+
func (s *Sentinel) slavesCommand(client *Client, names []string) (map[string][]map[string]string, error) {
244+
exists, err := s.existsCommand(client, names)
235245
if err != nil {
236-
return nil, errors.Trace(err)
237-
}
238-
return m, nil
239-
}
240-
241-
func (s *Sentinel) slavesCommand(client *Client, name string) ([]map[string]string, error) {
242-
if exists, err := s.existsCommand(client, name); err != nil {
243246
return nil, err
244-
} else if !exists {
245-
return nil, nil
246247
}
247-
values, err := redigo.Values(client.Do("SENTINEL", "slaves", name))
248-
if err != nil {
249-
return nil, errors.Trace(err)
250-
}
251-
var slaves []map[string]string
252-
for i := range values {
253-
m, err := redigo.StringMap(values[i], nil)
248+
go func() {
249+
var pending int
250+
for _, name := range names {
251+
if !exists[name] {
252+
continue
253+
}
254+
pending++
255+
client.conn.Send("SENTINEL", "slaves", name)
256+
}
257+
if pending != 0 {
258+
client.conn.Flush()
259+
}
260+
}()
261+
results := make(map[string][]map[string]string, len(names))
262+
for _, name := range names {
263+
if !exists[name] {
264+
continue
265+
}
266+
values, err := redigo.Values(client.Receive())
254267
if err != nil {
255268
return nil, errors.Trace(err)
256269
}
257-
slaves = append(slaves, m)
270+
var slaves []map[string]string
271+
for i := range values {
272+
m, err := redigo.StringMap(values[i], nil)
273+
if err != nil {
274+
return nil, errors.Trace(err)
275+
}
276+
slaves = append(slaves, m)
277+
}
278+
results[name] = slaves
258279
}
259-
return slaves, nil
280+
return results, nil
260281
}
261282

262283
func (s *Sentinel) mastersCommand(client *Client) (map[int]map[string]string, error) {
@@ -266,13 +287,13 @@ func (s *Sentinel) mastersCommand(client *Client) (map[int]map[string]string, er
266287
}
267288
var masters = make(map[int]map[string]string)
268289
for i := range values {
269-
m, err := redigo.StringMap(values[i], nil)
290+
p, err := redigo.StringMap(values[i], nil)
270291
if err != nil {
271292
return nil, errors.Trace(err)
272293
}
273-
gid, yes := s.isSameProduct(m["name"])
294+
gid, yes := s.isSameProduct(p["name"])
274295
if yes {
275-
masters[gid] = m
296+
masters[gid] = p
276297
}
277298
}
278299
return masters, nil
@@ -281,11 +302,11 @@ func (s *Sentinel) mastersCommand(client *Client) (map[int]map[string]string, er
281302
func (s *Sentinel) mastersDispatch(ctx context.Context, sentinel string, timeout time.Duration) (map[int]*SentinelMaster, error) {
282303
var masters = make(map[int]*SentinelMaster)
283304
var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
284-
m, err := s.mastersCommand(c)
305+
p, err := s.mastersCommand(c)
285306
if err != nil {
286307
return err
287308
}
288-
for gid, master := range m {
309+
for gid, master := range p {
289310
epoch, err := strconv.ParseInt(master["config-epoch"], 10, 64)
290311
if err != nil {
291312
s.printf("sentinel-[%s] masters parse %s failed, config-epoch = '%s', %s",
@@ -398,22 +419,32 @@ type MonitorConfig struct {
398419
}
399420

400421
func (s *Sentinel) monitorGroupsCommand(client *Client, sentniel string, config *MonitorConfig, groups map[int]*net.TCPAddr) error {
401-
for gid, tcpAddr := range groups {
402-
var name = s.NodeName(gid)
403-
if exists, err := s.existsCommand(client, name); err != nil {
404-
return err
405-
} else if exists {
406-
_, err := client.Do("SENTINEL", "remove", name)
407-
if err != nil {
408-
return errors.Trace(err)
409-
}
422+
var names []string
423+
for gid := range groups {
424+
names = append(names, s.NodeName(gid))
425+
}
426+
if err := s.removeCommand(client, names); err != nil {
427+
return err
428+
}
429+
go func() {
430+
for gid, tcpAddr := range groups {
431+
var ip, port = tcpAddr.IP.String(), tcpAddr.Port
432+
client.conn.Send("SENTINEL", "monitor", s.NodeName(gid), ip, port, config.Quorum)
433+
}
434+
if len(groups) != 0 {
435+
client.conn.Flush()
410436
}
411-
var ip, port = tcpAddr.IP.String(), tcpAddr.Port
412-
_, err := client.Do("SENTINEL", "monitor", name, ip, port, config.Quorum)
437+
}()
438+
for _ = range groups {
439+
_, err := client.Receive()
413440
if err != nil {
414441
return errors.Trace(err)
415-
} else {
416-
var args = []interface{}{"set", name}
442+
}
443+
}
444+
go func() {
445+
var pending int
446+
for gid := range groups {
447+
var args = []interface{}{"set", s.NodeName(gid)}
417448
if config.ParallelSyncs != 0 {
418449
args = append(args, "parallel-syncs", config.ParallelSyncs)
419450
}
@@ -432,10 +463,20 @@ func (s *Sentinel) monitorGroupsCommand(client *Client, sentniel string, config
432463
if config.ClientReconfigScript != "" {
433464
args = append(args, "client-reconfig-script", config.ClientReconfigScript)
434465
}
435-
_, err := client.Do("SENTINEL", args...)
436-
if err != nil {
437-
return errors.Trace(err)
466+
if len(args) == 2 {
467+
continue
438468
}
469+
pending++
470+
client.conn.Send("SENTINEL", args...)
471+
}
472+
if pending != 0 {
473+
client.conn.Flush()
474+
}
475+
}()
476+
for _ = range groups {
477+
_, err := client.Receive()
478+
if err != nil {
479+
return errors.Trace(err)
439480
}
440481
}
441482
return nil
@@ -527,25 +568,44 @@ func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, conf
527568
return last
528569
}
529570

530-
func (s *Sentinel) removeGroupsCommand(client *Client, groups map[int]bool) error {
531-
for gid := range groups {
532-
var name = s.NodeName(gid)
533-
if exists, err := s.existsCommand(client, name); err != nil {
534-
return err
535-
} else if exists {
536-
_, err := client.Do("SENTINEL", "remove", name)
537-
if err != nil {
538-
return errors.Trace(err)
571+
func (s *Sentinel) removeCommand(client *Client, names []string) error {
572+
exists, err := s.existsCommand(client, names)
573+
if err != nil {
574+
return err
575+
}
576+
go func() {
577+
var pending int
578+
for _, name := range names {
579+
if !exists[name] {
580+
continue
539581
}
582+
pending++
583+
client.conn.Send("SENTINEL", "remove", name)
584+
}
585+
if pending != 0 {
586+
client.conn.Flush()
587+
}
588+
}()
589+
for _, name := range names {
590+
if !exists[name] {
591+
continue
592+
}
593+
_, err := client.Receive()
594+
if err != nil {
595+
return errors.Trace(err)
540596
}
541597
}
542598
return nil
543599
}
544600

545601
func (s *Sentinel) removeGroupsDispatch(ctx context.Context, sentinel string, timeout time.Duration,
546602
groups map[int]bool) error {
603+
var names []string
604+
for gid := range groups {
605+
names = append(names, s.NodeName(gid))
606+
}
547607
var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
548-
return s.removeGroupsCommand(c, groups)
608+
return s.removeCommand(c, names)
549609
})
550610
if err != nil {
551611
switch errors.Cause(err) {
@@ -594,15 +654,15 @@ func (s *Sentinel) RemoveGroups(sentinels []string, timeout time.Duration, group
594654

595655
func (s *Sentinel) removeGroupsAllDispatch(ctx context.Context, sentinel string, timeout time.Duration) error {
596656
var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
597-
m, err := s.mastersCommand(c)
657+
masters, err := s.mastersCommand(c)
598658
if err != nil {
599659
return err
600660
}
601-
var groups = make(map[int]bool)
602-
for gid := range m {
603-
groups[gid] = true
661+
var names []string
662+
for gid := range masters {
663+
names = append(names, s.NodeName(gid))
604664
}
605-
return s.removeGroupsCommand(c, groups)
665+
return s.removeCommand(c, names)
606666
})
607667
if err != nil {
608668
switch errors.Cause(err) {
@@ -659,15 +719,19 @@ func (s *Sentinel) MastersAndSlavesClient(client *Client) (map[string]*SentinelG
659719
if err != nil {
660720
return nil, err
661721
}
662-
results := make(map[string]*SentinelGroup)
663-
for _, master := range masters {
664-
var name = master["name"]
665-
slaves, err := s.slavesCommand(client, name)
666-
if err != nil {
667-
return nil, err
668-
}
722+
var names []string
723+
for gid := range masters {
724+
names = append(names, s.NodeName(gid))
725+
}
726+
slaves, err := s.slavesCommand(client, names)
727+
if err != nil {
728+
return nil, err
729+
}
730+
results := make(map[string]*SentinelGroup, len(masters))
731+
for gid, master := range masters {
732+
var name = s.NodeName(gid)
669733
results[name] = &SentinelGroup{
670-
Master: master, Slaves: slaves,
734+
Master: master, Slaves: slaves[name],
671735
}
672736
}
673737
return results, nil

0 commit comments

Comments
 (0)