11use std:: { fmt, future:: Future , future:: poll_fn, rc:: Rc , task:: Context , task:: Poll } ;
22
3- use ntex_io :: DispatchItem ;
3+ use ntex_dispatcher :: { DispatchItem , Reason as DispReason } ;
44use ntex_service:: { Pipeline , Service , ServiceCtx } ;
5- use ntex_util:: future:: { Either , join} ;
6- use ntex_util:: { HashMap , spawn} ;
5+ use ntex_util:: { HashMap , future:: Either , future:: join, spawn} ;
76
87use crate :: connection:: { Connection , RecvHalfConnection } ;
98use crate :: control:: { Control , ControlAck } ;
@@ -247,19 +246,19 @@ where
247246 Ok ( None )
248247 }
249248 } ,
250- DispatchItem :: EncoderError ( err) => {
249+ DispatchItem :: Stop ( DispReason :: Encoder ( err) ) => {
251250 let err = ConnectionError :: from ( err) ;
252251 let streams = self . connection . proto_error ( & err) ;
253252 self . handle_connection_error ( streams, err. into ( ) ) ;
254253 control ( Control :: proto_error ( err) , & self . inner , ctx) . await
255254 }
256- DispatchItem :: DecoderError ( err) => {
255+ DispatchItem :: Stop ( DispReason :: Decoder ( err) ) => {
257256 let err = ConnectionError :: from ( err) ;
258257 let streams = self . connection . proto_error ( & err) ;
259258 self . handle_connection_error ( streams, err. into ( ) ) ;
260259 control ( Control :: proto_error ( err) , & self . inner , ctx) . await
261260 }
262- DispatchItem :: KeepAliveTimeout => {
261+ DispatchItem :: Stop ( DispReason :: KeepAliveTimeout ) => {
263262 log:: warn!(
264263 "{}: did not receive pong response in time, closing connection" ,
265264 self . connection. tag( ) ,
@@ -273,7 +272,7 @@ where
273272 )
274273 . await
275274 }
276- DispatchItem :: ReadTimeout => {
275+ DispatchItem :: Stop ( DispReason :: ReadTimeout ) => {
277276 log:: warn!(
278277 "{}: did not receive complete frame in time, closing connection" ,
279278 self . connection. tag( ) ,
@@ -287,12 +286,12 @@ where
287286 )
288287 . await
289288 }
290- DispatchItem :: Disconnect ( err) => {
289+ DispatchItem :: Stop ( DispReason :: Io ( err) ) => {
291290 let streams = self . connection . disconnect ( ) ;
292291 self . handle_connection_error ( streams, OperationError :: Disconnected ) ;
293292 control ( Control :: peer_gone ( err) , & self . inner , ctx) . await
294293 }
295- DispatchItem :: WBackPressureEnabled | DispatchItem :: WBackPressureDisabled => Ok ( None ) ,
294+ DispatchItem :: Control ( _ ) => Ok ( None ) ,
296295 }
297296 }
298297}
@@ -346,10 +345,10 @@ where
346345{
347346 match ctx. call ( inner. control . get_ref ( ) , pkt) . await {
348347 Ok ( res) => {
349- if let Some ( Frame :: Reset ( ref rst) ) = res. frame {
350- if !rst. stream_id ( ) . is_zero ( ) {
351- inner . connection . rst_stream ( rst . stream_id ( ) , rst . reason ( ) ) ;
352- }
348+ if let Some ( Frame :: Reset ( ref rst) ) = res. frame
349+ && !rst. stream_id ( ) . is_zero ( )
350+ {
351+ inner . connection . rst_stream ( rst . stream_id ( ) , rst . reason ( ) ) ;
353352 }
354353 if let Some ( frm) = res. frame {
355354 inner. connection . encode ( frm) ;
0 commit comments