Skip to content

Commit 99daad7

Browse files
authored
fix: broken tx retries for cluster clients after #697 (#709)
* fix: broken tx retries for cluster clients after #697 Signed-off-by: Rueian <[email protected]> * fix: broken tx retris for cluster clients after #697 Signed-off-by: Rueian <[email protected]> * fix: broken tx retris for cluster clients after #697 Signed-off-by: Rueian <[email protected]> --------- Signed-off-by: Rueian <[email protected]>
1 parent 3b9473a commit 99daad7

File tree

3 files changed

+852
-35
lines changed

3 files changed

+852
-35
lines changed

cluster.go

Lines changed: 81 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -525,9 +525,8 @@ func (c *clusterClient) toReplica(cmd Completed) bool {
525525
return false
526526
}
527527

528-
func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
528+
func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, init bool) {
529529
last := cmds.InitSlot
530-
init := false
531530

532531
for _, cmd := range multi {
533532
if cmd.Slot() == cmds.InitSlot {
@@ -550,7 +549,7 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
550549
cc = c.pslots[cmd.Slot()]
551550
}
552551
if cc == nil {
553-
return nil
552+
return nil, false
554553
}
555554
count.m[cc]++
556555
}
@@ -569,13 +568,13 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
569568
cc = c.pslots[cmd.Slot()]
570569
}
571570
if cc == nil { // check cc == nil again in case of non-deterministic SendToReplicas.
572-
return nil
571+
return nil, false
573572
}
574573
re := retries.m[cc]
575574
re.commands = append(re.commands, cmd)
576575
re.cIndexes = append(re.cIndexes, i)
577576
}
578-
return retries
577+
return retries, init
579578
}
580579

581580
inits := 0
@@ -589,25 +588,28 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
589588
} else if init && last != cmd.Slot() {
590589
panic(panicMixCxSlot)
591590
}
592-
p := c.pslots[cmd.Slot()]
593-
if p == nil {
594-
return nil
591+
cc := c.pslots[cmd.Slot()]
592+
if cc == nil {
593+
return nil, false
595594
}
596-
count.m[p]++
595+
count.m[cc]++
597596
}
598597

599598
if last == cmds.InitSlot {
600599
// if all commands have no slots, such as INFO, we pick a non-nil slot.
601-
for i, p := range c.pslots {
602-
if p != nil {
600+
for i, cc := range c.pslots {
601+
if cc != nil {
603602
last = uint16(i)
604-
count.m[p] = inits
603+
count.m[cc] = inits
605604
break
606605
}
607606
}
608607
if last == cmds.InitSlot {
609-
return nil
608+
return nil, false
610609
}
610+
} else if init {
611+
cc := c.pslots[last]
612+
count.m[cc] += inits
611613
}
612614

613615
retries = connretryp.Get(len(count.m), len(count.m))
@@ -627,25 +629,34 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry) {
627629
re.commands = append(re.commands, cmd)
628630
re.cIndexes = append(re.cIndexes, i)
629631
}
630-
return retries
632+
return retries, init
631633
}
632634

