Skip to content

Commit 113a18a

Browse files
cxljsndyakov
andauthored
fix: pipeline repeatedly sets the error (#3525)
* fix: pipeline repeatedly sets the error Signed-off-by: Xiaolong Chen <[email protected]> * add test Signed-off-by: Xiaolong Chen <[email protected]> * CI Signed-off-by: Xiaolong Chen <[email protected]> --------- Signed-off-by: Xiaolong Chen <[email protected]> Co-authored-by: Nedyalko Dyakov <[email protected]>
1 parent 286735b commit 113a18a

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

pipeline_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,39 @@ var _ = Describe("pipelining", func() {
6060
Expect(cmds).To(BeEmpty())
6161
})
6262

63+
It("pipeline: basic exec", func() {
64+
p := client.Pipeline()
65+
p.Get(ctx, "key")
66+
p.Set(ctx, "key", "value", 0)
67+
p.Get(ctx, "key")
68+
cmds, err := p.Exec(ctx)
69+
Expect(err).To(Equal(redis.Nil))
70+
Expect(cmds).To(HaveLen(3))
71+
Expect(cmds[0].Err()).To(Equal(redis.Nil))
72+
Expect(cmds[1].(*redis.StatusCmd).Val()).To(Equal("OK"))
73+
Expect(cmds[1].Err()).NotTo(HaveOccurred())
74+
Expect(cmds[2].(*redis.StringCmd).Val()).To(Equal("value"))
75+
Expect(cmds[2].Err()).NotTo(HaveOccurred())
76+
})
77+
78+
It("pipeline: exec pipeline when get conn failed", func() {
79+
p := client.Pipeline()
80+
p.Get(ctx, "key")
81+
p.Set(ctx, "key", "value", 0)
82+
p.Get(ctx, "key")
83+
84+
client.Close()
85+
86+
cmds, err := p.Exec(ctx)
87+
Expect(err).To(Equal(redis.ErrClosed))
88+
Expect(cmds).To(HaveLen(3))
89+
for _, cmd := range cmds {
90+
Expect(cmd.Err()).To(Equal(redis.ErrClosed))
91+
}
92+
93+
client = redis.NewClient(redisOptions())
94+
})
95+
6396
assertPipeline := func() {
6497
It("returns no errors when there are no commands", func() {
6598
_, err := pipe.Exec(ctx)

redis.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,10 @@ func (c *baseClient) generalProcessPipeline(
768768
return err
769769
})
770770
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
771-
setCmdsErr(cmds, lastErr)
771+
// The error should be set here only when failing to obtain the conn.
772+
if !isRedisError(lastErr) {
773+
setCmdsErr(cmds, lastErr)
774+
}
772775
return lastErr
773776
}
774777
}
@@ -864,7 +867,7 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd
864867
}
865868

866869
// Parse +QUEUED.
867-
for _, cmd := range cmds {
870+
for _, cmd := range cmds {
868871
// To be sure there are no buffered push notifications, we process them before reading the reply
869872
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
870873
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)

0 commit comments

Comments
 (0)