@@ -60,6 +60,7 @@ pub struct IoLoop<
6060 receive_buffer : Buffer ,
6161 send_buffer : Buffer ,
6262 serialized_frames : VecDeque < ( FrameSize , FrameSending ) > ,
63+ half_closed : bool ,
6364}
6465
6566impl <
@@ -106,6 +107,7 @@ impl<
106107 receive_buffer : Buffer :: with_capacity ( FRAMES_STORAGE * frame_size as usize ) ,
107108 send_buffer : Buffer :: with_capacity ( FRAMES_STORAGE * frame_size as usize ) ,
108109 serialized_frames : VecDeque :: default ( ) ,
110+ half_closed : false ,
109111 }
110112 }
111113
@@ -205,6 +207,7 @@ impl<
205207 trace ! ( "Poison connection attempt" ) ;
206208 self . connection_status . poison ( err. clone ( ) ) ;
207209 } ) ?;
210+ self . half_closed = false ;
208211 let mut res = Ok ( ( ) ) ;
209212
210213 while self . should_continue ( & connection_killswitch) {
@@ -474,10 +477,17 @@ impl<
474477
475478 trace ! ( "read {} bytes" , sz) ;
476479 self . receive_buffer . fill ( sz) ;
480+ } else if self . half_closed {
481+ if self . reconnecting ( ) || self . channels . connection_killswitch ( ) . killed ( ) {
482+ return Ok ( true ) ;
483+ } else {
484+ self . report_connection_aborted ( ) ?;
485+ }
477486 } else {
478487 error ! (
479488 "Socket was readable but we read 0. This usually means that the connection is half closed, thus report it as broken."
480489 ) ;
490+ self . half_closed = true ;
481491 // Give a chance to parse and use frames we already read from socket before overriding the error with a custom one.
482492 if !self . handle_frames ( connection_killswitch) ?
483493 && self . internal_rpc . is_empty ( )
@@ -489,10 +499,7 @@ impl<
489499 ) ;
490500 return Ok ( true ) ;
491501 }
492- self . socket_state . handle_io_result ( Err ( io:: Error :: from (
493- io:: ErrorKind :: ConnectionAborted ,
494- )
495- . into ( ) ) ) ?;
502+ self . report_connection_aborted ( ) ?;
496503 }
497504 }
498505 }
@@ -501,6 +508,12 @@ impl<
501508 }
502509 }
503510
511+ fn report_connection_aborted ( & self ) -> Result < ( ) > {
512+ self . socket_state
513+ . handle_io_result ( Err ( io:: Error :: from ( io:: ErrorKind :: ConnectionAborted ) . into ( ) ) ) ?;
514+ Ok ( ( ) )
515+ }
516+
504517 fn serialize ( & mut self , connection_killswitch : & KillSwitch ) -> Result < ( ) > {
505518 while let Some ( next_msg) = self . frames . pop ( self . channels . flow ( ) ) {
506519 trace ! ( %next_msg, "will write to buffer" ) ;
0 commit comments