633-
func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, error) {
634-
conns := c._pickMulti(multi)
635+
func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*connretry, bool, error) {
636+
conns, hasInit := c._pickMulti(multi)
635637
if conns == nil {
636638
if err := c.refresh(ctx); err != nil {
637-
return nil, err
639+
return nil, false, err
638640
}
639-
if conns = c._pickMulti(multi); conns == nil {
640-
return nil, ErrNoSlot
641+
if conns, hasInit = c._pickMulti(multi); conns == nil {
642+
return nil, false, ErrNoSlot
641643
}
642644
}
643-
return conns, nil
645+
return conns, hasInit, nil
646+
}
647+
648+
func isMulti(cmd Completed) bool {
649+
return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "MULTI"
650+
}
651+
func isExec(cmd Completed) bool {
652+
return len(cmd.Commands()) == 1 && cmd.Commands()[0] == "EXEC"
644653
}
645654

646655
func (c *clusterClient) doresultfn(
647-
ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int,
656+
ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int, hasInit bool,
648657
) (clean bool) {
658+
mi := -1
659+
ei := -1
649660
clean = true
650661
for i, resp := range resps {
651662
clean = clean && resp.NonRedisError() == nil
@@ -664,6 +675,37 @@ func (c *clusterClient) doresultfn(
664675
} else {
665676
nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
666677
}
678+
if hasInit && ei < i { // find out if there is a transaction block or not.
679+
for mi = i; mi >= 0 && !isMulti(commands[mi]) && !isExec(commands[mi]); mi-- {
680+
}
681+
for ei = i; ei < len(commands) && !isMulti(commands[ei]) && !isExec(commands[ei]); ei++ {
682+
}
683+
if mi >= 0 && ei < len(commands) && isMulti(commands[mi]) && isExec(commands[ei]) && resps[mi].val.string == ok { // a transaction is found.
684+
mu.Lock()
685+
retries.Redirects++
686+
nr := retries.m[nc]
687+
if nr == nil {
688+
nr = retryp.Get(0, len(commands))
689+
retries.m[nc] = nr
690+
}
691+
for i := mi; i <= ei; i++ {
692+
ii := cIndexes[i]
693+
cm := commands[i]
694+
if mode == RedirectAsk {
695+
nr.aIndexes = append(nr.aIndexes, ii)
696+
nr.cAskings = append(nr.cAskings, cm)
697+
} else {
698+
nr.cIndexes = append(nr.cIndexes, ii)
699+
nr.commands = append(nr.commands, cm)
700+
}
701+
}
702+
mu.Unlock()
703+
continue // the transaction has been added to the retries, go to the next cmd.
704+
}
705+
}
706+
if hasInit && mi < i && i < ei && mi >= 0 && isMulti(commands[mi]) {
707+
continue // the current cmd is in the processed transaction and has been added to the retries.
708+
}
667709
mu.Lock()
668710
if mode != RedirectRetry {
669711
retries.Redirects++
@@ -690,17 +732,17 @@ func (c *clusterClient) doresultfn(
690732
}
691733

692734
func (c *clusterClient) doretry(
693-
ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
735+
ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int, hasInit bool,
694736
) {
695737
clean := true
696738
if len(re.commands) != 0 {
697739
resps := cc.DoMulti(ctx, re.commands...)
698-
clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
740+
clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
699741
resultsp.Put(resps)
700742
}
701743
if len(re.cAskings) != 0 {
702744
resps := askingMulti(cc, ctx, re.cAskings)
703-
clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
745+
clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
704746
resultsp.Put(resps)
705747
}
706748
if clean {
@@ -714,7 +756,7 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
714756
return nil
715757
}
716758

717-
retries, err := c.pickMulti(ctx, multi)
759+
retries, hasInit, err := c.pickMulti(ctx, multi)
718760
if err != nil {
719761
return fillErrs(len(multi), err)
720762
}
@@ -742,18 +784,17 @@ retry:
742784
}
743785
for cc, re := range retries.m {
744786
delete(retries.m, cc)
745-
go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts)
787+
go c.doretry(ctx, cc, results, retries, re, &mu, &wg, attempts, hasInit)
746788
}
747789
mu.Unlock()
748-
c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts)
790+
c.doretry(ctx, cc1, results, retries, re1, &mu, &wg, attempts, hasInit)
749791
wg.Wait()
750792

751793
if len(retries.m) != 0 {
752794
if retries.Redirects > 0 {
753795
retries.Redirects = 0
754796
goto retry
755797
}
756-
757798
if retries.RetryDelay >= 0 {
758799
c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
759800
attempts++
@@ -817,14 +858,23 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur
817858
}
818859

819860
func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults {
861+
var inTx bool
820862
commands := make([]Completed, 0, len(multi)*2)
821863
for _, cmd := range multi {
822-
commands = append(commands, cmds.AskingCmd, cmd)
864+
if inTx {
865+
commands = append(commands, cmd)
866+
inTx = !isExec(cmd)
867+
} else {
868+
commands = append(commands, cmds.AskingCmd, cmd)
869+
inTx = isMulti(cmd)
870+
}
823871
}
824872
results := resultsp.Get(0, len(multi))
825873
resps := cc.DoMulti(ctx, commands...)
826-
for i := 1; i < len(resps.s); i += 2 {
827-
results.s = append(results.s, resps.s[i])
874+
for i, resp := range resps.s {
875+
if commands[i] != cmds.AskingCmd {
876+
results.s = append(results.s, resp)
877+
}
828878
}
829879
resultsp.Put(resps)
830880
return results
@@ -946,7 +996,6 @@ func (c *clusterClient) resultcachefn(
946996
if !c.retry {
947997
continue
948998
}
949-
950999
retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
9511000
} else {
9521001
nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
@@ -1040,7 +1089,6 @@ retry:
10401089
retries.Redirects = 0
10411090
goto retry
10421091
}
1043-
10441092
if retries.RetryDelay >= 0 {
10451093
c.retryHandler.WaitForRetry(ctx, retries.RetryDelay)
10461094
attempts++

0 commit comments

Comments
 (0)