@@ -11,7 +11,7 @@ use std::sync::Arc;
1111use std:: time:: Duration ;
1212use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
1313use tokio:: net:: TcpStream ;
14- use tokio:: time:: timeout;
14+ use tokio:: time:: { timeout, Instant } ;
1515
1616/// Default timeout for read operations
1717const DEFAULT_READ_TIMEOUT : Duration = Duration :: from_secs ( 20 ) ;
@@ -124,16 +124,23 @@ impl Connection for TcpConnection {
124124 /// - `Ok(Frame)` - Successfully received frame
125125 /// - `Err` - Connection error, EOF, parse error, or timeout
126126 async fn read_frame ( & mut self ) -> crate :: Result < Frame > {
127+ let deadline = Instant :: now ( ) + self . read_timeout ;
128+
127129 loop {
130+ if Instant :: now ( ) > deadline {
131+ return Err ( "read timeout" . into ( ) ) ;
132+ }
133+
128134 if let Ok ( frame) = self . parse_frame ( ) {
129135 if let Some ( frame) = frame {
130136 return Ok ( frame) ;
131137 }
132138 }
133139
134- // Read with timeout
140+ let remaining = deadline. saturating_duration_since ( Instant :: now ( ) ) ;
141+
135142 let read_result = timeout (
136- self . read_timeout ,
143+ remaining ,
137144 self . socket . read_buf ( & mut self . input_stream )
138145 ) . await ;
139146
@@ -145,15 +152,11 @@ impl Connection for TcpConnection {
145152 Err ( "connection reset by peer" . into ( ) )
146153 } ;
147154 }
148- Ok ( Ok ( _n) ) => {
149- // Successfully read n bytes, continue loop to parse
150- }
151- Ok ( Err ( e) ) => {
152- return Err ( e. into ( ) ) ;
153- }
154- Err ( _) => {
155- return Err ( "read timeout" . into ( ) ) ;
156- }
155+ Ok ( Ok ( n) ) => {
156+ tracing:: debug!( "read {} bytes" , n)
157+ } ,
158+ Ok ( Err ( e) ) => return Err ( e. into ( ) ) ,
159+ Err ( _) => return Err ( "read timeout" . into ( ) ) ,
157160 }
158161 }
159162 }
0 commit comments