Skip to content

Commit eb5f23c

Browse files
authored
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
2 parents 3e7c7ab + 8f5469a commit eb5f23c

File tree

5 files changed

+47
-8
lines changed

5 files changed

+47
-8
lines changed

.github/release-drafter-config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ categories:
3636
change-template: '- $TITLE (#$NUMBER)'
3737
exclude-labels:
3838
- 'skip-changelog'
39+
exclude-contributors:
40+
- 'dependabot'
3941
template: |
4042
# Changes
4143

osscluster.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1749,10 +1749,16 @@ func (c *ClusterClient) txPipelineReadQueued(
17491749
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
17501750
}
17511751
err := statusCmd.readReply(rd)
1752-
if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
1753-
continue
1752+
if err != nil {
1753+
if c.checkMovedErr(ctx, cmd, err, failedCmds) {
1754+
// will be processed later
1755+
continue
1756+
}
1757+
cmd.SetErr(err)
1758+
if !isRedisError(err) {
1759+
return err
1760+
}
17541761
}
1755-
return err
17561762
}
17571763

17581764
// To be sure there are no buffered push notifications, we process them before reading the reply

pipeline.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ type Pipeliner interface {
3030
// If a certain Redis command is not yet supported, you can use Do to execute it.
3131
Do(ctx context.Context, args ...interface{}) *Cmd
3232

33-
// Process puts the commands to be executed into the pipeline buffer.
33+
// Process queues the cmd for later execution.
3434
Process(ctx context.Context, cmd Cmder) error
3535

36+
// BatchProcess adds multiple commands to be executed into the pipeline buffer.
37+
BatchProcess(ctx context.Context, cmd ...Cmder) error
38+
3639
// Discard discards all commands in the pipeline buffer that have not yet been executed.
3740
Discard()
3841

@@ -79,7 +82,12 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
7982

8083
// Process queues the cmd for later execution.
8184
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
82-
c.cmds = append(c.cmds, cmd)
85+
return c.BatchProcess(ctx, cmd)
86+
}
87+
88+
// BatchProcess queues multiple cmds for later execution.
89+
func (c *Pipeline) BatchProcess(ctx context.Context, cmd ...Cmder) error {
90+
c.cmds = append(c.cmds, cmd...)
8391
return nil
8492
}
8593

pipeline_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,25 @@ var _ = Describe("pipelining", func() {
114114
err := pipe.Do(ctx).Err()
115115
Expect(err).To(Equal(errors.New("redis: please enter the command to be executed")))
116116
})
117+
118+
It("should process", func() {
119+
err := pipe.Process(ctx, redis.NewCmd(ctx, "asking"))
120+
Expect(err).To(BeNil())
121+
Expect(pipe.Cmds()).To(HaveLen(1))
122+
})
123+
124+
It("should batchProcess", func() {
125+
err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"))
126+
Expect(err).To(BeNil())
127+
Expect(pipe.Cmds()).To(HaveLen(1))
128+
129+
pipe.Discard()
130+
Expect(pipe.Cmds()).To(HaveLen(0))
131+
132+
err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value"))
133+
Expect(err).To(BeNil())
134+
Expect(pipe.Cmds()).To(HaveLen(2))
135+
})
117136
}
118137

119138
Describe("Pipeline", func() {

redis.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ func (c *baseClient) generalProcessPipeline(
764764
return err
765765
})
766766
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
767+
setCmdsErr(cmds, lastErr)
767768
return lastErr
768769
}
769770
}
@@ -859,13 +860,16 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd
859860
}
860861

861862
// Parse +QUEUED.
862-
for range cmds {
863+
for _, cmd := range cmds {
863864
// To be sure there are no buffered push notifications, we process them before reading the reply
864865
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
865866
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
866867
}
867-
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
868-
return err
868+
if err := statusCmd.readReply(rd); err != nil {
869+
cmd.SetErr(err)
870+
if !isRedisError(err) {
871+
return err
872+
}
869873
}
870874
}
871875

0 commit comments

Comments
 (0)