@@ -1526,79 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15261526 return err
15271527 }
15281528
1529- cmdsMap := map [ int ][] Cmder {}
1529+ slottedCmds := c . slottedCommands ( cmds )
15301530 slot := - 1
1531- // get only the keyed commands
1532- keyedCmds := c .keyedCmds (cmds )
1533- if len (keyedCmds ) == 0 {
1534- // no keyed commands try random slot
1531+ switch len (slottedCmds ) {
1532+ case 0 :
15351533 slot = hashtag .RandomSlot ()
1536- } else {
1537- // keyed commands, get slot from them
1538- // if more than one slot, return cross slot error
1539- cmdsBySlot := c .mapCmdsBySlot (keyedCmds )
1540- if len (cmdsBySlot ) > 1 {
1541- // cross slot error, we have more than one slot for keyed commands
1542- setCmdsErr (cmds , ErrCrossSlot )
1543- return ErrCrossSlot
1544- }
1545- // get the slot, should be only one
1546- for sl := range cmdsBySlot {
1534+ case 1 :
1535+ for sl := range slottedCmds {
15471536 slot = sl
15481537 break
15491538 }
1550- }
1551- // slot was not determined, try random one
1552- if slot == - 1 {
1553- slot = hashtag .RandomSlot ()
1554- }
1555- cmdsMap [slot ] = cmds
1556-
1557- // TxPipeline does not support cross slot transaction.
1558- // double check the commands are in the same slot
1559- if len (cmdsMap ) > 1 {
1539+ default :
1540+ // TxPipeline does not support cross slot transaction.
15601541 setCmdsErr (cmds , ErrCrossSlot )
15611542 return ErrCrossSlot
15621543 }
15631544
1564- for slot , cmds := range cmdsMap {
1565- node , err := state .slotMasterNode (slot )
1566- if err != nil {
1567- setCmdsErr (cmds , err )
1568- continue
1569- }
1545+ node , err := state .slotMasterNode (slot )
1546+ if err != nil {
1547+ setCmdsErr (cmds , err )
1548+ return err
1549+ }
15701550
1571- cmdsMap := map [* clusterNode ][]Cmder {node : cmds }
1572- for attempt := 0 ; attempt <= c .opt .MaxRedirects ; attempt ++ {
1573- if attempt > 0 {
1574- if err := internal .Sleep (ctx , c .retryBackoff (attempt )); err != nil {
1575- setCmdsErr (cmds , err )
1576- return err
1577- }
1551+ cmdsMap := map [* clusterNode ][]Cmder {node : cmds }
1552+ for attempt := 0 ; attempt <= c .opt .MaxRedirects ; attempt ++ {
1553+ if attempt > 0 {
1554+ if err := internal .Sleep (ctx , c .retryBackoff (attempt )); err != nil {
1555+ setCmdsErr (cmds , err )
1556+ return err
15781557 }
1558+ }
15791559
1580- failedCmds := newCmdsMap ()
1581- var wg sync.WaitGroup
1560+ failedCmds := newCmdsMap ()
1561+ var wg sync.WaitGroup
15821562
1583- for node , cmds := range cmdsMap {
1584- wg .Add (1 )
1585- go func (node * clusterNode , cmds []Cmder ) {
1586- defer wg .Done ()
1587- c .processTxPipelineNode (ctx , node , cmds , failedCmds )
1588- }(node , cmds )
1589- }
1563+ for node , cmds := range cmdsMap {
1564+ wg .Add (1 )
1565+ go func (node * clusterNode , cmds []Cmder ) {
1566+ defer wg .Done ()
1567+ c .processTxPipelineNode (ctx , node , cmds , failedCmds )
1568+ }(node , cmds )
1569+ }
15901570
1591- wg .Wait ()
1592- if len (failedCmds .m ) == 0 {
1593- break
1594- }
1595- cmdsMap = failedCmds .m
1571+ wg .Wait ()
1572+ if len (failedCmds .m ) == 0 {
1573+ break
15961574 }
1575+ cmdsMap = failedCmds .m
15971576 }
15981577
15991578 return cmdsFirstErr (cmds )
16001579}
16011580
1581+ func (c * ClusterClient ) slottedCommands (cmds []Cmder ) map [int ][]Cmder {
1582+ cmdsSlots := map [int ][]Cmder {}
1583+
1584+ prefferedRandomSlot := - 1
1585+ for _ , cmd := range cmds {
1586+ if cmdFirstKeyPos (cmd ) == 0 {
1587+ continue
1588+ }
1589+
1590+ slot := c .cmdSlot (cmd , prefferedRandomSlot )
1591+ if prefferedRandomSlot == - 1 {
1592+ prefferedRandomSlot = slot
1593+ }
1594+
1595+ cmdsSlots [slot ] = append (cmdsSlots [slot ], cmd )
1596+ }
1597+
1598+ return cmdsSlots
1599+ }
1600+
16021601func (c * ClusterClient ) mapCmdsBySlot (cmds []Cmder ) map [int ][]Cmder {
16031602 cmdsMap := make (map [int ][]Cmder )
16041603 preferredRandomSlot := - 1
0 commit comments