@@ -433,6 +433,7 @@ where
433433 config. backend_flush_size ,
434434 config. backend_low_flush_interval ,
435435 config. backend_high_flush_interval ,
436+ config. backend_timeout ,
436437 )
437438 . await ;
438439 match res {
@@ -463,6 +464,7 @@ async fn handle_conn<H, S>(
463464 backend_flush_size : NonZeroUsize ,
464465 backend_low_flush_interval : Duration ,
465466 backend_high_flush_interval : Duration ,
467+ backend_timeout : Duration ,
466468) -> Result < ( ) , HandleConnErr < H :: Task > >
467469where
468470 H : CmdTaskResultHandler ,
@@ -482,6 +484,10 @@ where
482484 batch_stats. clone ( ) ,
483485 ) ;
484486
487+ let mut timer = futures_timer:: Delay :: new ( backend_timeout) ;
488+ let mut response_received = false ;
489+ let mut task_empty = true ;
490+
485491 future:: poll_fn (
486492 |cx : & mut Context < ' _ > | -> Poll < Result < ( ) , HandleConnErr < H :: Task > > > {
487493 let retry_times_opt = match retry_state_opt. take ( ) {
@@ -580,6 +586,8 @@ where
580586 Poll :: Pending => break Ok ( ( ) ) ,
581587 } ;
582588
589+ response_received = true ;
590+
583591 let mut task = match tasks. pop_front ( ) {
584592 Some ( task) => task,
585593 None => {
@@ -598,6 +606,21 @@ where
598606 return Poll :: Ready ( Err ( ( err, retry_state) ) ) ;
599607 }
600608
609+ if let Poll :: Ready ( ( ) ) = Pin :: new ( & mut timer) . poll ( cx) {
610+ if !task_empty && !response_received {
611+ let err = BackendError :: Timeout ;
612+ error ! ( "backend read timeout" ) ;
613+ let failed_tasks = tasks. drain ( ..) . collect ( ) ;
614+ // For timeout we just don't retry as it will take a long time.
615+ let retry_state = handle_conn_err ( Some ( MAX_BACKEND_RETRY ) , failed_tasks, & err) ;
616+ return Poll :: Ready ( Err ( ( err, retry_state) ) ) ;
617+ }
618+
619+ timer. reset ( backend_timeout) ;
620+ task_empty = tasks. is_empty ( ) ;
621+ response_received = false ;
622+ }
623+
601624 Poll :: Pending
602625 } ,
603626 )
@@ -639,6 +662,7 @@ pub enum BackendError {
639662 InvalidAddress ,
640663 Canceled ,
641664 InvalidState ,
665+ Timeout ,
642666}
643667
644668impl BackendError {
@@ -650,6 +674,7 @@ impl BackendError {
650674 SenderBackendError :: InvalidAddress => Either :: Left ( BackendError :: InvalidAddress ) ,
651675 SenderBackendError :: Canceled => Either :: Left ( BackendError :: Canceled ) ,
652676 SenderBackendError :: InvalidState => Either :: Left ( BackendError :: InvalidState ) ,
677+ SenderBackendError :: Timeout => Either :: Left ( BackendError :: Timeout ) ,
653678 SenderBackendError :: Retry ( task) => Either :: Right ( RetryError :: new ( task) ) ,
654679 }
655680 }
@@ -682,6 +707,7 @@ pub enum SenderBackendError<T> {
682707 Canceled ,
683708 InvalidState ,
684709 Retry ( T ) ,
710+ Timeout ,
685711}
686712
687713impl < T > SenderBackendError < T > {
@@ -693,6 +719,7 @@ impl<T> SenderBackendError<T> {
693719 BackendError :: InvalidAddress => SenderBackendError :: InvalidAddress ,
694720 BackendError :: Canceled => SenderBackendError :: Canceled ,
695721 BackendError :: InvalidState => SenderBackendError :: InvalidState ,
722+ BackendError :: Timeout => SenderBackendError :: Timeout ,
696723 }
697724 }
698725
@@ -709,6 +736,7 @@ impl<T> SenderBackendError<T> {
709736 Self :: Canceled => SenderBackendError :: Canceled ,
710737 Self :: InvalidState => SenderBackendError :: InvalidState ,
711738 Self :: Retry ( task) => SenderBackendError :: Retry ( f ( task) ) ,
739+ Self :: Timeout => SenderBackendError :: Timeout ,
712740 }
713741 }
714742}
@@ -717,12 +745,13 @@ impl<T> fmt::Debug for SenderBackendError<T> {
717745 fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
718746 match self {
719747 Self :: Io ( io_err) => write ! ( f, "BackendError::Io({:?})" , io_err) ,
720- Self :: NodeNotFound => write ! ( f, "backendError ::NodeNotFound" ) ,
721- Self :: InvalidProtocol => write ! ( f, "backendError ::InvalidProtocol" ) ,
722- Self :: InvalidAddress => write ! ( f, "backendError ::InvalidAddress" ) ,
723- Self :: Canceled => write ! ( f, "backendError ::Canceled" ) ,
724- Self :: InvalidState => write ! ( f, "backendError ::InvalidState" ) ,
748+ Self :: NodeNotFound => write ! ( f, "BackendError ::NodeNotFound" ) ,
749+ Self :: InvalidProtocol => write ! ( f, "BackendError ::InvalidProtocol" ) ,
750+ Self :: InvalidAddress => write ! ( f, "BackendError ::InvalidAddress" ) ,
751+ Self :: Canceled => write ! ( f, "BackendError ::Canceled" ) ,
752+ Self :: InvalidState => write ! ( f, "BackendError ::InvalidState" ) ,
725753 Self :: Retry ( _) => write ! ( f, "BackendError::Retry" ) ,
754+ Self :: Timeout => write ! ( f, "BackendError::Timeout" ) ,
726755 }
727756 }
728757}
@@ -874,6 +903,7 @@ mod tests {
874903 NonZeroUsize :: new ( 1024 ) . unwrap ( ) ,
875904 Duration :: from_nanos ( 200_000 ) ,
876905 Duration :: from_nanos ( 400_000 ) ,
906+ Duration :: from_secs ( 10 ) ,
877907 )
878908 . await ;
879909
@@ -943,6 +973,7 @@ mod tests {
943973 NonZeroUsize :: new ( 1024 ) . unwrap ( ) ,
944974 Duration :: from_nanos ( 200_000 ) ,
945975 Duration :: from_nanos ( 400_000 ) ,
976+ Duration :: from_secs ( 10 ) ,
946977 )
947978 . await ;
948979
@@ -957,4 +988,50 @@ mod tests {
957988 }
958989 }
959990 }
991+
992+ #[ tokio:: test]
993+ async fn test_handle_conn_timeout ( ) {
994+ test_handle_conn_timeout_helper ( BatchStrategy :: Disabled ) . await ;
995+ test_handle_conn_timeout_helper ( BatchStrategy :: Fixed ) . await ;
996+ test_handle_conn_timeout_helper ( BatchStrategy :: Dynamic ) . await ;
997+ }
998+
999+ async fn test_handle_conn_timeout_helper ( strategy : BatchStrategy ) {
1000+ let ( sender, _dummy_receiver) = mpsc:: unbounded :: < RespVec > ( ) ;
1001+ let ( _dummy_sender, receiver) = mpsc:: unbounded :: < RespVec > ( ) ;
1002+ let writer: ConnSink < RespVec > = Box :: pin ( sender. sink_map_err ( |_| BackendError :: Canceled ) ) ;
1003+ let reader: ConnStream < RespVec > = Box :: pin ( receiver. map ( Ok ) ) ;
1004+
1005+ let ( mut task_sender, mut task_receiver) = mpsc:: unbounded ( ) ;
1006+ for i in 0 ..3 {
1007+ let i: usize = i;
1008+ task_sender
1009+ . send ( DummyCmdTask :: new ( RespVec :: Simple (
1010+ i. to_string ( ) . into_bytes ( ) ,
1011+ ) ) )
1012+ . await
1013+ . unwrap ( ) ;
1014+ }
1015+
1016+ let handler = Arc :: new ( CounterHandler :: new ( ) ) ;
1017+ let batch_stats = Arc :: new ( BatchStats :: default ( ) ) ;
1018+ let res = handle_conn (
1019+ writer,
1020+ reader,
1021+ & mut task_receiver,
1022+ handler. clone ( ) ,
1023+ None ,
1024+ batch_stats,
1025+ strategy,
1026+ NonZeroUsize :: new ( 1024 ) . unwrap ( ) ,
1027+ Duration :: from_nanos ( 200_000 ) ,
1028+ Duration :: from_nanos ( 400_000 ) ,
1029+ Duration :: from_secs ( 3 ) ,
1030+ )
1031+ . await ;
1032+
1033+ let ( err, _) = res. unwrap_err ( ) ;
1034+ assert ! ( matches!( err, BackendError :: Timeout ) ) ;
1035+ assert ! ( handler. get_replies( ) . is_empty( ) ) ;
1036+ }
9601037}
0 commit comments