File tree Expand file tree Collapse file tree 2 files changed +41
-4
lines changed Expand file tree Collapse file tree 2 files changed +41
-4
lines changed Original file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import (
13
13
14
14
"github.com/cespare/xxhash/v2"
15
15
"github.com/dgryski/go-rendezvous" //nolint
16
+
16
17
"github.com/redis/go-redis/v9/auth"
17
18
18
19
"github.com/redis/go-redis/v9/internal"
@@ -797,29 +798,48 @@ func (c *Ring) generalProcessPipeline(
797
798
cmdsMap [hash ] = append (cmdsMap [hash ], cmd )
798
799
}
799
800
800
- var wg sync.WaitGroup
801
+ var (
802
+ wg sync.WaitGroup
803
+ mu sync.Mutex
804
+ firstErr error
805
+ )
801
806
for hash , cmds := range cmdsMap {
802
807
wg .Add (1 )
803
808
go func (hash string , cmds []Cmder ) {
804
809
defer wg .Done ()
805
810
806
- // TODO: retry?
807
811
shard , err := c .sharding .GetByName (hash )
808
812
if err != nil {
809
813
setCmdsErr (cmds , err )
814
+ mu .Lock ()
815
+ if firstErr == nil {
816
+ firstErr = err
817
+ }
818
+ mu .Unlock ()
810
819
return
811
820
}
812
821
813
822
if tx {
814
823
cmds = wrapMultiExec (ctx , cmds )
815
- _ = shard .Client .processTxPipelineHook (ctx , cmds )
824
+ err = shard .Client .processTxPipelineHook (ctx , cmds )
816
825
} else {
817
- _ = shard .Client .processPipelineHook (ctx , cmds )
826
+ err = shard .Client .processPipelineHook (ctx , cmds )
827
+ }
828
+ if err != nil {
829
+ setCmdsErr (cmds , err )
830
+ mu .Lock ()
831
+ if firstErr == nil {
832
+ firstErr = err
833
+ }
834
+ mu .Unlock ()
818
835
}
819
836
}(hash , cmds )
820
837
}
821
838
822
839
wg .Wait ()
840
+ if firstErr != nil {
841
+ return firstErr
842
+ }
823
843
return cmdsFirstErr (cmds )
824
844
}
825
845
Original file line number Diff line number Diff line change @@ -867,3 +867,20 @@ var _ = Describe("Ring GetShardClients and GetShardClientForKey", func() {
867
867
Expect (len (shardMap )).To (BeNumerically ("<=" , 2 )) // At most 2 shards (our setup)
868
868
})
869
869
})
870
+
871
+ var _ = Describe ("unreachable ring shard" , func () {
872
+ It ("pipeline returns dial error" , func () {
873
+ ring := redis .NewRing (& redis.RingOptions {
874
+ Addrs : map [string ]string {"shard1" : "10.255.255.1:6379" },
875
+ DialTimeout : 100 * time .Millisecond ,
876
+ HeartbeatFrequency : time .Hour ,
877
+ })
878
+ defer ring .Close ()
879
+
880
+ _ , err := ring .Pipelined (ctx , func (pipe redis.Pipeliner ) error {
881
+ pipe .Ping (ctx )
882
+ return nil
883
+ })
884
+ Expect (err ).To (HaveOccurred ())
885
+ })
886
+ })
You can’t perform that action at this time.
0 commit comments