@@ -31,6 +31,9 @@ use std::time::Duration;
3131#[ cfg( feature = "tokio-comp" ) ]
3232use tokio_util:: codec:: Decoder ;
3333
34+ // Default connection timeout in ms
35+ const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT : Duration = Duration :: from_millis ( 250 ) ;
36+
3437// Senders which the result of a single request are sent through
3538type PipelineOutput = oneshot:: Sender < RedisResult < Value > > ;
3639
@@ -76,7 +79,7 @@ struct PipelineMessage<S> {
7679/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
7780/// and `Sink`.
7881#[ derive( Clone ) ]
79- struct Pipeline < SinkItem > {
82+ pub ( crate ) struct Pipeline < SinkItem > {
8083 sender : mpsc:: Sender < PipelineMessage < SinkItem > > ,
8184 push_manager : Arc < ArcSwap < PushManager > > ,
8285 is_stream_closed : Arc < AtomicBool > ,
@@ -399,6 +402,7 @@ where
399402 self . push_manager . store ( Arc :: new ( push_manager) ) ;
400403 }
401404
405+ /// Checks if the pipeline is closed.
402406 pub fn is_closed ( & self ) -> bool {
403407 self . is_stream_closed . load ( Ordering :: Relaxed )
404408 }
@@ -413,6 +417,7 @@ pub struct MultiplexedConnection {
413417 response_timeout : Duration ,
414418 protocol : ProtocolVersion ,
415419 push_manager : PushManager ,
420+ password : Option < String > ,
416421}
417422
418423impl Debug for MultiplexedConnection {
@@ -455,35 +460,28 @@ impl MultiplexedConnection {
455460 where
456461 C : Unpin + AsyncRead + AsyncWrite + Send + ' static ,
457462 {
458- fn boxed (
459- f : impl Future < Output = ( ) > + Send + ' static ,
460- ) -> Pin < Box < dyn Future < Output = ( ) > + Send > > {
461- Box :: pin ( f)
462- }
463-
464- #[ cfg( not( feature = "tokio-comp" ) ) ]
465- compile_error ! ( "tokio-comp feature is required for aio feature" ) ;
466-
467- let redis_connection_info = & connection_info. redis ;
468463 let codec = ValueCodec :: default ( )
469464 . framed ( stream)
470465 . and_then ( |msg| async move { msg } ) ;
471466 let ( mut pipeline, driver) =
472467 Pipeline :: new ( codec, glide_connection_options. disconnect_notifier ) ;
473- let driver = boxed ( driver) ;
468+ let driver = Box :: pin ( driver) ;
474469 let pm = PushManager :: default ( ) ;
475470 if let Some ( sender) = glide_connection_options. push_sender {
476471 pm. replace_sender ( sender) ;
477472 }
478473
479474 pipeline. set_push_manager ( pm. clone ( ) ) . await ;
480- let mut con = MultiplexedConnection {
481- pipeline,
482- db : connection_info. redis . db ,
483- response_timeout,
484- push_manager : pm,
485- protocol : redis_connection_info. protocol ,
486- } ;
475+
476+ let mut con = MultiplexedConnection :: builder ( pipeline)
477+ . with_db ( connection_info. redis . db )
478+ . with_response_timeout ( response_timeout)
479+ . with_push_manager ( pm)
480+ . with_protocol ( connection_info. redis . protocol )
481+ . with_password ( connection_info. redis . password . clone ( ) )
482+ . build ( )
483+ . await ?;
484+
487485 let driver = {
488486 let auth = setup_connection ( & connection_info. redis , & mut con) ;
489487
@@ -502,6 +500,7 @@ impl MultiplexedConnection {
502500 }
503501 }
504502 } ;
503+
505504 Ok ( ( con, driver) )
506505 }
507506
@@ -575,6 +574,97 @@ impl MultiplexedConnection {
575574 self . push_manager = push_manager. clone ( ) ;
576575 self . pipeline . set_push_manager ( push_manager) . await ;
577576 }
577+
578+ /// Replace the password used to authenticate with the server.
579+ /// If `None` is provided, the password will be removed.
580+ pub async fn update_connection_password (
581+ & mut self ,
582+ password : Option < String > ,
583+ ) -> RedisResult < Value > {
584+ self . password = password;
585+ Ok ( Value :: Okay )
586+ }
587+
588+ /// Creates a new `MultiplexedConnectionBuilder` for constructing a `MultiplexedConnection`.
589+ pub ( crate ) fn builder ( pipeline : Pipeline < Vec < u8 > > ) -> MultiplexedConnectionBuilder {
590+ MultiplexedConnectionBuilder :: new ( pipeline)
591+ }
592+ }
593+
594+ /// A builder for creating `MultiplexedConnection` instances.
595+ pub struct MultiplexedConnectionBuilder {
596+ pipeline : Pipeline < Vec < u8 > > ,
597+ db : Option < i64 > ,
598+ response_timeout : Option < Duration > ,
599+ push_manager : Option < PushManager > ,
600+ protocol : Option < ProtocolVersion > ,
601+ password : Option < String > ,
602+ }
603+
604+ impl MultiplexedConnectionBuilder {
605+ /// Creates a new builder with the required pipeline
606+ pub ( crate ) fn new ( pipeline : Pipeline < Vec < u8 > > ) -> Self {
607+ Self {
608+ pipeline,
609+ db : None ,
610+ response_timeout : None ,
611+ push_manager : None ,
612+ protocol : None ,
613+ password : None ,
614+ }
615+ }
616+
617+ /// Sets the database index for the `MultiplexedConnectionBuilder`.
618+ pub fn with_db ( mut self , db : i64 ) -> Self {
619+ self . db = Some ( db) ;
620+ self
621+ }
622+
623+ /// Sets the response timeout for the `MultiplexedConnectionBuilder`.
624+ pub fn with_response_timeout ( mut self , timeout : Duration ) -> Self {
625+ self . response_timeout = Some ( timeout) ;
626+ self
627+ }
628+
629+ /// Sets the push manager for the `MultiplexedConnectionBuilder`.
630+ pub fn with_push_manager ( mut self , push_manager : PushManager ) -> Self {
631+ self . push_manager = Some ( push_manager) ;
632+ self
633+ }
634+
635+ /// Sets the protocol version for the `MultiplexedConnectionBuilder`.
636+ pub fn with_protocol ( mut self , protocol : ProtocolVersion ) -> Self {
637+ self . protocol = Some ( protocol) ;
638+ self
639+ }
640+
641+ /// Sets the password for the `MultiplexedConnectionBuilder`.
642+ pub fn with_password ( mut self , password : Option < String > ) -> Self {
643+ self . password = password;
644+ self
645+ }
646+
647+ /// Builds and returns a new `MultiplexedConnection` instance using the configured settings.
648+ pub async fn build ( self ) -> RedisResult < MultiplexedConnection > {
649+ let db = self . db . unwrap_or_default ( ) ;
650+ let response_timeout = self
651+ . response_timeout
652+ . unwrap_or ( DEFAULT_CONNECTION_ATTEMPT_TIMEOUT ) ;
653+ let push_manager = self . push_manager . unwrap_or_default ( ) ;
654+ let protocol = self . protocol . unwrap_or_default ( ) ;
655+ let password = self . password ;
656+
657+ let con = MultiplexedConnection {
658+ pipeline : self . pipeline ,
659+ db,
660+ response_timeout,
661+ push_manager,
662+ protocol,
663+ password,
664+ } ;
665+
666+ Ok ( con)
667+ }
578668}
579669
580670impl ConnectionLike for MultiplexedConnection {
0 commit comments