@@ -110,19 +110,40 @@ async fn copy_with_idle_timeout(
110110) -> Result < ( ) > {
111111 let tracker = Arc :: new ( SharedIdleTracker :: new ( ) ) ;
112112
113- let mut timeout_client = IdleTimeoutStream :: new ( client, tracker. clone ( ) , IDLE_TIMEOUT ) ;
114- let mut timeout_proxy = IdleTimeoutStream :: new ( proxy, tracker, IDLE_TIMEOUT ) ;
113+ let mut timeout_client = ActiveStream :: new ( client, tracker. clone ( ) ) ;
114+ let mut timeout_proxy = ActiveStream :: new ( proxy, tracker. clone ( ) ) ;
115115
116- match copy_bidirectional ( & mut timeout_client, & mut timeout_proxy) . await {
117- Ok ( ( up, down) ) => {
118- stats:: update_metrics ( runtime, Protocol :: Tcp , proxy_name, target_addr, up, down) ;
119- Ok ( ( ) )
116+ let copy_task = copy_bidirectional ( & mut timeout_client, & mut timeout_proxy) ;
117+
118+ let monitor_task = async {
119+ loop {
120+ tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
121+ if tracker. is_idle ( IDLE_TIMEOUT ) {
122+ break ;
123+ }
124+ }
125+ } ;
126+
127+ tokio:: select! {
128+ res = copy_task => {
129+ if let Err ( e) = res {
130+ debug!( "TCP relay error or closed: {}" , e) ;
131+ }
120132 }
121- Err ( e) => {
122- debug ! ( "TCP relay error: {}" , e) ;
123- Ok ( ( ) )
133+ _ = monitor_task => {
134+ debug!( "TCP relay idle timeout for {}" , target_addr) ;
124135 }
125136 }
137+
138+ stats:: update_metrics (
139+ runtime,
140+ Protocol :: Tcp ,
141+ proxy_name,
142+ target_addr,
143+ timeout_client. read_bytes ,
144+ timeout_proxy. read_bytes ,
145+ ) ;
146+ Ok ( ( ) )
126147}
127148
128149async fn find_session_target (
@@ -178,94 +199,62 @@ impl SharedIdleTracker {
178199 }
179200}
180201
181- struct IdleTimeoutStream < T > {
202+ struct ActiveStream < T > {
182203 inner : T ,
183204 tracker : Arc < SharedIdleTracker > ,
184- timeout : Duration ,
205+ read_bytes : u64 ,
185206}
186207
187- impl < T > IdleTimeoutStream < T > {
188- fn new ( inner : T , tracker : Arc < SharedIdleTracker > , timeout : Duration ) -> Self {
208+ impl < T > ActiveStream < T > {
209+ fn new ( inner : T , tracker : Arc < SharedIdleTracker > ) -> Self {
189210 Self {
190211 inner,
191212 tracker,
192- timeout ,
213+ read_bytes : 0 ,
193214 }
194215 }
195216
196217 fn update_activity ( & self ) {
197218 self . tracker . update_activity ( ) ;
198219 }
199-
200- fn check_idle ( & self ) -> tokio:: io:: Result < ( ) > {
201- if self . tracker . is_idle ( self . timeout ) {
202- return Err ( tokio:: io:: Error :: new (
203- tokio:: io:: ErrorKind :: TimedOut ,
204- "idle timeout - no activity on either side" ,
205- ) ) ;
206- }
207- Ok ( ( ) )
208- }
209-
210- fn is_normal_close ( e : & std:: io:: Error ) -> bool {
211- matches ! (
212- e. kind( ) ,
213- std:: io:: ErrorKind :: BrokenPipe
214- | std:: io:: ErrorKind :: ConnectionReset
215- | std:: io:: ErrorKind :: ConnectionAborted
216- | std:: io:: ErrorKind :: UnexpectedEof
217- )
218- }
219220}
220221
221- impl < T : AsyncRead + Unpin > AsyncRead for IdleTimeoutStream < T > {
222+ impl < T : AsyncRead + Unpin > AsyncRead for ActiveStream < T > {
222223 fn poll_read (
223224 mut self : Pin < & mut Self > ,
224225 cx : & mut Context < ' _ > ,
225226 buf : & mut tokio:: io:: ReadBuf < ' _ > ,
226227 ) -> Poll < std:: io:: Result < ( ) > > {
227- self . check_idle ( ) ?;
228-
229228 let initial_filled = buf. filled ( ) . len ( ) ;
230229 let poll = Pin :: new ( & mut self . inner ) . poll_read ( cx, buf) ;
231230
232- match & poll {
233- Poll :: Ready ( Ok ( ( ) ) ) if buf. filled ( ) . len ( ) > initial_filled => {
231+ if let Poll :: Ready ( Ok ( ( ) ) ) = & poll {
232+ let n = buf. filled ( ) . len ( ) - initial_filled;
233+ if n > 0 {
234234 self . update_activity ( ) ;
235+ self . read_bytes += n as u64 ;
235236 }
236- Poll :: Ready ( Err ( e) ) if Self :: is_normal_close ( e) => {
237- return Poll :: Ready ( Ok ( ( ) ) ) ;
238- }
239- _ => { }
240237 }
241238
242239 poll
243240 }
244241}
245242
246- impl < T : AsyncWrite + Unpin > AsyncWrite for IdleTimeoutStream < T > {
243+ impl < T : AsyncWrite + Unpin > AsyncWrite for ActiveStream < T > {
247244 fn poll_write (
248245 mut self : Pin < & mut Self > ,
249246 cx : & mut Context < ' _ > ,
250247 buf : & [ u8 ] ,
251248 ) -> Poll < Result < usize , Error > > {
252- self . check_idle ( ) ?;
253-
254249 let poll = Pin :: new ( & mut self . inner ) . poll_write ( cx, buf) ;
255250
256- match poll {
257- Poll :: Ready ( Ok ( n) ) => {
258- if n > 0 {
259- self . update_activity ( ) ;
260- }
261- Poll :: Ready ( Ok ( n) )
262- }
263- Poll :: Ready ( Err ( e) ) if Self :: is_normal_close ( & e) => {
264- // Treat normal close as successful write of all bytes
265- Poll :: Ready ( Ok ( buf. len ( ) ) )
251+ if let Poll :: Ready ( Ok ( n) ) = & poll {
252+ if * n > 0 {
253+ self . update_activity ( ) ;
266254 }
267- _ => poll,
268255 }
256+
257+ poll
269258 }
270259
271260 fn poll_flush ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Error > > {
0 commit comments