@@ -473,8 +473,7 @@ var _ = Describe("ClusterClient", func() {
473
473
Describe ("pipelining" , func () {
474
474
var pipe * redis.Pipeline
475
475
476
- assertPipeline := func () {
477
- keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
476
+ assertPipeline := func (keys []string ) {
478
477
479
478
It ("follows redirects" , func () {
480
479
if ! failover {
@@ -493,13 +492,12 @@ var _ = Describe("ClusterClient", func() {
493
492
Expect (err ).NotTo (HaveOccurred ())
494
493
Expect (cmds ).To (HaveLen (14 ))
495
494
496
- _ = client .ForEachShard (ctx , func (ctx context.Context , node * redis.Client ) error {
497
- defer GinkgoRecover ()
498
- Eventually (func () int64 {
499
- return node .DBSize (ctx ).Val ()
500
- }, 30 * time .Second ).ShouldNot (BeZero ())
501
- return nil
502
- })
495
+ // Check that all keys are set.
496
+ for _ , key := range keys {
497
+ Eventually (func () string {
498
+ return client .Get (ctx , key ).Val ()
499
+ }, 30 * time .Second ).Should (Equal (key + "_value" ))
500
+ }
503
501
504
502
if ! failover {
505
503
for _ , key := range keys {
@@ -528,14 +526,14 @@ var _ = Describe("ClusterClient", func() {
528
526
})
529
527
530
528
It ("works with missing keys" , func () {
531
- pipe .Set (ctx , "A" , "A_value" , 0 )
532
- pipe .Set (ctx , "C" , "C_value" , 0 )
529
+ pipe .Set (ctx , "A{s} " , "A_value" , 0 )
530
+ pipe .Set (ctx , "C{s} " , "C_value" , 0 )
533
531
_ , err := pipe .Exec (ctx )
534
532
Expect (err ).NotTo (HaveOccurred ())
535
533
536
- a := pipe .Get (ctx , "A" )
537
- b := pipe .Get (ctx , "B" )
538
- c := pipe .Get (ctx , "C" )
534
+ a := pipe .Get (ctx , "A{s} " )
535
+ b := pipe .Get (ctx , "B{s} " )
536
+ c := pipe .Get (ctx , "C{s} " )
539
537
cmds , err := pipe .Exec (ctx )
540
538
Expect (err ).To (Equal (redis .Nil ))
541
539
Expect (cmds ).To (HaveLen (3 ))
@@ -558,7 +556,8 @@ var _ = Describe("ClusterClient", func() {
558
556
559
557
AfterEach (func () {})
560
558
561
- assertPipeline ()
559
+ keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
560
+ assertPipeline (keys )
562
561
563
562
It ("doesn't fail node with context.Canceled error" , func () {
564
563
ctx , cancel := context .WithCancel (context .Background ())
@@ -601,7 +600,25 @@ var _ = Describe("ClusterClient", func() {
601
600
602
601
AfterEach (func () {})
603
602
604
- assertPipeline ()
603
+ // TxPipeline doesn't support cross slot commands.
604
+ // Use hashtag to force all keys to the same slot.
605
+ keys := []string {"A{s}" , "B{s}" , "C{s}" , "D{s}" , "E{s}" , "F{s}" , "G{s}" }
606
+ assertPipeline (keys )
607
+
608
+ // make sure CrossSlot error is returned
609
+ It ("returns CrossSlot error" , func () {
610
+ pipe .Set (ctx , "A{s}" , "A_value" , 0 )
611
+ pipe .Set (ctx , "B{t}" , "B_value" , 0 )
612
+ Expect (hashtag .Slot ("A{s}" )).NotTo (Equal (hashtag .Slot ("B{t}" )))
613
+ _ , err := pipe .Exec (ctx )
614
+ Expect (err ).To (MatchError (redis .ErrCrossSlot ))
615
+ })
616
+
617
+ // doesn't fail when no commands are queued
618
+ It ("returns no error when there are no commands" , func () {
619
+ _ , err := pipe .Exec (ctx )
620
+ Expect (err ).NotTo (HaveOccurred ())
621
+ })
605
622
})
606
623
})
607
624
0 commit comments