@@ -773,13 +773,13 @@ func (c *ClusterClient) _process(ctx context.Context, cmd Cmder) error {
 
 		if ask {
 			pipe := node.Client.Pipeline()
-			_ = pipe.Process(NewCmd("ASKING"))
+			_ = pipe.Process(NewCmd("asking"))
 			_ = pipe.Process(cmd)
 			_, lastErr = pipe.ExecContext(ctx)
 			_ = pipe.Close()
 			ask = false
 		} else {
-			lastErr = node.Client._process(ctx, cmd)
+			lastErr = node.Client.ProcessContext(ctx, cmd)
 		}
 
 		// If there is no error - we are done.
@@ -840,6 +840,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
 
 	var wg sync.WaitGroup
 	errCh := make(chan error, 1)
+
 	for _, master := range state.Masters {
 		wg.Add(1)
 		go func(node *clusterNode) {
@@ -853,6 +854,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
 			}
 		}(master)
 	}
+
 	wg.Wait()
 
 	select {
@@ -873,6 +875,7 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
 
 	var wg sync.WaitGroup
 	errCh := make(chan error, 1)
+
 	for _, slave := range state.Slaves {
 		wg.Add(1)
 		go func(node *clusterNode) {
@@ -886,6 +889,7 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
 			}
 		}(slave)
 	}
+
 	wg.Wait()
 
 	select {
@@ -906,6 +910,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
 
 	var wg sync.WaitGroup
 	errCh := make(chan error, 1)
+
 	worker := func(node *clusterNode) {
 		defer wg.Done()
 		err := fn(node.Client)
@@ -927,6 +932,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
 	}
 
 	wg.Wait()
+
 	select {
 	case err := <-errCh:
 		return err
@@ -1068,18 +1074,7 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
 		go func(node *clusterNode, cmds []Cmder) {
 			defer wg.Done()
 
-			err := node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
-				err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
-					return writeCmd(wr, cmds...)
-				})
-				if err != nil {
-					return err
-				}
-
-				return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
-					return c.pipelineReadCmds(node, rd, cmds, failedCmds)
-				})
-			})
+			err := c._processPipelineNode(ctx, node, cmds, failedCmds)
 			if err == nil {
 				return
 			}
@@ -1142,6 +1137,25 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
 	return true
 }
 
+func (c *ClusterClient) _processPipelineNode(
+	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
+) error {
+	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
+			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+				return writeCmds(wr, cmds)
+			})
+			if err != nil {
+				return err
+			}
+
+			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
+				return c.pipelineReadCmds(node, rd, cmds, failedCmds)
+			})
+		})
+	})
+}
+
 func (c *ClusterClient) pipelineReadCmds(
 	node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
@@ -1186,7 +1200,7 @@ func (c *ClusterClient) checkMovedErr(
 	}
 
 	if ask {
-		failedCmds.Add(node, NewCmd("ASKING"), cmd)
+		failedCmds.Add(node, NewCmd("asking"), cmd)
 		return true
 	}
 
@@ -1243,26 +1257,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
 		go func(node *clusterNode, cmds []Cmder) {
 			defer wg.Done()
 
-			err := node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
-				err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
-					return txPipelineWriteMulti(wr, cmds)
-				})
-				if err != nil {
-					return err
-				}
-
-				return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
-					err := c.txPipelineReadQueued(rd, cmds, failedCmds)
-					if err != nil {
-						moved, ask, addr := isMovedError(err)
-						if moved || ask {
-							return c.cmdsMoved(cmds, moved, ask, addr, failedCmds)
-						}
-						return err
-					}
-					return pipelineReadCmds(rd, cmds)
-				})
-			})
+			err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
 			if err == nil {
 				return
 			}
@@ -1296,11 +1291,42 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
 	return cmdsMap
 }
 
+func (c *ClusterClient) _processTxPipelineNode(
+	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
+) error {
+	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
+		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
+			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
+				return writeCmds(wr, cmds)
+			})
+			if err != nil {
+				return err
+			}
+
+			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
+				statusCmd := cmds[0].(*StatusCmd)
+				// Trim multi and exec.
+				cmds = cmds[1 : len(cmds)-1]
+
+				err := c.txPipelineReadQueued(rd, statusCmd, cmds, failedCmds)
+				if err != nil {
+					moved, ask, addr := isMovedError(err)
+					if moved || ask {
+						return c.cmdsMoved(cmds, moved, ask, addr, failedCmds)
+					}
+					return err
+				}
+
+				return pipelineReadCmds(rd, cmds)
+			})
+		})
+	})
+}
+
 func (c *ClusterClient) txPipelineReadQueued(
-	rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
+	rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
 	// Parse queued replies.
-	var statusCmd StatusCmd
 	if err := statusCmd.readReply(rd); err != nil {
 		return err
 	}
@@ -1352,7 +1378,7 @@ func (c *ClusterClient) cmdsMoved(
 
 	if ask {
 		for _, cmd := range cmds {
-			failedCmds.Add(node, NewCmd("ASKING"), cmd)
+			failedCmds.Add(node, NewCmd("asking"), cmd)
 		}
 		return nil
 	}
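
Note on the change: the helpers introduced above (_processPipelineNode, _processTxPipelineNode) run each node's pipeline I/O inside node.Client.hooks.processPipeline / processTxPipeline, and the single-command retry path now calls ProcessContext instead of _process, so client hooks are no longer bypassed for commands routed to cluster nodes. The sketch below is illustrative only and not part of this commit: it registers a hook on a ClusterClient using the go-redis v7 Hook interface and AddHook; the loggingHook type, addresses, and keys are assumptions made for the example.

package main

import (
	"context"
	"log"

	"github.com/go-redis/redis/v7"
)

// loggingHook is a hypothetical hook used only for illustration.
type loggingHook struct{}

func (loggingHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
	log.Printf("processing %s", cmd.Name())
	return ctx, nil
}

func (loggingHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error { return nil }

func (loggingHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
	log.Printf("pipelining %d commands", len(cmds))
	return ctx, nil
}

func (loggingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error { return nil }

func main() {
	// Illustrative cluster addresses.
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":7000", ":7001", ":7002"},
	})
	rdb.AddHook(loggingHook{})

	// The hook's pipeline callbacks run around the pipelined commands.
	pipe := rdb.Pipeline()
	pipe.Set("key", "value", 0)
	pipe.Get("key")
	_, _ = pipe.Exec()
}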