Commit f55f3aa (1 parent: 9ff4c60)

fix: do not recycle the intermediate commands slice of cluster DoMulti when network errors (#706)

Signed-off-by: Rueian <[email protected]>
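
The title describes a use-after-recycle hazard: the pooled container carrying the intermediate commands of a cluster DoMulti call was returned to its pool even when some responses failed with network errors, a case in which the slice can presumably still be referenced by in-flight writes or by the retry path. The sketch below is a minimal, hypothetical illustration of that hazard, not rueidis code; batch, batchPool, send, and doMulti are made-up names, and the only point shown is gating sync.Pool recycling on a "clean" flag.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// batch is a hypothetical stand-in for a pooled container that holds
// an intermediate commands slice.
type batch struct {
	commands []string
}

var batchPool = sync.Pool{
	New: func() any { return &batch{} },
}

// send simulates writing the commands to a node and failing with a
// network (non-Redis) error, in which case the slice may still be
// referenced elsewhere (e.g. by retry bookkeeping).
func send(commands []string) error {
	return errors.New("connection reset by peer")
}

func doMulti(commands ...string) {
	re := batchPool.Get().(*batch)
	re.commands = append(re.commands[:0], commands...)

	err := send(re.commands)
	clean := err == nil // recycle only when nothing else can still use re

	if clean {
		batchPool.Put(re)
	} else {
		// Deliberately let the GC reclaim re instead of recycling it:
		// a concurrent retry or writer may still read re.commands, and
		// reusing the backing array for the next caller would corrupt
		// its commands.
		fmt.Println("network error, not recycling:", err)
	}
}

func main() {
	doMulti("GET", "k1", "GET", "k2")
}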

File tree

1 file changed: +16, -9 lines


cluster.go

Lines changed: 16 additions & 9 deletions
@@ -645,8 +645,10 @@ func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*conn
 
 func (c *clusterClient) doresultfn(
 	ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int,
-) {
+) (clean bool) {
+	clean = true
 	for i, resp := range resps {
+		clean = clean && resp.NonRedisError() == nil
 		ii := cIndexes[i]
 		cm := commands[i]
 		results.s[ii] = resp
@@ -658,7 +660,6 @@ func (c *clusterClient) doresultfn(
 			if !c.retry || !cm.IsReadOnly() {
 				continue
 			}
-
 			retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error())
 		} else {
 			nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
@@ -685,22 +686,24 @@ func (c *clusterClient) doresultfn(
 			mu.Unlock()
 		}
 	}
+	return clean
 }
 
 func (c *clusterClient) doretry(
 	ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
 ) {
+	clean := true
 	if len(re.commands) != 0 {
 		resps := cc.DoMulti(ctx, re.commands...)
-		c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
 		resultsp.Put(resps)
 	}
 	if len(re.cAskings) != 0 {
 		resps := askingMulti(cc, ctx, re.cAskings)
-		c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts)
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
 		resultsp.Put(resps)
 	}
-	if ctx.Err() == nil {
+	if clean {
 		retryp.Put(re)
 	}
 	wg.Done()
@@ -928,8 +931,10 @@ func (c *clusterClient) pickMultiCache(ctx context.Context, multi []CacheableTTL
 
 func (c *clusterClient) resultcachefn(
 	ctx context.Context, results *redisresults, retries *connretrycache, mu *sync.Mutex, cc conn, cIndexes []int, commands []CacheableTTL, resps []RedisResult, attempts int,
-) {
+) (clean bool) {
+	clean = true
 	for i, resp := range resps {
+		clean = clean && resp.NonRedisError() == nil
 		ii := cIndexes[i]
 		cm := commands[i]
 		results.s[ii] = resp
@@ -968,22 +973,24 @@ func (c *clusterClient) resultcachefn(
 			mu.Unlock()
 		}
 	}
+	return clean
 }
 
 func (c *clusterClient) doretrycache(
 	ctx context.Context, cc conn, results *redisresults, retries *connretrycache, re *retrycache, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
 ) {
+	clean := true
 	if len(re.commands) != 0 {
 		resps := cc.DoMultiCache(ctx, re.commands...)
-		c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
+		clean = c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
		resultsp.Put(resps)
 	}
 	if len(re.cAskings) != 0 {
 		resps := askingMultiCache(cc, ctx, re.cAskings)
-		c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts)
+		clean = c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
 		resultsp.Put(resps)
 	}
-	if ctx.Err() == nil {
+	if clean {
 		retrycachep.Put(re)
 	}
 	wg.Done()
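
Read together, the two halves of the change fit like this: doresultfn and resultcachefn now return a clean flag that is true only when every response is free of non-Redis (i.e. network-level) errors, and doretry/doretrycache recycle the pooled retry container only when both the normal and the ASKING batches came back clean, instead of whenever the context had not been cancelled. A minimal sketch of that shape follows; result, retryBatch, handleResults, and doRetry are hypothetical stand-ins, not the actual rueidis types.

package main

import (
	"fmt"
	"sync"
)

// result carries only what the sketch needs: a possible network error.
type result struct {
	netErr error // nil unless a network-level (non-Redis) error occurred
}

// retryBatch is a hypothetical pooled container, analogous in shape to the
// retry/retrycache structs in the diff.
type retryBatch struct {
	commands []string
	askings  []string
}

var retryPool = sync.Pool{New: func() any { return &retryBatch{} }}

// handleResults mirrors the shape of doresultfn: it walks the responses and
// returns clean == true only if none of them carried a network error.
func handleResults(resps []result) (clean bool) {
	clean = true
	for _, r := range resps {
		clean = clean && r.netErr == nil
		// ... per-response bookkeeping (store result, schedule retries) ...
	}
	return clean
}

// doRetry mirrors the shape of doretry: the container goes back to the pool
// only when every processed batch finished without network errors.
func doRetry(re *retryBatch, normal, asking []result) {
	clean := true
	if len(re.commands) != 0 {
		clean = handleResults(normal)
	}
	if len(re.askings) != 0 {
		clean = handleResults(asking) && clean
	}
	if clean {
		retryPool.Put(re)
	}
	fmt.Println("recycled:", clean)
}

func main() {
	re := retryPool.Get().(*retryBatch)
	re.commands = []string{"GET k1"}
	re.askings = nil
	doRetry(re, []result{{netErr: nil}}, nil)
}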
