Skip to content

Commit a4bad76

Browse files
authored
Merge pull request #1234 from go-redis/fix/tx-pipeline
Change Tx.Pipeline and Tx.TxPipeline meaning
2 parents 08dad1e + dd4ef4e commit a4bad76

File tree

8 files changed

+36
-30
lines changed

8 files changed

+36
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## v7 WIP
44

5+
- Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline.
56
- WrapProcess is replaced with more convenient AddHook that has access to context.Context.
67
- WithContext now can not be used to create a shallow copy of the client.
78
- New methods ProcessContext, DoContext, and ExecContext.

cluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ var _ = Describe("ClusterClient", func() {
360360
return err
361361
}
362362

363-
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
363+
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
364364
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
365365
return nil
366366
})
@@ -1009,7 +1009,7 @@ var _ = Describe("ClusterClient timeout", func() {
10091009

10101010
It("Tx Pipeline timeouts", func() {
10111011
err := client.Watch(func(tx *redis.Tx) error {
1012-
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
1012+
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
10131013
pipe.Ping()
10141014
return nil
10151015
})

example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func ExampleClient_Watch() {
314314
n++
315315

316316
// runs only if the watched keys remain unchanged
317-
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
317+
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
318318
// pipe handles the error case
319319
pipe.Set(key, n, 0)
320320
return nil

race_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ var _ = Describe("races", func() {
194194
num, err := strconv.ParseInt(val, 10, 64)
195195
Expect(err).NotTo(HaveOccurred())
196196

197-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
197+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
198198
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
199199
return nil
200200
})

redis_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ var _ = Describe("Client", func() {
7979

8080
It("should close Tx without closing the client", func() {
8181
err := client.Watch(func(tx *redis.Tx) error {
82-
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
82+
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
8383
pipe.Ping()
8484
return nil
8585
})
@@ -286,7 +286,7 @@ var _ = Describe("Client timeout", func() {
286286

287287
It("Tx Pipeline timeouts", func() {
288288
err := client.Watch(func(tx *redis.Tx) error {
289-
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
289+
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
290290
pipe.Ping()
291291
return nil
292292
})

ring_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ var _ = Describe("Ring watch", func() {
253253
return err
254254
}
255255

256-
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
256+
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
257257
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
258258
return nil
259259
})
@@ -285,7 +285,7 @@ var _ = Describe("Ring watch", func() {
285285

286286
It("should discard", func() {
287287
err := ring.Watch(func(tx *redis.Tx) error {
288-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
288+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
289289
pipe.Set("key1", "hello1", 0)
290290
pipe.Discard()
291291
pipe.Set("key2", "hello2", 0)
@@ -308,7 +308,7 @@ var _ = Describe("Ring watch", func() {
308308

309309
It("returns no error when there are no commands", func() {
310310
err := ring.Watch(func(tx *redis.Tx) error {
311-
_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
311+
_, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
312312
return err
313313
}, "key")
314314
Expect(err).NotTo(HaveOccurred())
@@ -322,7 +322,7 @@ var _ = Describe("Ring watch", func() {
322322
const N = 20000
323323

324324
err := ring.Watch(func(tx *redis.Tx) error {
325-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
325+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
326326
for i := 0; i < N; i++ {
327327
pipe.Incr("key")
328328
}
@@ -358,7 +358,7 @@ var _ = Describe("Ring watch", func() {
358358
num, err := strconv.ParseInt(val, 10, 64)
359359
Expect(err).NotTo(HaveOccurred())
360360

361-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
361+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
362362
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
363363
return nil
364364
})
@@ -380,7 +380,7 @@ var _ = Describe("Ring watch", func() {
380380

381381
It("should close Tx without closing the client", func() {
382382
err := ring.Watch(func(tx *redis.Tx) error {
383-
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
383+
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
384384
pipe.Ping()
385385
return nil
386386
})
@@ -396,7 +396,7 @@ var _ = Describe("Ring watch", func() {
396396
var ping *redis.StatusCmd
397397

398398
err := ring.Watch(func(tx *redis.Tx) error {
399-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
399+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
400400
ping = pipe.Ping()
401401
return nil
402402
})
@@ -443,7 +443,7 @@ var _ = Describe("Ring Tx timeout", func() {
443443

444444
It("Tx Pipeline timeouts", func() {
445445
err := ring.Watch(func(tx *redis.Tx) error {
446-
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
446+
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
447447
pipe.Ping()
448448
return nil
449449
})

tx.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -116,36 +116,41 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
116116
return cmd
117117
}
118118

119-
// Pipeline creates a new pipeline. It is more convenient to use Pipelined.
120119
func (c *Tx) Pipeline() Pipeliner {
121120
pipe := Pipeline{
122121
ctx: c.ctx,
123122
exec: func(ctx context.Context, cmds []Cmder) error {
124-
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
123+
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
125124
},
126125
}
127126
pipe.init()
128127
return &pipe
129128
}
130129

131-
// Pipelined executes commands queued in the fn in a transaction.
130+
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
131+
return c.Pipeline().Pipelined(fn)
132+
}
133+
134+
// TxPipelined executes commands queued in the fn in a transaction.
132135
//
133136
// When using WATCH, EXEC will execute commands only if the watched keys
134137
// were not modified, allowing for a check-and-set mechanism.
135138
//
136139
// Exec always returns list of commands. If transaction fails
137140
// TxFailedErr is returned. Otherwise Exec returns an error of the first
138141
// failed command or nil.
139-
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
140-
return c.Pipeline().Pipelined(fn)
141-
}
142-
143-
// TxPipelined is an alias for Pipelined.
144142
func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
145-
return c.Pipelined(fn)
143+
return c.TxPipeline().Pipelined(fn)
146144
}
147145

148-
// TxPipeline is an alias for Pipeline.
146+
// TxPipeline creates a new pipeline. Usually it is more convenient to use TxPipelined.
149147
func (c *Tx) TxPipeline() Pipeliner {
150-
return c.Pipeline()
148+
pipe := Pipeline{
149+
ctx: c.ctx,
150+
exec: func(ctx context.Context, cmds []Cmder) error {
151+
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
152+
},
153+
}
154+
pipe.init()
155+
return &pipe
151156
}

tx_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var _ = Describe("Tx", func() {
3434
return err
3535
}
3636

37-
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
37+
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
3838
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
3939
return nil
4040
})
@@ -66,7 +66,7 @@ var _ = Describe("Tx", func() {
6666

6767
It("should discard", func() {
6868
err := client.Watch(func(tx *redis.Tx) error {
69-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
69+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
7070
pipe.Set("key1", "hello1", 0)
7171
pipe.Discard()
7272
pipe.Set("key2", "hello2", 0)
@@ -89,7 +89,7 @@ var _ = Describe("Tx", func() {
8989

9090
It("returns no error when there are no commands", func() {
9191
err := client.Watch(func(tx *redis.Tx) error {
92-
_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
92+
_, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
9393
return err
9494
})
9595
Expect(err).NotTo(HaveOccurred())
@@ -103,7 +103,7 @@ var _ = Describe("Tx", func() {
103103
const N = 20000
104104

105105
err := client.Watch(func(tx *redis.Tx) error {
106-
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
106+
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
107107
for i := 0; i < N; i++ {
108108
pipe.Incr("key")
109109
}
@@ -133,7 +133,7 @@ var _ = Describe("Tx", func() {
133133

134134
do := func() error {
135135
err := client.Watch(func(tx *redis.Tx) error {
136-
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
136+
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
137137
pipe.Ping()
138138
return nil
139139
})

0 commit comments

Comments
 (0)