Skip to content

Commit a40d301

Browse files
Felipe Caldeira and meta-codesync[bot]
authored and committed
Fix vtable mismatch causing infinite waker loops in optimized builds
Summary: The `tokio-uds-compat` library provides cross-platform Unix domain socket support, enabling the same async socket code to work on both Unix and Windows systems. On Unix, it directly uses Tokio's native Unix domain sockets. On Windows, it bridges between Tokio and the `async-io` runtime, using `uds_windows` to emulate Unix domain socket behavior. A bug caused by this cross-runtime bridging led to CPU spikes in the Myles Daemon, which uses this library. For a detailed account of the investigation, see [this write-up](https://docs.google.com/document/d/1r0llKLSHyykZps6cvIYgdn5HX7y4NfrG4NrilDCFFAE/edit?usp=sharing).

### Root Cause

The issue appears to stem from the Rust compiler's vtable deduplication optimizations in release builds:

1. **Vtable Deduplication**: In optimized builds, the compiler merges identical vtables to save memory.
2. **Cross-Runtime Waker Cloning**: When `waker.clone()` was called across the Tokio→async-io boundary, optimization could return different vtable pointers for the same logical task.
3. **Failed Identity Checks**: This caused `waker1.will_wake(&waker2)` to return `false` even for identical tasks, since `will_wake()` compares both data pointers AND vtable pointers.
4. **Reactor Loop**: async-io's reactor continuously replaced the "different" wakers instead of reusing them, creating an infinite polling loop.

This diff adds a helper function to prevent vtable mismatches. By cloning the waker at the async-io runtime boundary, this function ensures consistent waker identity across cross-runtime calls, thereby fixing the infinite loop. The cloned waker still wakes the same underlying task, so apart from fixing the infinite loop, all other behavior should remain the same.

Reviewed By: mengfei1026

Differential Revision: D84855880

fbshipit-source-id: a8b1f4135c587990ffc488ee6b369bfdc821ba35
1 parent 7043fee commit a40d301

File tree

1 file changed

+67
-32
lines changed
  • shed/tokio-uds-compat/src

1 file changed

+67
-32
lines changed

shed/tokio-uds-compat/src/lib.rs

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ mod windows {
3434
use tokio::io::AsyncWrite;
3535
use tokio::io::ReadBuf;
3636

37+
/// Runs `f` with a temporary `Context` built from a clone of `cx`'s waker and
38+
/// returns `f`'s result. Per the commit description, cloning at the async-io
39+
/// boundary is intended to keep waker identity consistent in optimized builds.
40+
fn with_cloned_waker<T>(cx: &Context<'_>, f: impl FnOnce(&mut Context<'_>) -> T) -> T {
41+
let cloned_waker = cx.waker().clone(); // local clone; dropped when this function returns
42+
let mut preserving_cx = Context::from_waker(&cloned_waker); // context that borrows the clone, not the caller's waker
43+
f(&mut preserving_cx) // `f` only ever sees the cloned-waker context
44+
}
45+
3746
/// Read half of a `UnixStream` (going by its name — the split logic is not
/// visible in this diff); wraps the stream in a reference-counted `Arc` handle.
pub struct OwnedReadHalf {
3847
inner: Arc<UnixStream>, // `Arc`-held handle to the underlying stream
3948
}
@@ -82,18 +91,34 @@ mod windows {
8291
cx: &mut Context<'_>,
8392
buf: &[u8],
8493
) -> Poll<Result<usize, io::Error>> {
85-
futures::AsyncWrite::poll_write(Pin::new(&mut self.inner.async_ref()), cx, buf)
94+
with_cloned_waker(cx, |preserving_cx| {
95+
futures::AsyncWrite::poll_write(
96+
Pin::new(&mut self.inner.async_ref()),
97+
preserving_cx,
98+
buf,
99+
)
100+
})
86101
}
87102

88103
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
89-
futures::AsyncWrite::poll_flush(Pin::new(&mut self.inner.async_ref()), cx)
104+
with_cloned_waker(cx, |preserving_cx| {
105+
futures::AsyncWrite::poll_flush(
106+
Pin::new(&mut self.inner.async_ref()),
107+
preserving_cx,
108+
)
109+
})
90110
}
91111

