11use super :: { ConnectionLike , Runtime } ;
22use crate :: aio:: setup_connection;
3+ use crate :: aio:: DisconnectNotifier ;
4+ use crate :: client:: GlideConnectionOptions ;
35use crate :: cmd:: Cmd ;
46#[ cfg( any( feature = "tokio-comp" , feature = "async-std-comp" ) ) ]
57use crate :: parser:: ValueCodec ;
68use crate :: push_manager:: PushManager ;
79use crate :: types:: { RedisError , RedisFuture , RedisResult , Value } ;
8- use crate :: { cmd, ConnectionInfo , ProtocolVersion , PushInfo , PushKind } ;
10+ use crate :: { cmd, ConnectionInfo , ProtocolVersion , PushKind } ;
911use :: tokio:: {
1012 io:: { AsyncRead , AsyncWrite } ,
1113 sync:: { mpsc, oneshot} ,
@@ -23,6 +25,7 @@ use std::fmt;
2325use std:: fmt:: Debug ;
2426use std:: io;
2527use std:: pin:: Pin ;
28+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
2629use std:: sync:: Arc ;
2730use std:: task:: { self , Poll } ;
2831use std:: time:: Duration ;
@@ -73,19 +76,11 @@ struct PipelineMessage<S> {
7376/// items being output by the `Stream` (the number is specified at time of sending). With the
7477/// interface provided by `Pipeline` an easy interface of request to response, hiding the `Stream`
7578/// and `Sink`.
79+ #[ derive( Clone ) ]
7680struct Pipeline < SinkItem > {
7781 sender : mpsc:: Sender < PipelineMessage < SinkItem > > ,
78-
7982 push_manager : Arc < ArcSwap < PushManager > > ,
80- }
81-
82- impl < SinkItem > Clone for Pipeline < SinkItem > {
83- fn clone ( & self ) -> Self {
84- Pipeline {
85- sender : self . sender . clone ( ) ,
86- push_manager : self . push_manager . clone ( ) ,
87- }
88- }
83+ is_stream_closed : Arc < AtomicBool > ,
8984}
9085
9186impl < SinkItem > Debug for Pipeline < SinkItem >
@@ -104,14 +99,21 @@ pin_project! {
10499 in_flight: VecDeque <InFlight >,
105100 error: Option <RedisError >,
106101 push_manager: Arc <ArcSwap <PushManager >>,
102+ disconnect_notifier: Option <Box <dyn DisconnectNotifier >>,
103+ is_stream_closed: Arc <AtomicBool >,
107104 }
108105}
109106
110107impl < T > PipelineSink < T >
111108where
112109 T : Stream < Item = RedisResult < Value > > + ' static ,
113110{
114- fn new < SinkItem > ( sink_stream : T , push_manager : Arc < ArcSwap < PushManager > > ) -> Self
111+ fn new < SinkItem > (
112+ sink_stream : T ,
113+ push_manager : Arc < ArcSwap < PushManager > > ,
114+ disconnect_notifier : Option < Box < dyn DisconnectNotifier > > ,
115+ is_stream_closed : Arc < AtomicBool > ,
116+ ) -> Self
115117 where
116118 T : Sink < SinkItem , Error = RedisError > + Stream < Item = RedisResult < Value > > + ' static ,
117119 {
@@ -120,6 +122,8 @@ where
120122 in_flight : VecDeque :: new ( ) ,
121123 error : None ,
122124 push_manager,
125+ disconnect_notifier,
126+ is_stream_closed,
123127 }
124128 }
125129
@@ -130,7 +134,15 @@ where
130134 Some ( result) => result,
131135 // The redis response stream is not going to produce any more items so we `Err`
132136 // to break out of the `forward` combinator and stop handling requests
133- None => return Poll :: Ready ( Err ( ( ) ) ) ,
137+ None => {
138+ // this is the right place to notify about the passive TCP disconnect
139+ // In other places we cannot distinguish between the active destruction of MultiplexedConnection and passive disconnect
140+ if let Some ( disconnect_notifier) = self . as_mut ( ) . project ( ) . disconnect_notifier {
141+ disconnect_notifier. notify_disconnect ( ) ;
142+ }
143+ self . is_stream_closed . store ( true , Ordering :: Relaxed ) ;
144+ return Poll :: Ready ( Err ( ( ) ) ) ;
145+ }
134146 } ;
135147 self . as_mut ( ) . send_result ( item) ;
136148 }
@@ -296,7 +308,10 @@ impl<SinkItem> Pipeline<SinkItem>
296308where
297309 SinkItem : Send + ' static ,
298310{
299- fn new < T > ( sink_stream : T ) -> ( Self , impl Future < Output = ( ) > )
311+ fn new < T > (
312+ sink_stream : T ,
313+ disconnect_notifier : Option < Box < dyn DisconnectNotifier > > ,
314+ ) -> ( Self , impl Future < Output = ( ) > )
300315 where
301316 T : Sink < SinkItem , Error = RedisError > + Stream < Item = RedisResult < Value > > + ' static ,
302317 T : Send + ' static ,
@@ -308,7 +323,13 @@ where
308323 let ( sender, mut receiver) = mpsc:: channel ( BUFFER_SIZE ) ;
309324 let push_manager: Arc < ArcSwap < PushManager > > =
310325 Arc :: new ( ArcSwap :: new ( Arc :: new ( PushManager :: default ( ) ) ) ) ;
311- let sink = PipelineSink :: new :: < SinkItem > ( sink_stream, push_manager. clone ( ) ) ;
326+ let is_stream_closed = Arc :: new ( AtomicBool :: new ( false ) ) ;
327+ let sink = PipelineSink :: new :: < SinkItem > (
328+ sink_stream,
329+ push_manager. clone ( ) ,
330+ disconnect_notifier,
331+ is_stream_closed. clone ( ) ,
332+ ) ;
312333 let f = stream:: poll_fn ( move |cx| receiver. poll_recv ( cx) )
313334 . map ( Ok )
314335 . forward ( sink)
@@ -317,6 +338,7 @@ where
317338 Pipeline {
318339 sender,
319340 push_manager,
341+ is_stream_closed,
320342 } ,
321343 f,
322344 )
@@ -363,6 +385,10 @@ where
363385 async fn set_push_manager ( & mut self , push_manager : PushManager ) {
364386 self . push_manager . store ( Arc :: new ( push_manager) ) ;
365387 }
388+
389+ pub fn is_closed ( & self ) -> bool {
390+ self . is_stream_closed . load ( Ordering :: Relaxed )
391+ }
366392}
367393
368394/// A connection object which can be cloned, allowing requests to be be sent concurrently
@@ -391,7 +417,7 @@ impl MultiplexedConnection {
391417 pub async fn new < C > (
392418 connection_info : & ConnectionInfo ,
393419 stream : C ,
394- push_sender : Option < mpsc :: UnboundedSender < PushInfo > > ,
420+ glide_connection_options : GlideConnectionOptions ,
395421 ) -> RedisResult < ( Self , impl Future < Output = ( ) > ) >
396422 where
397423 C : Unpin + AsyncRead + AsyncWrite + Send + ' static ,
@@ -400,7 +426,7 @@ impl MultiplexedConnection {
400426 connection_info,
401427 stream,
402428 std:: time:: Duration :: MAX ,
403- push_sender ,
429+ glide_connection_options ,
404430 )
405431 . await
406432 }
@@ -411,7 +437,7 @@ impl MultiplexedConnection {
411437 connection_info : & ConnectionInfo ,
412438 stream : C ,
413439 response_timeout : std:: time:: Duration ,
414- push_sender : Option < mpsc :: UnboundedSender < PushInfo > > ,
440+ glide_connection_options : GlideConnectionOptions ,
415441 ) -> RedisResult < ( Self , impl Future < Output = ( ) > ) >
416442 where
417443 C : Unpin + AsyncRead + AsyncWrite + Send + ' static ,
@@ -429,10 +455,11 @@ impl MultiplexedConnection {
429455 let codec = ValueCodec :: default ( )
430456 . framed ( stream)
431457 . and_then ( |msg| async move { msg } ) ;
432- let ( mut pipeline, driver) = Pipeline :: new ( codec) ;
458+ let ( mut pipeline, driver) =
459+ Pipeline :: new ( codec, glide_connection_options. disconnect_notifier ) ;
433460 let driver = boxed ( driver) ;
434461 let pm = PushManager :: default ( ) ;
435- if let Some ( sender) = push_sender {
462+ if let Some ( sender) = glide_connection_options . push_sender {
436463 pm. replace_sender ( sender) ;
437464 }
438465
@@ -560,6 +587,10 @@ impl ConnectionLike for MultiplexedConnection {
560587 fn get_db ( & self ) -> i64 {
561588 self . db
562589 }
590+
591+ fn is_closed ( & self ) -> bool {
592+ self . pipeline . is_closed ( )
593+ }
563594}
564595impl MultiplexedConnection {
565596 /// Subscribes to a new channel.
0 commit comments