Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 81 additions & 34 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use core::task::Waker;

use std::io::{self, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
use std::sync::MutexGuard;

use enumset::{EnumSet, EnumSetType};

Expand All @@ -13,9 +14,13 @@ use libc as sys;

use crate::{syscall, syscall_los, syscall_los_eagain};

// For ESP-IDF sys::FDSETSIZE is currently wrongly set to 1024 in the `libc` crate
// Therefore, use a custom value for now
#[cfg(target_os = "espidf")]
const MAX_REGISTRATIONS: usize = 20;

//const FD_SEGMENT: usize = sys::FD_SETSIZE / core::mem::size_of::<sys::fd_set>();
#[cfg(not(target_os = "espidf"))]
const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE;

#[derive(EnumSetType, Debug)]
pub(crate) enum Event {
Expand Down Expand Up @@ -82,13 +87,15 @@ struct Registration {
struct Registrations<const N: usize> {
vec: heapless::Vec<Registration, N>,
event_fd: Option<OwnedFd>,
waiting: usize,
}

impl<const N: usize> Registrations<N> {
const fn new() -> Self {
Self {
vec: heapless::Vec::new(),
event_fd: None,
waiting: 0,
}
}

Expand All @@ -103,7 +110,7 @@ impl<const N: usize> Registrations<N> {
Err(ErrorKind::InvalidInput)?;
}

if fd >= sys::FD_SETSIZE as RawFd {
if fd >= sys::FD_SETSIZE as RawFd || fd >= N as RawFd {
Err(ErrorKind::OutOfMemory)?;
}

Expand All @@ -129,8 +136,6 @@ impl<const N: usize> Registrations<N> {

self.vec.swap_remove(index);

self.notify()?;

Ok(())
}

Expand All @@ -147,8 +152,6 @@ impl<const N: usize> Registrations<N> {
}
}

self.notify()?;

Ok(())
}

Expand Down Expand Up @@ -312,13 +315,15 @@ impl<const N: usize> Registrations<N> {

pub struct Reactor<const N: usize> {
registrations: std::sync::Mutex<Registrations<N>>,
condvar: std::sync::Condvar,
started: AtomicBool,
}

impl<const N: usize> Reactor<N> {
const fn new() -> Self {
Self {
registrations: std::sync::Mutex::new(Registrations::new()),
condvar: std::sync::Condvar::new(),
started: AtomicBool::new(false),
}
}
Expand All @@ -340,23 +345,23 @@ impl<const N: usize> Reactor<N> {
}

pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> {
self.lock(|regs| regs.register(fd))
self.modify(|regs| regs.register(fd))
}

pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> {
self.lock(|regs| regs.deregister(fd))
self.modify(|regs| regs.deregister(fd))
}

// pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> {
// self.lock(|regs| regs.set(fd, event, waker))
// }

pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result<bool> {
self.lock(|regs| regs.fetch(fd, event))
self.modify(|regs| regs.fetch(fd, event))
}

pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<bool> {
self.lock(|regs| {
self.modify(|regs| {
if regs.fetch(fd, event)? {
Ok(true)
} else {
Expand All @@ -368,58 +373,100 @@ impl<const N: usize> Reactor<N> {
}

fn run(&self) -> io::Result<()> {
if !self.lock(Registrations::create_notification)? {
if !self.lock(|mut guard| guard.create_notification())? {
Err(ErrorKind::AlreadyExists)?;
}

debug!("Running");

let mut fds = Fds::new();
let mut update = false;

let result = loop {
let result = self.wait();
let max = self.apply(|inner| {
if !update {
update = true;
} else {
inner.update_events(&fds)?;
}

inner.set_fds(&mut fds)
});

let result = match max {
Err(err) => Err(err),
Ok(None) => unreachable!("EventFD is not there?"),
Ok(Some(max)) => {
debug!("Start select");

let result = syscall_los!(unsafe {
sys::select(
max + 1,
fds.read.assume_init_mut(),
fds.write.assume_init_mut(),
fds.except.assume_init_mut(),
core::ptr::null_mut(),
)
});

debug!("End select");

result.map(|_| ())
}
};

if result.is_err() {
break result;
}
};

if !self.lock(Registrations::destroy_notification)? {
if !self.lock(|mut guard| guard.destroy_notification())? {
Err(ErrorKind::NotFound)?;
}

result
}

fn wait(&self) -> io::Result<()> {
let mut fds = Fds::new();

if let Some(max) = self.lock(|inner| inner.set_fds(&mut fds))? {
debug!("Start select");
fn modify<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
{
self.lock(|mut guard| {
guard.waiting += 1;

syscall_los!(unsafe {
sys::select(
max + 1,
fds.read.assume_init_mut(),
fds.write.assume_init_mut(),
fds.except.assume_init_mut(),
core::ptr::null_mut(),
)
})?;
let result = f(&mut guard);

debug!("End select");
guard.notify()?;

self.lock(|inner| inner.update_events(&fds))?;
}
let _guard = self
.condvar
.wait_while(guard, |registrations| registrations.waiting > 0)
.unwrap();

Ok(())
result
})
}

fn lock<F, R>(&self, f: F) -> io::Result<R>
fn apply<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
{
let mut inner = self.registrations.lock().unwrap();
self.lock(|mut guard| {
let result = f(&mut guard);

guard.waiting = 0;

f(&mut inner)
self.condvar.notify_all();

result
})
}

fn lock<F, R>(&self, f: F) -> io::Result<R>
where
F: FnOnce(MutexGuard<Registrations<N>>) -> io::Result<R>,
{
f(self.registrations.lock().unwrap())
}
}

Expand Down
Loading