@@ -9,6 +9,7 @@ use std::time::Duration;
99
1010use fred:: clients:: Client ;
1111use fred:: clients:: Pipeline ;
12+ use fred:: clients:: Replicas ;
1213use fred:: interfaces:: EventInterface ;
1314#[ cfg( test) ]
1415use fred:: mocks:: Mocks ;
@@ -588,6 +589,10 @@ impl RedisCacheStorage {
588589 self . inner . next ( ) . clone ( )
589590 }
590591
592+ fn replica_client ( & self ) -> Replicas < Client > {
593+ self . client ( ) . replicas ( )
594+ }
595+
591596 pub ( crate ) fn pipeline ( & self ) -> Pipeline < Client > {
592597 self . inner . next ( ) . pipeline ( )
593598 }
@@ -617,28 +622,26 @@ impl RedisCacheStorage {
617622 options : Options ,
618623 ) -> Result < RedisValue < V > , RedisError > {
619624 let key = self . make_key ( key) ;
620- if self . reset_ttl
621- && let Some ( ttl) = self . ttl
622- {
623- let pipeline = self . pipeline ( ) . with_options ( & options) ;
624- let _: ( ) = pipeline
625- . get ( & key)
626- . await
627- . inspect_err ( |e| self . record_error ( e) ) ?;
628- let _: ( ) = pipeline
629- . expire ( & key, ttl. as_secs ( ) as i64 , None )
630- . await
631- . inspect_err ( |e| self . record_error ( e) ) ?;
632-
633- let ( value, _timeout_set) : ( RedisValue < V > , bool ) =
634- pipeline. all ( ) . await . inspect_err ( |e| self . record_error ( e) ) ?;
635- Ok ( value)
636- } else if self . is_cluster {
637- let client = self . client ( ) . replicas ( ) . with_options ( & options) ;
638- client. get ( key) . await . inspect_err ( |e| self . record_error ( e) )
639- } else {
640- let client = self . client ( ) . with_options ( & options) ;
641- client. get ( key) . await . inspect_err ( |e| self . record_error ( e) )
625+ match self . ttl {
626+ Some ( ttl) if self . reset_ttl => {
627+ let pipeline = self . pipeline ( ) . with_options ( & options) ;
628+ let _: ( ) = pipeline
629+ . get ( & key)
630+ . await
631+ . inspect_err ( |e| self . record_error ( e) ) ?;
632+ let _: ( ) = pipeline
633+ . expire ( & key, ttl. as_secs ( ) as i64 , None )
634+ . await
635+ . inspect_err ( |e| self . record_error ( e) ) ?;
636+
637+ let ( value, _timeout_set) : ( RedisValue < V > , bool ) =
638+ pipeline. all ( ) . await . inspect_err ( |e| self . record_error ( e) ) ?;
639+ Ok ( value)
640+ }
641+ _ => {
642+ let client = self . replica_client ( ) . with_options ( & options) ;
643+ client. get ( key) . await . inspect_err ( |e| self . record_error ( e) )
644+ }
642645 }
643646 }
644647
@@ -661,12 +664,11 @@ impl RedisCacheStorage {
661664 // - https://redis.io/docs/latest/commands/mget/
662665
663666 tracing:: trace!( "getting multiple values from redis: {:?}" , keys) ;
664- let client = self . client ( ) ;
667+ let client = self . replica_client ( ) . with_options ( & options ) ;
665668
666669 if keys. len ( ) == 1 {
667670 let key = self . make_key ( keys. swap_remove ( 0 ) ) ;
668671 let res = client
669- . with_options ( & options)
670672 . get ( key)
671673 . await
672674 . inspect_err ( |e| self . record_error ( e) )
@@ -687,8 +689,6 @@ impl RedisCacheStorage {
687689 }
688690
689691 // then we query all the key groups at the same time
690- // use `client.replicas()` since we're in a cluster and can take advantage of read-replicas
691- let client = client. replicas ( ) . with_options ( & options) ;
692692 let mut tasks = Vec :: new ( ) ;
693693 for ( _shard, ( indexes, keys) ) in h {
694694 let client = client. clone ( ) ;
@@ -722,7 +722,6 @@ impl RedisCacheStorage {
722722 . map ( |k| self . make_key ( k) )
723723 . collect :: < Vec < _ > > ( ) ;
724724 client
725- . with_options ( & options)
726725 . mget ( keys)
727726 . await
728727 . inspect_err ( |e| self . record_error ( e) )
0 commit comments