From ae21b0e0d58ef5432bc97f90525cbae882662a48 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 24 Nov 2025 17:26:49 -0800 Subject: [PATCH] Shutdown NetTx stream when it exits Summary: We currently do not explicitly close `NetTx`'s write stream when it exits, as a result, we will see the following log on the `NetRx` side when using `MetaTls`: ``` [-]I1124 14:09:27.934423 629996 fbcode/monarch/hyperactor/src/channel/net/server.rs:592] error processing peer connection, source:tcp:[2401:db00:eef0:1120:3520:0:6c09:3cba]:57588, dest:metatls:devvm16922.vll0.facebook.com:35865, error:session metatls:devvm16922.vll0.facebook.com:35865.18243036951658569703<-tcp:[2401:db00:eef0:1120:3520:0:6c09:3cba]:57588: reading into Frame with M = u64 Caused by: peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof ``` This diff fixes that. Differential Revision: D87823699 --- hyperactor/src/channel/net.rs | 5 +-- hyperactor/src/channel/net/client.rs | 54 ++++++++++++++++------------ 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/hyperactor/src/channel/net.rs b/hyperactor/src/channel/net.rs index cfab5bc70..686688082 100644 --- a/hyperactor/src/channel/net.rs +++ b/hyperactor/src/channel/net.rs @@ -1086,11 +1086,12 @@ mod tests { assert!(rx.await.is_err()); } - #[tracing_test::traced_test] - #[tokio::test] + #[async_timed_test(timeout_secs = 60)] // TODO: OSS: failed to retrieve ipv6 address #[cfg_attr(not(fbcode_build), ignore)] async fn test_meta_tls_basic() { + hyperactor_telemetry::initialize_logging_for_test(); + let addr = ChannelAddr::any(ChannelTransport::MetaTls(TlsMode::IpV6)); let meta_addr = match addr { ChannelAddr::MetaTls(meta_addr) => meta_addr, diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index 52f80ef93..4938dc73a 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -645,29 +645,39 @@ async fn run( ); } - if let Conn::Connected { - write_state: WriteState::Writing(mut frame_writer, ()), - .. - } = conn - { - if let Err(err) = frame_writer.send().await { - tracing::info!( - parent: &span, - dest = %dest, - error = %err, - session_id = session_id, - "write error during cleanup" - ); - } else if let Err(err) = frame_writer.complete().flush().await { - tracing::info!( - parent: &span, - dest = %dest, - error = %err, - session_id = session_id, - "flush error during cleanup" - ); + match conn { + Conn::Connected { write_state, .. } => { + let write_half = match write_state { + WriteState::Writing(mut frame_writer, ()) => { + if let Err(err) = frame_writer.send().await { + tracing::info!( + parent: &span, + dest = %dest, + error = %err, + session_id = session_id, + "write error during cleanup" + ); + } + Some(frame_writer.complete()) + } + WriteState::Idle(writer) => Some(writer), + WriteState::Broken => None, + }; + + if let Some(mut w) = write_half { + if let Err(err) = w.shutdown().await { + tracing::info!( + parent: &span, + dest = %dest, + error = %err, + session_id = session_id, + "failed to shutdown NetTx write stream during cleanup" + ); + } + } } - } + Conn::Disconnected(_) => (), + }; tracing::info!( parent: &span,