Skip to content

Commit 7271a2e

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Shutdown NetTx stream when it exits (meta-pytorch#1993)
Summary: Pull Request resolved: meta-pytorch#1993 We currently do not explicitly close `NetTx`'s write stream when it exits. As a result, we will see the following log on the `NetRx` side when using `MetaTls`: ``` [-]I1124 14:09:27.934423 629996 fbcode/monarch/hyperactor/src/channel/net/server.rs:592] error processing peer connection, source:tcp:[2401:db00:eef0:1120:3520:0:6c09:3cba]:57588, dest:metatls:devvm16922.vll0.facebook.com:35865, error:session metatls:devvm16922.vll0.facebook.com:35865.18243036951658569703<-tcp:[2401:db00:eef0:1120:3520:0:6c09:3cba]:57588: reading into Frame with M = u64 Caused by: peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof ``` This diff fixes that. Reviewed By: mariusae Differential Revision: D87823699 fbshipit-source-id: a84c57d5a2b0ebc384931154908ffd9c03eb1851
1 parent 3e75bfc commit 7271a2e

File tree

2 files changed

+35
-24
lines changed

2 files changed

+35
-24
lines changed

hyperactor/src/channel/net.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,11 +1086,12 @@ mod tests {
10861086
assert!(rx.await.is_err());
10871087
}
10881088

1089-
#[tracing_test::traced_test]
1090-
#[tokio::test]
1089+
#[async_timed_test(timeout_secs = 60)]
10911090
// TODO: OSS: failed to retrieve ipv6 address
10921091
#[cfg_attr(not(fbcode_build), ignore)]
10931092
async fn test_meta_tls_basic() {
1093+
hyperactor_telemetry::initialize_logging_for_test();
1094+
10941095
let addr = ChannelAddr::any(ChannelTransport::MetaTls(TlsMode::IpV6));
10951096
let meta_addr = match addr {
10961097
ChannelAddr::MetaTls(meta_addr) => meta_addr,

hyperactor/src/channel/net/client.rs

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -645,29 +645,39 @@ async fn run<M: RemoteMessage>(
645645
);
646646
}
647647

648-
if let Conn::Connected {
649-
write_state: WriteState::Writing(mut frame_writer, ()),
650-
..
651-
} = conn
652-
{
653-
if let Err(err) = frame_writer.send().await {
654-
tracing::info!(
655-
parent: &span,
656-
dest = %dest,
657-
error = %err,
658-
session_id = session_id,
659-
"write error during cleanup"
660-
);
661-
} else if let Err(err) = frame_writer.complete().flush().await {
662-
tracing::info!(
663-
parent: &span,
664-
dest = %dest,
665-
error = %err,
666-
session_id = session_id,
667-
"flush error during cleanup"
668-
);
648+
match conn {
649+
Conn::Connected { write_state, .. } => {
650+
let write_half = match write_state {
651+
WriteState::Writing(mut frame_writer, ()) => {
652+
if let Err(err) = frame_writer.send().await {
653+
tracing::info!(
654+
parent: &span,
655+
dest = %dest,
656+
error = %err,
657+
session_id = session_id,
658+
"write error during cleanup"
659+
);
660+
}
661+
Some(frame_writer.complete())
662+
}
663+
WriteState::Idle(writer) => Some(writer),
664+
WriteState::Broken => None,
665+
};
666+
667+
if let Some(mut w) = write_half {
668+
if let Err(err) = w.shutdown().await {
669+
tracing::info!(
670+
parent: &span,
671+
dest = %dest,
672+
error = %err,
673+
session_id = session_id,
674+
"failed to shutdown NetTx write stream during cleanup"
675+
);
676+
}
677+
}
669678
}
670-
}
679+
Conn::Disconnected(_) => (),
680+
};
671681

672682
tracing::info!(
673683
parent: &span,

0 commit comments

Comments
 (0)