Skip to content

Commit 94f2b9c

Browse files
Arshia001theduke
authored andcommitted
Rework how pipes and event FDs interact with the epoll system
1 parent 32bc8ce commit 94f2b9c

File tree

8 files changed

+131
-169
lines changed

8 files changed

+131
-169
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/virtual-fs/Cargo.toml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ rust-version.workspace = true
1111

1212
[dependencies]
1313
wasmer-package.workspace = true
14+
virtual-mio = { path = "../virtual-io", version = "0.601.0-rc.2", default-features = false }
1415
dashmap.workspace = true
1516
derive_more.workspace = true
1617
dunce = "1.0.4"
@@ -40,7 +41,7 @@ serde = { workspace = true, default-features = false, features = [
4041
"derive",
4142
], optional = true }
4243
getrandom.workspace = true
43-
web-time = { version = "1.1", optional = true}
44+
web-time = { version = "1.1", optional = true }
4445

4546
[dev-dependencies]
4647
pretty_assertions.workspace = true
@@ -64,10 +65,7 @@ host-fs = [
6465
webc-fs = ["webc", "anyhow"]
6566
static-fs = ["webc", "anyhow"]
6667
enable-serde = ["typetag", "serde"]
67-
js = [
68-
"dep:web-time",
69-
"getrandom/js",
70-
]
68+
js = ["dep:web-time", "getrandom/js"]
7169
# Enables memory tracking/limiting functionality for the in-memory filesystem.
7270
tracking = []
7371
futures = []

lib/virtual-fs/src/pipe.rs

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
use bytes::{Buf, Bytes};
2-
use std::io::{self, Read, Seek, SeekFrom};
32
use std::pin::Pin;
43
use std::sync::Arc;
54
use std::sync::Mutex;
65
use std::task::Context;
76
use std::task::Poll;
87
use std::{io::IoSlice, sync::MutexGuard};
8+
use std::{
9+
io::{self, Read, Seek, SeekFrom},
10+
sync::Weak,
11+
};
912
use tokio::sync::mpsc;
1013
use tokio::{
1114
io::{AsyncRead, AsyncSeek, AsyncWrite},
1215
sync::mpsc::error::TryRecvError,
1316
};
17+
use virtual_mio::{InterestHandler, InterestType};
1418

1519
use crate::{ArcFile, FsError, VirtualFile};
1620

@@ -31,6 +35,7 @@ pub struct Pipe {
3135
pub struct PipeTx {
3236
/// Sends bytes down the pipe
3337
tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
38+
rx_end: Weak<Mutex<PipeReceiver>>,
3439
}
3540

3641
#[derive(Debug, Clone)]
@@ -124,6 +129,22 @@ impl PipeRx {
124129
rx.buffer.replace(Bytes::from(data));
125130
}
126131
}
132+
133+
pub fn set_interest_handler(&mut self, interest_handler: Box<dyn InterestHandler>) {
134+
let Some(ref rx) = self.rx else {
135+
return;
136+
};
137+
let mut rx = rx.lock().unwrap();
138+
rx.interest_handler.replace(interest_handler);
139+
}
140+
141+
pub fn remove_interest_handler(&mut self) -> Option<Box<dyn InterestHandler>> {
142+
let Some(ref rx) = self.rx else {
143+
return None;
144+
};
145+
let mut rx = rx.lock().unwrap();
146+
rx.interest_handler.take()
147+
}
127148
}
128149

129150
#[derive(Debug)]
@@ -132,20 +153,24 @@ struct PipeReceiver {
132153
// actual receiver, we can't make use of an mpmc channel
133154
chan: mpsc::UnboundedReceiver<Vec<u8>>,
134155
buffer: Option<Bytes>,
156+
interest_handler: Option<Box<dyn InterestHandler>>,
135157
}
136158

