@@ -1046,7 +1046,7 @@ func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error
1046
1046
1047
1047
func (c * ClusterClient ) _processPipeline (ctx context.Context , cmds []Cmder ) error {
1048
1048
cmdsMap := newCmdsMap ()
1049
- err := c .mapCmdsByNode (cmds , cmdsMap )
1049
+ err := c .mapCmdsByNode (cmdsMap , cmds )
1050
1050
if err != nil {
1051
1051
setCmdsErr (cmds , err )
1052
1052
return err
@@ -1080,11 +1080,15 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
1080
1080
return c .pipelineReadCmds (node , rd , cmds , failedCmds )
1081
1081
})
1082
1082
})
1083
- if err != nil {
1084
- err = c .mapCmdsByNode (cmds , failedCmds )
1085
- if err != nil {
1083
+ if err == nil {
1084
+ return
1085
+ }
1086
+ if attempt < c .opt .MaxRedirects {
1087
+ if err := c .mapCmdsByNode (failedCmds , cmds ); err != nil {
1086
1088
setCmdsErr (cmds , err )
1087
1089
}
1090
+ } else {
1091
+ setCmdsErr (cmds , err )
1088
1092
}
1089
1093
}(node , cmds )
1090
1094
}
@@ -1099,41 +1103,27 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
1099
1103
return cmdsFirstErr (cmds )
1100
1104
}
1101
1105
1102
- type cmdsMap struct {
1103
- mu sync.Mutex
1104
- m map [* clusterNode ][]Cmder
1105
- }
1106
-
1107
- func newCmdsMap () * cmdsMap {
1108
- return & cmdsMap {
1109
- m : make (map [* clusterNode ][]Cmder ),
1110
- }
1111
- }
1112
-
1113
- func (m * cmdsMap ) Add (node * clusterNode , cmds ... Cmder ) {
1114
- m .mu .Lock ()
1115
- m .m [node ] = append (m .m [node ], cmds ... )
1116
- m .mu .Unlock ()
1117
- }
1118
-
1119
- func (c * ClusterClient ) mapCmdsByNode (cmds []Cmder , cmdsMap * cmdsMap ) error {
1106
+ func (c * ClusterClient ) mapCmdsByNode (cmdsMap * cmdsMap , cmds []Cmder ) error {
1120
1107
state , err := c .state .Get ()
1121
1108
if err != nil {
1122
1109
return err
1123
1110
}
1124
1111
1125
- cmdsAreReadOnly := c .opt .ReadOnly && c .cmdsAreReadOnly (cmds )
1112
+ if c .opt .ReadOnly && c .cmdsAreReadOnly (cmds ) {
1113
+ for _ , cmd := range cmds {
1114
+ slot := c .cmdSlot (cmd )
1115
+ node , err := c .slotReadOnlyNode (state , slot )
1116
+ if err != nil {
1117
+ return err
1118
+ }
1119
+ cmdsMap .Add (node , cmd )
1120
+ }
1121
+ return nil
1122
+ }
1123
+
1126
1124
for _ , cmd := range cmds {
1127
1125
slot := c .cmdSlot (cmd )
1128
-
1129
- var node * clusterNode
1130
- var err error
1131
- if cmdsAreReadOnly {
1132
- cmdInfo := c .cmdInfo (cmd .Name ())
1133
- node , err = c .cmdNode (cmdInfo , slot )
1134
- } else {
1135
- node , err = state .slotMasterNode (slot )
1136
- }
1126
+ node , err := state .slotMasterNode (slot )
1137
1127
if err != nil {
1138
1128
return err
1139
1129
}
@@ -1261,7 +1251,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
1261
1251
return err
1262
1252
}
1263
1253
1264
- err = cn .WithReader (ctx , c .opt .ReadTimeout , func (rd * proto.Reader ) error {
1254
+ return cn .WithReader (ctx , c .opt .ReadTimeout , func (rd * proto.Reader ) error {
1265
1255
err := c .txPipelineReadQueued (rd , cmds , failedCmds )
1266
1256
if err != nil {
1267
1257
moved , ask , addr := isMovedError (err )
@@ -1272,13 +1262,16 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
1272
1262
}
1273
1263
return pipelineReadCmds (rd , cmds )
1274
1264
})
1275
- return err
1276
1265
})
1277
- if err != nil {
1278
- err = c .mapCmdsByNode (cmds , failedCmds )
1279
- if err != nil {
1266
+ if err == nil {
1267
+ return
1268
+ }
1269
+ if attempt < c .opt .MaxRedirects {
1270
+ if err := c .mapCmdsByNode (failedCmds , cmds ); err != nil {
1280
1271
setCmdsErr (cmds , err )
1281
1272
}
1273
+ } else {
1274
+ setCmdsErr (cmds , err )
1282
1275
}
1283
1276
}(node , cmds )
1284
1277
}
@@ -1561,29 +1554,27 @@ func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, e
1561
1554
}
1562
1555
1563
1556
if c .opt .ReadOnly && cmdInfo != nil && cmdInfo .ReadOnly {
1564
- if c .opt .RouteByLatency {
1565
- return state .slotClosestNode (slot )
1566
- }
1567
- if c .opt .RouteRandomly {
1568
- return state .slotRandomNode (slot )
1569
- }
1570
- return state .slotSlaveNode (slot )
1557
+ return c .slotReadOnlyNode (state , slot )
1571
1558
}
1572
-
1573
1559
return state .slotMasterNode (slot )
1574
1560
}
1575
1561
1562
+ func (c * clusterClient ) slotReadOnlyNode (state * clusterState , slot int ) (* clusterNode , error ) {
1563
+ if c .opt .RouteByLatency {
1564
+ return state .slotClosestNode (slot )
1565
+ }
1566
+ if c .opt .RouteRandomly {
1567
+ return state .slotRandomNode (slot )
1568
+ }
1569
+ return state .slotSlaveNode (slot )
1570
+ }
1571
+
1576
1572
func (c * ClusterClient ) slotMasterNode (slot int ) (* clusterNode , error ) {
1577
1573
state , err := c .state .Get ()
1578
1574
if err != nil {
1579
1575
return nil , err
1580
1576
}
1581
-
1582
- nodes := state .slotNodes (slot )
1583
- if len (nodes ) > 0 {
1584
- return nodes [0 ], nil
1585
- }
1586
- return c .nodes .Random ()
1577
+ return state .slotMasterNode (slot )
1587
1578
}
1588
1579
1589
1580
func appendUniqueNode (nodes []* clusterNode , node * clusterNode ) []* clusterNode {
@@ -1622,3 +1613,22 @@ func remove(ss []string, es ...string) []string {
1622
1613
}
1623
1614
return ss
1624
1615
}
1616
+
1617
+ //------------------------------------------------------------------------------
1618
+
1619
+ type cmdsMap struct {
1620
+ mu sync.Mutex
1621
+ m map [* clusterNode ][]Cmder
1622
+ }
1623
+
1624
+ func newCmdsMap () * cmdsMap {
1625
+ return & cmdsMap {
1626
+ m : make (map [* clusterNode ][]Cmder ),
1627
+ }
1628
+ }
1629
+
1630
+ func (m * cmdsMap ) Add (node * clusterNode , cmds ... Cmder ) {
1631
+ m .mu .Lock ()
1632
+ m .m [node ] = append (m .m [node ], cmds ... )
1633
+ m .mu .Unlock ()
1634
+ }
0 commit comments