Skip to content

Commit 2a32680

Browse files
committed
fix(txpipeline): keyless commands should take the slot of the keyed commands
1 parent 4c635cc commit 2a32680

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

osscluster.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1507,8 +1507,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15071507
return err
15081508
}
15091509

1510-
cmdsMap := c.mapCmdsBySlot(cmds)
1510+
cmdsMap := map[int][]Cmder{}
1511+
slot := -1
1512+
// split keyed and keyless commands
1513+
keyedCmds, _ := c.keyedAndKeyessCmds(cmds)
1514+
if len(keyedCmds) == 0 {
1515+
// no keyed commands try random slot
1516+
slot = hashtag.RandomSlot()
1517+
} else {
1518+
// keyed commands, get slot from them
1519+
// if more than one slot, return cross slot error
1520+
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
1521+
if len(cmdsBySlot) > 1 {
1522+
// cross slot error, we have more than one slot for keyed commands
1523+
setCmdsErr(cmds, ErrCrossSlot)
1524+
return ErrCrossSlot
1525+
}
1526+
// get the slot, should be only one
1527+
for sl := range cmdsBySlot {
1528+
slot = sl
1529+
break
1530+
}
1531+
}
1532+
// slot was not determined, try random one
1533+
if slot == -1 {
1534+
slot = hashtag.RandomSlot()
1535+
}
1536+
cmdsMap[slot] = cmds
1537+
15111538
// TxPipeline does not support cross slot transaction.
1539+
// double check the commands are in the same slot
15121540
if len(cmdsMap) > 1 {
15131541
setCmdsErr(cmds, ErrCrossSlot)
15141542
return ErrCrossSlot
@@ -1560,6 +1588,18 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
15601588
}
15611589
return cmdsMap
15621590
}
1591+
func (c *ClusterClient) keyedAndKeyessCmds(cmds []Cmder) ([]Cmder, []Cmder) {
1592+
keyedCmds := make([]Cmder, 0, len(cmds))
1593+
keylessCmds := make([]Cmder, 0, len(cmds))
1594+
for _, cmd := range cmds {
1595+
if cmdFirstKeyPos(cmd) == 0 {
1596+
keylessCmds = append(keylessCmds, cmd)
1597+
} else {
1598+
keyedCmds = append(keyedCmds, cmd)
1599+
}
1600+
}
1601+
return keyedCmds, keylessCmds
1602+
}
15631603

15641604
func (c *ClusterClient) processTxPipelineNode(
15651605
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,

0 commit comments

Comments
 (0)