Skip to content

Commit 298ec60

Browse files
James Sun and facebook-github-bot
authored and committed
ignore sigpipe (#835)
Summary: Pull Request resolved: #835 SAFETY: ignore SIGPIPE, because hyperactor tx/rx channels have the following issue: when tx sends a message, it can use either try_post/post or send. try_post/post is async and send is sync. However, neither method has a way to tell the receiver's state. try_post/post sends messages in the background; if the receiver process gets killed, try_post/post keeps sending messages, which raises SIGPIPE and crashes the parent process. send is a sync call that waits for the receiver to ack; if the receiver is dead or has not started yet, send blocks. We need to fix the above issues, but until then we ignore SIGPIPE as a mitigation. A specific victim of this is the log sender, which streams logs to the log forwarder without knowing the forwarder's state: the forwarder may not have started yet, or it may already have been killed. If we use tx.send, it blocks, leading to a stuck allocator. If we use tx.post, it gets SIGPIPE on child process shutdown. Reviewed By: LucasLLC, vidhyav Differential Revision: D80131609 fbshipit-source-id: 374165d1ceb835a3ea618ba0931b7464304022f2
1 parent 2d952f9 commit 298ec60

File tree

3 files changed

+28
-33
lines changed

3 files changed

+28
-33
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ pub trait LogSender: Send + Sync {
289289

290290
/// Flush the log channel, ensuring all messages are delivered
291291
/// Returns when the flush message has been acknowledged
292-
async fn flush(&mut self) -> anyhow::Result<()>;
292+
fn flush(&mut self) -> anyhow::Result<()>;
293293
}
294294

295295
/// Represents the target output stream (stdout or stderr)
@@ -331,7 +331,7 @@ impl LocalLogSender {
331331
impl LogSender for LocalLogSender {
332332
fn send(&mut self, target: OutputTarget, payload: Vec<u8>) -> anyhow::Result<()> {
333333
if TxStatus::Active == *self.status.borrow() {
334-
// post does not guarantee the message to be delivered
334+
// Do not use tx.send, it will block the allocator as the child process state is unknown.
335335
self.tx.post(LogMessage::Log {
336336
hostname: self.hostname.clone(),
337337
pid: self.pid,
@@ -348,23 +348,18 @@ impl LogSender for LocalLogSender {
348348
Ok(())
349349
}
350350

351-
async fn flush(&mut self) -> anyhow::Result<()> {
351+
fn flush(&mut self) -> anyhow::Result<()> {
352352
// send will make sure message is delivered
353353
if TxStatus::Active == *self.status.borrow() {
354-
match self.tx.send(LogMessage::Flush {}).await {
355-
Ok(()) => Ok(()),
356-
Err(e) => {
357-
tracing::error!("log sender {} error sending flush message: {}", self.pid, e);
358-
Err(anyhow::anyhow!("error sending flush message: {}", e))
359-
}
360-
}
354+
// Do not use tx.send, it will block the allocator as the child process state is unknown.
355+
self.tx.post(LogMessage::Flush {});
361356
} else {
362357
tracing::debug!(
363358
"log sender {} is not active, skip sending flush message",
364359
self.tx.addr()
365360
);
366-
Ok(())
367361
}
362+
Ok(())
368363
}
369364
}
370365

@@ -523,30 +518,12 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
523518
fn poll_flush(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Result<(), io::Error>> {
524519
let this = self.get_mut();
525520

526-
// First, flush the standard writer
527521
match Pin::new(&mut this.std_writer).poll_flush(cx) {
528522
Poll::Ready(Ok(())) => {
529-
// Now send a Flush message to the other side of the channel.
530-
let mut flush_future = this.log_sender.flush();
531-
match flush_future.as_mut().poll(cx) {
532-
Poll::Ready(Ok(())) => {
533-
// Successfully sent the flush message
534-
Poll::Ready(Ok(()))
535-
}
536-
Poll::Ready(Err(e)) => {
537-
// Error sending the flush message
538-
tracing::error!("error sending flush message: {}", e);
539-
Poll::Ready(Err(io::Error::other(format!(
540-
"error sending flush message: {}",
541-
e
542-
))))
543-
}
544-
Poll::Pending => {
545-
// The future is not ready yet, so we return Pending
546-
// The waker is already registered by polling the future
547-
Poll::Pending
548-
}
523+
if let Err(e) = this.log_sender.flush() {
524+
tracing::error!("error sending flush: {}", e);
549525
}
526+
Poll::Ready(Ok(()))
550527
}
551528
other => other, // Propagate any errors or Pending state from the std_writer flush
552529
}
@@ -1044,7 +1021,7 @@ mod tests {
10441021
.map_err(|e| anyhow::anyhow!("Failed to send log in test: {}", e))
10451022
}
10461023

1047-
async fn flush(&mut self) -> anyhow::Result<()> {
1024+
fn flush(&mut self) -> anyhow::Result<()> {
10481025
// Mark that flush was called
10491026
let mut flush_called = self.flush_called.lock().unwrap();
10501027
*flush_called = true;

monarch_hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiproces
2626
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
2727
inventory = "0.3.8"
2828
lazy_static = "1.5"
29+
libc = "0.2.139"
2930
monarch_types = { version = "0.0.0", path = "../monarch_types" }
3031
ndslice = { version = "0.0.0", path = "../ndslice" }
3132
nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }

monarch_hyperactor/src/bin/process_allocator/common.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,23 @@ pub fn main_impl(
5252
program: Command,
5353
timeout: Option<Duration>,
5454
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>> {
55+
#[cfg(unix)]
56+
fn ignore_sigpipe() {
57+
// SAFETY: ignore SIGPIPE as hyperactor tx/rx channels have the following issue:
58+
// When tx tries to send a message, it can either do try_post/post or send.
59+
// try_post/post is async and send is sync. However, both methods do not have a way to tell the receiver's state.
60+
// For try_post/post, it will send messages in the background. However, if the receiver process gets killed,
61+
// try_post/post will continue sending messages leading to SIGPIPE and crash the parent process.
62+
// For send, it is a sync call. It waits on the receiver to ack. However, if the receiver is dead or not started yet,
63+
// the send will be blocked.
64+
//
65+
// TODO: We need to fix the above issues but before that, ignore SIGPIPE as a mitigation.
66+
unsafe {
67+
assert!(libc::signal(libc::SIGPIPE, libc::SIG_IGN) != libc::SIG_ERR);
68+
}
69+
}
70+
ignore_sigpipe();
71+
5572
tracing::info!("bind address is: {}", serve_address);
5673
tracing::info!("program to spawn on allocation request: [{:?}]", &program);
5774

0 commit comments

Comments
 (0)