Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions hyperactor/src/channel/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,11 +1086,12 @@ mod tests {
assert!(rx.await.is_err());
}

#[tracing_test::traced_test]
#[tokio::test]
#[async_timed_test(timeout_secs = 60)]
// TODO: OSS: failed to retrieve ipv6 address
#[cfg_attr(not(fbcode_build), ignore)]
async fn test_meta_tls_basic() {
hyperactor_telemetry::initialize_logging_for_test();

let addr = ChannelAddr::any(ChannelTransport::MetaTls(TlsMode::IpV6));
let meta_addr = match addr {
ChannelAddr::MetaTls(meta_addr) => meta_addr,
Expand Down
54 changes: 32 additions & 22 deletions hyperactor/src/channel/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,29 +645,39 @@ async fn run<M: RemoteMessage>(
);
}

if let Conn::Connected {
write_state: WriteState::Writing(mut frame_writer, ()),
..
} = conn
{
if let Err(err) = frame_writer.send().await {
tracing::info!(
parent: &span,
dest = %dest,
error = %err,
session_id = session_id,
"write error during cleanup"
);
} else if let Err(err) = frame_writer.complete().flush().await {
tracing::info!(
parent: &span,
dest = %dest,
error = %err,
session_id = session_id,
"flush error during cleanup"
);
match conn {
Conn::Connected { write_state, .. } => {
let write_half = match write_state {
WriteState::Writing(mut frame_writer, ()) => {
if let Err(err) = frame_writer.send().await {
tracing::info!(
parent: &span,
dest = %dest,
error = %err,
session_id = session_id,
"write error during cleanup"
);
}
Some(frame_writer.complete())
}
WriteState::Idle(writer) => Some(writer),
WriteState::Broken => None,
};

if let Some(mut w) = write_half {
if let Err(err) = w.shutdown().await {
tracing::info!(
parent: &span,
dest = %dest,
error = %err,
session_id = session_id,
"failed to shutdown NetTx write stream during cleanup"
);
}
}
}
}
Conn::Disconnected(_) => (),
};

tracing::info!(
parent: &span,
Expand Down