92112
fn poll_shutdown(
93113
self: Pin<&mut Self>,
94114
cx: &mut Context<'_>,
95115
) -> Poll<Result<(), io::Error>> {
96-
futures::AsyncWrite::poll_close(Pin::new(&mut self.inner.async_ref()), cx)
116+
with_cloned_waker(cx, |preserving_cx| {
117+
futures::AsyncWrite::poll_close(
118+
Pin::new(&mut self.inner.async_ref()),
119+
preserving_cx,
120+
)
121+
})
97122
}
98123
}
99124

@@ -130,21 +155,23 @@ mod windows {
130155
cx: &mut Context<'_>,
131156
buf: &mut ReadBuf<'_>,
132157
) -> Poll<Result<(), io::Error>> {
133-
let result = futures::AsyncRead::poll_read(
134-
Pin::new(&mut self.async_ref()),
135-
cx,
136-
buf.initialize_unfilled(),
137-
);
138-
139-
match result {
140-
Poll::Ready(Ok(written)) => {
141-
tracing::trace!(?written, "UnixStream::poll_read");
142-
buf.set_filled(written);
143-
Poll::Ready(Ok(()))
158+
with_cloned_waker(cx, |preserving_cx| {
159+
let result = futures::AsyncRead::poll_read(
160+
Pin::new(&mut self.async_ref()),
161+
preserving_cx,
162+
buf.initialize_unfilled(),
163+
);
164+
165+
match result {
166+
Poll::Ready(Ok(written)) => {
167+
tracing::trace!(?written, "UnixStream::poll_read");
168+
buf.set_filled(written);
169+
Poll::Ready(Ok(()))
170+
}
171+
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
172+
Poll::Pending => Poll::Pending,
144173
}
145-
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
146-
Poll::Pending => Poll::Pending,
147-
}
174+
})
148175
}
149176
}
150177

@@ -164,18 +191,24 @@ mod windows {
164191
cx: &mut Context<'_>,
165192
buf: &[u8],
166193
) -> Poll<Result<usize, io::Error>> {
167-
futures::AsyncWrite::poll_write(self.inner_mut(), cx, buf)
194+
with_cloned_waker(cx, |preserving_cx| {
195+
futures::AsyncWrite::poll_write(self.inner_mut(), preserving_cx, buf)
196+
})
168197
}
169198

170199
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
171-
futures::AsyncWrite::poll_flush(self.inner_mut(), cx)
200+
with_cloned_waker(cx, |preserving_cx| {
201+
futures::AsyncWrite::poll_flush(self.inner_mut(), preserving_cx)
202+
})
172203
}
173204

174205
fn poll_shutdown(
175206
self: Pin<&mut Self>,
176207
cx: &mut Context<'_>,
177208
) -> Poll<Result<(), io::Error>> {
178-
futures::AsyncWrite::poll_close(self.inner_mut(), cx)
209+
with_cloned_waker(cx, |preserving_cx| {
210+
futures::AsyncWrite::poll_close(self.inner_mut(), preserving_cx)
211+
})
179212
}
180213
}
181214

@@ -198,20 +231,22 @@ mod windows {
198231
&self,
199232
cx: &mut Context<'_>,
200233
) -> Poll<io::Result<(UnixStream, uds_windows::SocketAddr)>> {
201-
match self.0.poll_readable(cx) {
202-
Poll::Ready(Ok(())) => {
203-
let result = self.0.read_with(|io| io.accept());
204-
let mut result = Box::pin(result);
205-
result.as_mut().poll(cx).map(|x| {
206-
x.and_then(|(stream, addr)| {
207-
let stream = UnixStream::from_std(stream)?;
208-
Ok((stream, addr))
234+
with_cloned_waker(cx, |preserving_cx| {
235+
match self.0.poll_readable(preserving_cx) {
236+
Poll::Ready(Ok(())) => {
237+
let result = self.0.read_with(|io| io.accept());
238+
let mut result = Box::pin(result);
239+
result.as_mut().poll(preserving_cx).map(|x| {
240+
x.and_then(|(stream, addr)| {
241+
let stream = UnixStream::from_std(stream)?;
242+
Ok((stream, addr))
243+
})
209244
})
210-
})
245+
}
246+
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
247+
Poll::Pending => Poll::Pending,
211248
}
212-
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
213-
Poll::Pending => Poll::Pending,
214-
}
249+
})
215250
}
216251
}
217252
}

0 commit comments

Comments
 (0)