Skip to content

Commit 6b87a58

Browse files
committed
fix(ring): propagate pipeline errors
1 parent 0decfdc commit 6b87a58

File tree

2 files changed

+41
-4
lines changed

2 files changed

+41
-4
lines changed

ring.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cespare/xxhash/v2"
1515
"github.com/dgryski/go-rendezvous" //nolint
16+
1617
"github.com/redis/go-redis/v9/auth"
1718

1819
"github.com/redis/go-redis/v9/internal"
@@ -797,29 +798,48 @@ func (c *Ring) generalProcessPipeline(
797798
cmdsMap[hash] = append(cmdsMap[hash], cmd)
798799
}
799800

800-
var wg sync.WaitGroup
801+
var (
802+
wg sync.WaitGroup
803+
mu sync.Mutex
804+
firstErr error
805+
)
801806
for hash, cmds := range cmdsMap {
802807
wg.Add(1)
803808
go func(hash string, cmds []Cmder) {
804809
defer wg.Done()
805810

806-
// TODO: retry?
807811
shard, err := c.sharding.GetByName(hash)
808812
if err != nil {
809813
setCmdsErr(cmds, err)
814+
mu.Lock()
815+
if firstErr == nil {
816+
firstErr = err
817+
}
818+
mu.Unlock()
810819
return
811820
}
812821

813822
if tx {
814823
cmds = wrapMultiExec(ctx, cmds)
815-
_ = shard.Client.processTxPipelineHook(ctx, cmds)
824+
err = shard.Client.processTxPipelineHook(ctx, cmds)
816825
} else {
817-
_ = shard.Client.processPipelineHook(ctx, cmds)
826+
err = shard.Client.processPipelineHook(ctx, cmds)
827+
}
828+
if err != nil {
829+
setCmdsErr(cmds, err)
830+
mu.Lock()
831+
if firstErr == nil {
832+
firstErr = err
833+
}
834+
mu.Unlock()
818835
}
819836
}(hash, cmds)
820837
}
821838

822839
wg.Wait()
840+
if firstErr != nil {
841+
return firstErr
842+
}
823843
return cmdsFirstErr(cmds)
824844
}
825845

ring_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,3 +867,20 @@ var _ = Describe("Ring GetShardClients and GetShardClientForKey", func() {
867867
Expect(len(shardMap)).To(BeNumerically("<=", 2)) // At most 2 shards (our setup)
868868
})
869869
})
870+
871+
var _ = Describe("unreachable ring shard", func() {
872+
It("pipeline returns dial error", func() {
873+
ring := redis.NewRing(&redis.RingOptions{
874+
Addrs: map[string]string{"shard1": "10.255.255.1:6379"},
875+
DialTimeout: 100 * time.Millisecond,
876+
HeartbeatFrequency: time.Hour,
877+
})
878+
defer ring.Close()
879+
880+
_, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
881+
pipe.Ping(ctx)
882+
return nil
883+
})
884+
Expect(err).To(HaveOccurred())
885+
})
886+
})

0 commit comments

Comments
 (0)