137159
impl Pipe {
138160
pub fn new() -> Self {
139161
let (tx, rx) = mpsc::unbounded_channel();
140162

163+
let recv = Arc::new(Mutex::new(PipeReceiver {
164+
chan: rx,
165+
buffer: None,
166+
interest_handler: None,
167+
}));
141168
Pipe {
142-
send: PipeTx { tx: Some(tx) },
143-
recv: PipeRx {
144-
rx: Some(Arc::new(Mutex::new(PipeReceiver {
145-
chan: rx,
146-
buffer: None,
147-
}))),
169+
send: PipeTx {
170+
tx: Some(tx),
171+
rx_end: Arc::downgrade(&recv),
148172
},
173+
recv: PipeRx { rx: Some(recv) },
149174
}
150175
}
151176

@@ -174,23 +199,19 @@ impl Pipe {
174199
self.send.close();
175200
self.recv.close();
176201
}
177-
}
178202

179-
impl Default for Pipe {
180-
fn default() -> Self {
181-
Self::new()
203+
pub fn set_interest_handler(&mut self, interest_handler: Box<dyn InterestHandler>) {
204+
self.recv.set_interest_handler(interest_handler);
182205
}
183-
}
184206

185-
impl From<Pipe> for PipeTx {
186-
fn from(val: Pipe) -> Self {
187-
val.send
207+
pub fn remove_interest_handler(&mut self) -> Option<Box<dyn InterestHandler>> {
208+
self.recv.remove_interest_handler()
188209
}
189210
}
190211

191-
impl From<Pipe> for PipeRx {
192-
fn from(val: Pipe) -> Self {
193-
val.recv
212+
impl Default for Pipe {
213+
fn default() -> Self {
214+
Self::new()
194215
}
195216
}
196217

@@ -213,6 +234,15 @@ impl PipeTx {
213234
Poll::Ready(Ok(8192))
214235
}
215236
}
237+
238+
fn push_readable_interest_to_rx_end(&self) {
239+
if let Some(rx_end) = self.rx_end.upgrade() {
240+
let mut guard = rx_end.lock().unwrap();
241+
if let Some(interest_handler) = guard.interest_handler.as_mut() {
242+
interest_handler.push_interest(InterestType::Readable);
243+
}
244+
}
245+
}
216246
}
217247

218248
impl Seek for Pipe {
@@ -302,6 +332,7 @@ impl std::io::Write for PipeTx {
302332

303333
tx.send(buf.to_vec())
304334
.map_err(|_| Into::<std::io::Error>::into(std::io::ErrorKind::BrokenPipe))?;
335+
self.push_readable_interest_to_rx_end();
305336
Ok(buf.len())
306337
}
307338

@@ -387,7 +418,10 @@ impl AsyncWrite for PipeTx {
387418
};
388419

389420
match tx.send(buf.to_vec()) {
390-
Ok(()) => Poll::Ready(Ok(buf.len())),
421+
Ok(()) => {
422+
self.push_readable_interest_to_rx_end();
423+
Poll::Ready(Ok(buf.len()))
424+
}
391425
Err(_) => Poll::Ready(Err(Into::<std::io::Error>::into(
392426
std::io::ErrorKind::BrokenPipe,
393427
))),

lib/wasix/src/fs/fd.rs

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,19 @@ use std::{
22
borrow::Cow,
33
collections::{HashMap, HashSet},
44
path::PathBuf,
5-
pin::Pin,
65
sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
7-
task::Context,
86
};
97

10-
use futures::Future;
118
use serde_derive::{Deserialize, Serialize};
129
use std::sync::Mutex as StdMutex;
1310
use tokio::sync::{watch, Mutex as AsyncMutex};
1411
use virtual_fs::{Pipe, PipeRx, PipeTx, VirtualFile};
1512
use wasmer_wasix_types::wasi::{EpollType, Fd as WasiFd, Fdflags, Fdflagsext, Filestat, Rights};
1613

17-
use crate::{net::socket::InodeSocket, syscalls::EpollJoinWaker};
14+
use crate::net::socket::InodeSocket;
1815

1916
use super::{
20-
InodeGuard, InodeValFilePollGuard, InodeValFilePollGuardJoin, InodeValFilePollGuardMode,
21-
InodeWeakGuard, NotificationInner,
17+
InodeGuard, InodeValFilePollGuard, InodeValFilePollGuardMode, InodeWeakGuard, NotificationInner,
2218
};
2319

2420
#[derive(Debug, Clone)]
@@ -111,48 +107,32 @@ pub struct EpollInterest {
111107

112108
/// Guard the cleans up the selector registrations
113109
#[derive(Debug)]
114-
pub enum EpollJoinGuard {
115-
Join {
116-
join_guard: InodeValFilePollGuardJoin,
117-
epoll_waker: Arc<EpollJoinWaker>,
118-
},
119-
Handler {
120-
fd_guard: InodeValFilePollGuard,
121-
},
110+
pub struct EpollJoinGuard {
111+
pub(crate) fd_guard: InodeValFilePollGuard,
122112
}
123113
impl Drop for EpollJoinGuard {
124114
fn drop(&mut self) {
125-
if let Self::Handler { fd_guard, .. } = self {
126-
if let InodeValFilePollGuardMode::Socket { inner } = &mut fd_guard.mode {
115+
match &self.fd_guard.mode {
116+
InodeValFilePollGuardMode::File(_) => {
117+
// Intentionally ignored, epoll doesn't work with files
118+
}
119+
InodeValFilePollGuardMode::Socket { inner } => {
127120
let mut inner = inner.protected.write().unwrap();
128121
inner.remove_handler();
129122
}
130-
}
131-
}
132-
}
133-
impl EpollJoinGuard {
134-
pub fn is_spent(&self) -> bool {
135-
match self {
136-
Self::Join { join_guard, .. } => join_guard.is_spent(),
137-
Self::Handler { .. } => false,
138-
}
139-
}
140-
pub fn renew(&mut self) {
141-
if let Self::Join {
142-
join_guard,
143-
epoll_waker,
144-
} = self
145-
{
146-
let fd = join_guard.fd();
147-
join_guard.reset();
148-
149-
let waker = epoll_waker.as_waker();
150-
let mut cx = Context::from_waker(&waker);
151-
if Pin::new(join_guard).poll(&mut cx).is_ready() {
152-
tracing::trace!(fd, "join renew already woken");
153-
waker.wake();
154-
} else {
155-
tracing::trace!(fd, "join waker reinstalled");
123+
InodeValFilePollGuardMode::EventNotifications(inner) => {
124+
inner.remove_interest_handler();
125+
}
126+
InodeValFilePollGuardMode::DuplexPipe { pipe } => {
127+
let mut inner = pipe.write().unwrap();
128+
inner.remove_interest_handler();
129+
}
130+
InodeValFilePollGuardMode::PipeRx { rx } => {
131+
let mut inner = rx.write().unwrap();
132+
inner.remove_interest_handler();
133+
}
134+
InodeValFilePollGuardMode::PipeTx { .. } => {
135+
// Intentionally ignored, the sending end of a pipe can't have an interest handler
156136
}
157137
}
158138
}

lib/wasix/src/fs/inode_guard.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ pub struct InodeValFilePollGuardJoin {
134134
fd: u32,
135135
peb: PollEventSet,
136136
subscription: Subscription,
137-
spent: bool,
138137
}
139138

140139
impl InodeValFilePollGuardJoin {
@@ -144,7 +143,6 @@ impl InodeValFilePollGuardJoin {
144143
fd: guard.fd,
145144
peb: guard.peb,
146145
subscription: guard.subscription,
147-
spent: false,
148146
}
149147
}
150148
pub(crate) fn fd(&self) -> u32 {
@@ -153,22 +151,6 @@ impl InodeValFilePollGuardJoin {
153151
pub(crate) fn peb(&self) -> PollEventSet {
154152
self.peb
155153
}
156-
pub fn is_spent(&self) -> bool {
157-
self.spent
158-
}
159-
pub fn reset(&mut self) {
160-
match &self.mode {
161-
InodeValFilePollGuardMode::File(_) => {}
162-
InodeValFilePollGuardMode::EventNotifications(inner) => {
163-
inner.reset();
164-
}
165-
InodeValFilePollGuardMode::Socket { .. }
166-
| InodeValFilePollGuardMode::PipeRx { .. }
167-
| InodeValFilePollGuardMode::PipeTx { .. }
168-
| InodeValFilePollGuardMode::DuplexPipe { .. } => {}
169-
}
170-
self.spent = false;
171-
}
172154
}
173155

174156
pub const POLL_GUARD_MAX_RET: usize = 4;
@@ -403,7 +385,6 @@ impl Future for InodeValFilePollGuardJoin {
403385
};
404386
}
405387
if !ret.is_empty() {
406-
self.spent = true;
407388
return Poll::Ready(ret);
408389
}
409390
Poll::Pending

lib/wasix/src/fs/notification.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::{
44
task::{Poll, Waker},
55
};
66

7+
use virtual_mio::{InterestHandler, InterestType};
8+
79
#[derive(Debug)]
810
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
911
struct NotificationState {
@@ -17,6 +19,9 @@ struct NotificationState {
1719
/// All the registered wakers
1820
#[cfg_attr(feature = "enable-serde", serde(skip))]
1921
wakers: VecDeque<Waker>,
22+
/// InterestHandler for use with epoll
23+
#[cfg_attr(feature = "enable-serde", serde(skip))]
24+
interest_handler: Option<Box<dyn InterestHandler>>,
2025
}
2126

2227
impl NotificationState {
@@ -31,6 +36,9 @@ impl NotificationState {
3136
while let Some(waker) = self.wakers.pop_front() {
3237
waker.wake();
3338
}
39+
if let Some(handler) = self.interest_handler.as_mut() {
40+
handler.push_interest(InterestType::Readable);
41+
}
3442
}
3543

3644
fn inc(&mut self, val: u64) {
@@ -70,6 +78,7 @@ impl NotificationInner {
7078
last_poll: u64::MAX,
7179
is_semaphore,
7280
wakers: Default::default(),
81+
interest_handler: None,
7382
}),
7483
}
7584
}
@@ -111,4 +120,14 @@ impl NotificationInner {
111120
let mut state = self.state.lock().unwrap();
112121
state.last_poll = u64::MAX;
113122
}
123+
124+
pub fn set_interest_handler(&self, handler: Box<dyn InterestHandler>) {
125+
let mut state = self.state.lock().unwrap();
126+
state.interest_handler.replace(handler);
127+
}
128+
129+
pub fn remove_interest_handler(&self) -> Option<Box<dyn InterestHandler>> {
130+
let mut state = self.state.lock().unwrap();
131+
state.interest_handler.take()
132+
}
114133
}

0 commit comments

Comments
 (0)