@@ -9,7 +9,6 @@ use std::time::Duration;
99
1010use fred:: clients:: Client ;
1111use fred:: clients:: Pipeline ;
12- use fred:: clients:: Replicas ;
1312use fred:: interfaces:: EventInterface ;
1413#[ cfg( test) ]
1514use fred:: mocks:: Mocks ;
@@ -589,10 +588,6 @@ impl RedisCacheStorage {
589588 self . inner . next ( ) . clone ( )
590589 }
591590
592- fn replica_client ( & self ) -> Replicas < Client > {
593- self . client ( ) . replicas ( )
594- }
595-
596591 pub ( crate ) fn pipeline ( & self ) -> Pipeline < Client > {
597592 self . inner . next ( ) . pipeline ( )
598593 }
@@ -622,26 +617,28 @@ impl RedisCacheStorage {
622617 options : Options ,
623618 ) -> Result < RedisValue < V > , RedisError > {
624619 let key = self . make_key ( key) ;
625- match self . ttl {
626- Some ( ttl) if self . reset_ttl => {
627- let pipeline = self . pipeline ( ) . with_options ( & options) ;
628- let _: ( ) = pipeline
629- . get ( & key)
630- . await
631- . inspect_err ( |e| self . record_error ( e) ) ?;
632- let _: ( ) = pipeline
633- . expire ( & key, ttl. as_secs ( ) as i64 , None )
634- . await
635- . inspect_err ( |e| self . record_error ( e) ) ?;
636-
637- let ( value, _timeout_set) : ( RedisValue < V > , bool ) =
638- pipeline. all ( ) . await . inspect_err ( |e| self . record_error ( e) ) ?;
639- Ok ( value)
640- }
641- _ => {
642- let client = self . replica_client ( ) . with_options ( & options) ;
643- client. get ( key) . await . inspect_err ( |e| self . record_error ( e) )
644- }
620+ if self . reset_ttl
621+ && let Some ( ttl) = self . ttl
622+ {
623+ let pipeline = self . pipeline ( ) . with_options ( & options) ;
624+ let _: ( ) = pipeline
625+ . get ( & key)
626+ . await
627+ . inspect_err ( |e| self . record_error ( e) ) ?;
628+ let _: ( ) = pipeline
629+ . expire ( & key, ttl. as_secs ( ) as i64 , None )
630+ . await
631+ . inspect_err ( |e| self . record_error ( e) ) ?;
632+
633+ let ( value, _timeout_set) : ( RedisValue < V > , bool ) =
634+ pipeline. all ( ) . await . inspect_err ( |e| self . record_error ( e) ) ?;
635+ Ok ( value)
636+ } else if self . is_cluster {
637+ let client = self . client ( ) . replicas ( ) . with_options ( & options) ;
638+ client. get ( key) . await . inspect_err ( |e| self . record_error ( e) )
639+ } else {
640+ let client = self . client ( ) . with_options ( & options) ;
641+ client. get ( key) . await . inspect_err ( |e| self . record_error ( e) )
645642 }
646643 }
647644
@@ -664,11 +661,12 @@ impl RedisCacheStorage {
664661 // - https://redis.io/docs/latest/commands/mget/
665662
666663 tracing:: trace!( "getting multiple values from redis: {:?}" , keys) ;
667- let client = self . replica_client ( ) . with_options ( & options ) ;
664+ let client = self . client ( ) ;
668665
669666 if keys. len ( ) == 1 {
670667 let key = self . make_key ( keys. swap_remove ( 0 ) ) ;
671668 let res = client
669+ . with_options ( & options)
672670 . get ( key)
673671 . await
674672 . inspect_err ( |e| self . record_error ( e) )
@@ -689,6 +687,8 @@ impl RedisCacheStorage {
689687 }
690688
691689 // then we query all the key groups at the same time
690+ // use `client.replicas()` since we're in a cluster and can take advantage of read-replicas
691+ let client = client. replicas ( ) . with_options ( & options) ;
692692 let mut tasks = Vec :: new ( ) ;
693693 for ( _shard, ( indexes, keys) ) in h {
694694 let client = client. clone ( ) ;
@@ -722,6 +722,7 @@ impl RedisCacheStorage {
722722 . map ( |k| self . make_key ( k) )
723723 . collect :: < Vec < _ > > ( ) ;
724724 client
725+ . with_options ( & options)
725726 . mget ( keys)
726727 . await
727728 . inspect_err ( |e| self . record_error ( e) )
0 commit comments