11use super :: { ConnectionLike , Runtime } ;
22use crate :: aio:: setup_connection;
3+ use crate :: aio:: DisconnectNotifier ;
34use crate :: cmd:: Cmd ;
45#[ cfg( any( feature = "tokio-comp" , feature = "async-std-comp" ) ) ]
56use crate :: parser:: ValueCodec ;
@@ -23,6 +24,7 @@ use std::fmt;
2324use std:: fmt:: Debug ;
2425use std:: io;
2526use std:: pin:: Pin ;
27+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
2628use std:: sync:: Arc ;
2729use std:: task:: { self , Poll } ;
2830use std:: time:: Duration ;
@@ -73,19 +75,11 @@ struct PipelineMessage<S> {
7375/// items being output by the `Stream` (the number is specified at time of sending). With the
7476/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
7577/// and `Sink`.
78+ #[ derive( Clone ) ]
7679struct Pipeline < SinkItem > {
7780 sender : mpsc:: Sender < PipelineMessage < SinkItem > > ,
78-
7981 push_manager : Arc < ArcSwap < PushManager > > ,
80- }
81-
82- impl < SinkItem > Clone for Pipeline < SinkItem > {
83- fn clone ( & self ) -> Self {
84- Pipeline {
85- sender : self . sender . clone ( ) ,
86- push_manager : self . push_manager . clone ( ) ,
87- }
88- }
82+ is_stream_closed : Arc < AtomicBool > ,
8983}
9084
9185impl < SinkItem > Debug for Pipeline < SinkItem >
@@ -104,14 +98,21 @@ pin_project! {
10498 in_flight: VecDeque <InFlight >,
10599 error: Option <RedisError >,
106100 push_manager: Arc <ArcSwap <PushManager >>,
101+ disconnect_notifier: Option <Box <dyn DisconnectNotifier >>,
102+ is_stream_closed: Arc <AtomicBool >,
107103 }
108104}
109105
110106impl < T > PipelineSink < T >
111107where
112108 T : Stream < Item = RedisResult < Value > > + ' static ,
113109{
114- fn new < SinkItem > ( sink_stream : T , push_manager : Arc < ArcSwap < PushManager > > ) -> Self
110+ fn new < SinkItem > (
111+ sink_stream : T ,
112+ push_manager : Arc < ArcSwap < PushManager > > ,
113+ disconnect_notifier : Option < Box < dyn DisconnectNotifier > > ,
114+ is_stream_closed : Arc < AtomicBool > ,
115+ ) -> Self
115116 where
116117 T : Sink < SinkItem , Error = RedisError > + Stream < Item = RedisResult < Value > > + ' static ,
117118 {
@@ -120,6 +121,8 @@ where
120121 in_flight : VecDeque :: new ( ) ,
121122 error : None ,
122123 push_manager,
124+ disconnect_notifier,
125+ is_stream_closed,
123126 }
124127 }
125128
@@ -130,7 +133,15 @@ where
130133 Some ( result) => result,
131134 // The redis response stream is not going to produce any more items so we `Err`
132135 // to break out of the `forward` combinator and stop handling requests
133- None => return Poll :: Ready ( Err ( ( ) ) ) ,
136+ None => {
137+ // this is the right place to notify about the passive TCP disconnect
138+ // In other places we cannot distinguish between the active destruction of MultiplexedConnection and passive disconnect
139+ if let Some ( disconnect_notifier) = self . as_mut ( ) . project ( ) . disconnect_notifier {
140+ disconnect_notifier. notify_disconnect ( ) ;
141+ }
142+ self . is_stream_closed . store ( true , Ordering :: Relaxed ) ;
143+ return Poll :: Ready ( Err ( ( ) ) ) ;
144+ }
134145 } ;
135146 self . as_mut ( ) . send_result ( item) ;
136147 }
@@ -296,7 +307,10 @@ impl<SinkItem> Pipeline<SinkItem>
296307where
297308 SinkItem : Send + ' static ,
298309{
299- fn new < T > ( sink_stream : T ) -> ( Self , impl Future < Output = ( ) > )
310+ fn new < T > (
311+ sink_stream : T ,
312+ disconnect_notifier : Option < Box < dyn DisconnectNotifier > > ,
313+ ) -> ( Self , impl Future < Output = ( ) > )
300314 where
301315 T : Sink < SinkItem , Error = RedisError > + Stream < Item = RedisResult < Value > > + ' static ,
302316 T : Send + ' static ,
@@ -308,7 +322,13 @@ where
308322 let ( sender, mut receiver) = mpsc:: channel ( BUFFER_SIZE ) ;
309323 let push_manager: Arc < ArcSwap < PushManager > > =
310324 Arc :: new ( ArcSwap :: new ( Arc :: new ( PushManager :: default ( ) ) ) ) ;
311- let sink = PipelineSink :: new :: < SinkItem > ( sink_stream, push_manager. clone ( ) ) ;
325+ let is_stream_closed = Arc :: new ( AtomicBool :: new ( false ) ) ;
326+ let sink = PipelineSink :: new :: < SinkItem > (
327+ sink_stream,
328+ push_manager. clone ( ) ,
329+ disconnect_notifier,
330+ is_stream_closed. clone ( ) ,
331+ ) ;
312332 let f = stream:: poll_fn ( move |cx| receiver. poll_recv ( cx) )
313333 . map ( Ok )
314334 . forward ( sink)
@@ -317,6 +337,7 @@ where
317337 Pipeline {
318338 sender,
319339 push_manager,
340+ is_stream_closed,
320341 } ,
321342 f,
322343 )
@@ -363,6 +384,10 @@ where
363384 async fn set_push_manager ( & mut self , push_manager : PushManager ) {
364385 self . push_manager . store ( Arc :: new ( push_manager) ) ;
365386 }
387+
388+ pub fn is_closed ( & self ) -> bool {
389+ self . is_stream_closed . load ( Ordering :: Relaxed )
390+ }
366391}
367392
368393/// A connection object which can be cloned, allowing requests to be be sent concurrently
@@ -392,6 +417,7 @@ impl MultiplexedConnection {
392417 connection_info : & ConnectionInfo ,
393418 stream : C ,
394419 push_sender : Option < mpsc:: UnboundedSender < PushInfo > > ,
420+ disconnect_notifier : Option < Box < dyn DisconnectNotifier > > ,
395421 ) -> RedisResult < ( Self , impl Future < Output = ( ) > ) >
396422 where
397423 C : Unpin + AsyncRead + AsyncWrite + Send + ' static ,
@@ -401,6 +427,7 @@ impl MultiplexedConnection {
401427 stream,
402428 std:: time:: Duration :: MAX ,
403429 push_sender,
430+ disconnect_notifier,
404431 )
405432 . await
406433 }
@@ -412,6 +439,7 @@ impl MultiplexedConnection {
412439 stream : C ,
413440 response_timeout : std:: time:: Duration ,
414441 push_sender : Option < mpsc:: UnboundedSender < PushInfo > > ,
442+ disconnect_notifier : Option < Box < dyn DisconnectNotifier > > ,
415443 ) -> RedisResult < ( Self , impl Future < Output = ( ) > ) >
416444 where
417445 C : Unpin + AsyncRead + AsyncWrite + Send + ' static ,
@@ -429,7 +457,7 @@ impl MultiplexedConnection {
429457 let codec = ValueCodec :: default ( )
430458 . framed ( stream)
431459 . and_then ( |msg| async move { msg } ) ;
432- let ( mut pipeline, driver) = Pipeline :: new ( codec) ;
460+ let ( mut pipeline, driver) = Pipeline :: new ( codec, disconnect_notifier ) ;
433461 let driver = boxed ( driver) ;
434462 let pm = PushManager :: default ( ) ;
435463 if let Some ( sender) = push_sender {
@@ -560,6 +588,10 @@ impl ConnectionLike for MultiplexedConnection {
560588 fn get_db ( & self ) -> i64 {
561589 self . db
562590 }
591+
592+ fn is_closed ( & self ) -> bool {
593+ self . pipeline . is_closed ( )
594+ }
563595}
564596impl MultiplexedConnection {
565597 /// Subscribes to a new channel.
0 commit comments