Skip to content

Commit f437f0e

Browse files
authored
perf: reduce goroutines used by DoMulti/DoMultiCache in a cluster client (#699)
Signed-off-by: Rueian <[email protected]>
1 parent 7e284ae commit f437f0e

File tree

1 file changed

+26
-6
lines changed

1 file changed

+26
-6
lines changed

cluster.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -544,16 +544,16 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
544544

545545
if !init && c.rslots != nil && c.opt.SendToReplicas != nil {
546546
for _, cmd := range multi {
547-
var p conn
547+
var cc conn
548548
if c.opt.SendToReplicas(cmd) {
549-
p = c.rslots[cmd.Slot()]
549+
cc = c.rslots[cmd.Slot()]
550550
} else {
551-
p = c.pslots[cmd.Slot()]
551+
cc = c.pslots[cmd.Slot()]
552552
}
553-
if p == nil {
553+
if cc == nil {
554554
return nil
555555
}
556-
count.m[p]++
556+
count.m[cc]++
557557
}
558558

559559
retries = connretryp.Get(len(count.m), len(count.m))
@@ -569,7 +569,9 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
569569
} else {
570570
cc = c.pslots[cmd.Slot()]
571571
}
572-
572+
if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
573+
return nil
574+
}
573575
re := retries.m[cc]
574576
re.commands = append(re.commands, cmd)
575577
re.cIndexes = append(re.cIndexes, i)
@@ -726,13 +728,22 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
726728
retry:
727729
retries.RetryDelay = -1 // Assume no retry. Because client retry flag can be set to false.
728730

731+
var cc1 conn
732+
var re1 *retry
729733
wg.Add(len(retries.m))
730734
mu.Lock()
735+
for cc, re := range retries.m {
736+
delete(retries.m, cc)
737+
cc1 = cc
738+
re1 = re
739+
break
740+
}
731741
for cc, re := range retries.m {
732742
delete(retries.m, cc)
733743
go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
734744
}
735745
mu.Unlock()
746+
c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
736747
wg.Wait()
737748

738749
if len(retries.m) != 0 {
@@ -997,13 +1008,22 @@ func (c *clusterClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
9971008
retry:
9981009
retries.RetryDelay = -1 // Assume no retry. Because client retry flag can be set to false.
9991010

1011+
var cc1 conn
1012+
var re1 *retrycache
10001013
wg.Add(len(retries.m))
10011014
mu.Lock()
1015+
for cc, re := range retries.m {
1016+
delete(retries.m, cc)
1017+
cc1 = cc
1018+
re1 = re
1019+
break
1020+
}
10021021
for cc, re := range retries.m {
10031022
delete(retries.m, cc)
10041023
go c.doretrycache(ctx, cc, results, retries, re, &mu, &wg, attempts)
10051024
}
10061025
mu.Unlock()
1026+
c.doretrycache(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
10071027
wg.Wait()
10081028

10091029
if len(retries.m) != 0 {

0 commit comments

Comments
 (0)