Skip to content

Commit c9621c9

Browse files
mtjhrcslp
authored andcommitted
utils: Introduce pollable_channel()
This is similar concept to the mpsc::channel but uses an EventFd to notify the reader when data is availible. This seems like it could be generally useful, when we need to use Epoll to also wait for other file descriptor events. This is just a simple implementation using EventFd and could maybe be optimized. (maybe even using a unix pipe directly) Signed-off-by: Matej Hrica <[email protected]>
1 parent 9b8e9eb commit c9621c9

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed

src/utils/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod macos;
1616
pub use macos::epoll;
1717
#[cfg(target_os = "macos")]
1818
pub use macos::eventfd;
19+
pub mod pollable_channel;
1920
pub mod rand;
2021
#[cfg(target_os = "linux")]
2122
pub mod signal;

src/utils/src/pollable_channel.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use crate::eventfd::{EventFd, EFD_NONBLOCK, EFD_SEMAPHORE};
2+
use std::collections::VecDeque;
3+
use std::io;
4+
use std::io::ErrorKind;
5+
use std::os::fd::{AsRawFd, RawFd};
6+
use std::sync::{Arc, Mutex};
7+
8+
/// A multiple producer single consumer channel that can be listened to by a file descriptor
9+
pub fn pollable_channel<T: Send>(
10+
) -> io::Result<(PollableChannelSender<T>, PollableChannelReciever<T>)> {
11+
let eventfd = EventFd::new(EFD_NONBLOCK | EFD_SEMAPHORE)?;
12+
13+
let inner = Arc::new(Inner {
14+
eventfd,
15+
queue: Mutex::new(VecDeque::new()),
16+
});
17+
let tx = PollableChannelSender {
18+
inner: inner.clone(),
19+
};
20+
let rx = PollableChannelReciever { inner };
21+
Ok((tx, rx))
22+
}
23+
24+
struct Inner<T: Send> {
25+
eventfd: EventFd,
26+
queue: Mutex<VecDeque<T>>,
27+
}
28+
29+
#[derive(Clone)]
30+
pub struct PollableChannelSender<T: Send> {
31+
inner: Arc<Inner<T>>,
32+
}
33+
34+
impl<T: Send> PollableChannelSender<T> {
35+
pub fn send(&self, msg: T) -> io::Result<()> {
36+
let mut data_lock = self.inner.queue.lock().unwrap();
37+
data_lock.push_back(msg);
38+
self.inner.eventfd.write(1)?;
39+
Ok(())
40+
}
41+
}
42+
43+
pub struct PollableChannelReciever<T: Send> {
44+
inner: Arc<Inner<T>>,
45+
}
46+
47+
impl<T: Send> PollableChannelReciever<T> {
48+
pub fn try_recv(&self) -> io::Result<Option<T>> {
49+
let mut data_lock = self.inner.queue.lock().unwrap();
50+
match self.inner.eventfd.read() {
51+
Ok(_) => (),
52+
Err(e) if e.kind() == ErrorKind::WouldBlock => (),
53+
Err(e) => return Err(e),
54+
}
55+
56+
Ok(data_lock.pop_back())
57+
}
58+
59+
pub fn len(&self) -> usize {
60+
self.inner.queue.lock().unwrap().len()
61+
}
62+
63+
pub fn is_empty(&self) -> bool {
64+
self.inner.queue.lock().unwrap().is_empty()
65+
}
66+
}
67+
68+
impl<T: Send> AsRawFd for PollableChannelReciever<T> {
69+
fn as_raw_fd(&self) -> RawFd {
70+
self.inner.eventfd.as_raw_fd()
71+
}
72+
}

0 commit comments

Comments
 (0)