From ee96ae263ab0333c15aee7e4f4b8515269191223 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Thu, 8 Aug 2024 07:24:46 +0000 Subject: [PATCH 1/5] Bugfix: wait for the reactor to confirm the modification --- src/reactor.rs | 107 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 75 insertions(+), 32 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index ec604f7..7efdc2e 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -4,6 +4,7 @@ use core::task::Waker; use std::io::{self, ErrorKind}; use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}; +use std::sync::MutexGuard; use enumset::{EnumSet, EnumSetType}; @@ -82,6 +83,7 @@ struct Registration { struct Registrations { vec: heapless::Vec, event_fd: Option, + waiting: usize, } impl Registrations { @@ -89,6 +91,7 @@ impl Registrations { Self { vec: heapless::Vec::new(), event_fd: None, + waiting: 0, } } @@ -129,8 +132,6 @@ impl Registrations { self.vec.swap_remove(index); - self.notify()?; - Ok(()) } @@ -147,8 +148,6 @@ impl Registrations { } } - self.notify()?; - Ok(()) } @@ -312,6 +311,7 @@ impl Registrations { pub struct Reactor { registrations: std::sync::Mutex>, + condvar: std::sync::Condvar, started: AtomicBool, } @@ -319,6 +319,7 @@ impl Reactor { const fn new() -> Self { Self { registrations: std::sync::Mutex::new(Registrations::new()), + condvar: std::sync::Condvar::new(), started: AtomicBool::new(false), } } @@ -340,11 +341,11 @@ impl Reactor { } pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> { - self.lock(|regs| regs.register(fd)) + self.lock_modify(|regs| regs.register(fd)) } pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> { - self.lock(|regs| regs.deregister(fd)) + self.lock_modify(|regs| regs.deregister(fd)) } // pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> { @@ -352,11 +353,11 @@ impl Reactor { // } pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result { - self.lock(|regs| regs.fetch(fd, event)) + self.lock_modify(|regs| regs.fetch(fd, event)) } pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result { - self.lock(|regs| { + self.lock_modify(|regs| { if regs.fetch(fd, event)? { Ok(true) } else { @@ -368,58 +369,100 @@ impl Reactor { } fn run(&self) -> io::Result<()> { - if !self.lock(Registrations::create_notification)? { + if !self.lock(|mut guard| guard.create_notification())? { Err(ErrorKind::AlreadyExists)?; } debug!("Running"); + let mut fds = Fds::new(); + let mut update = false; + let result = loop { - let result = self.wait(); + let max = self.lock_apply(|inner| { + if !update { + update = true; + } else { + inner.update_events(&fds)?; + } + + inner.set_fds(&mut fds) + }); + + let result = match max { + Err(err) => Err(err), + Ok(None) => unreachable!("EventFD is not there?"), + Ok(Some(max)) => { + debug!("Start select"); + + let result = syscall_los!(unsafe { + sys::select( + max + 1, + fds.read.assume_init_mut(), + fds.write.assume_init_mut(), + fds.except.assume_init_mut(), + core::ptr::null_mut(), + ) + }); + + debug!("End select"); + + result.map(|_| ()) + } + }; if result.is_err() { break result; } }; - if !self.lock(Registrations::destroy_notification)? { + if !self.lock(|mut guard| guard.destroy_notification())? { Err(ErrorKind::NotFound)?; } result } - fn wait(&self) -> io::Result<()> { - let mut fds = Fds::new(); - - if let Some(max) = self.lock(|inner| inner.set_fds(&mut fds))? { - debug!("Start select"); + fn lock_modify(&self, f: F) -> io::Result + where + F: FnOnce(&mut Registrations) -> io::Result, + { + self.lock(|mut guard| { + guard.waiting += 1; - syscall_los!(unsafe { - sys::select( - max + 1, - fds.read.assume_init_mut(), - fds.write.assume_init_mut(), - fds.except.assume_init_mut(), - core::ptr::null_mut(), - ) - })?; + let result = f(&mut guard); - debug!("End select"); + guard.notify()?; - self.lock(|inner| inner.update_events(&fds))?; - } + let _guard = self + .condvar + .wait_while(guard, |registrations| registrations.waiting > 0) + .unwrap(); - Ok(()) + result + }) } - fn lock(&self, f: F) -> io::Result + fn lock_apply(&self, f: F) -> io::Result where F: FnOnce(&mut Registrations) -> io::Result, { - let mut inner = self.registrations.lock().unwrap(); + self.lock(|mut guard| { + let result = f(&mut guard); + + guard.waiting = 0; - f(&mut inner) + self.condvar.notify_all(); + + result + }) + } + + fn lock(&self, f: F) -> io::Result + where + F: FnOnce(MutexGuard>) -> io::Result, + { + f(self.registrations.lock().unwrap()) } } From 65c1e17eb442e175e57ba55ca28393993818fa47 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Thu, 8 Aug 2024 20:25:19 +0000 Subject: [PATCH 2/5] Report out of memory on FDs bigger than N --- src/reactor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reactor.rs b/src/reactor.rs index 7efdc2e..90f8383 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -106,7 +106,7 @@ impl Registrations { Err(ErrorKind::InvalidInput)?; } - if fd >= sys::FD_SETSIZE as RawFd { + if fd >= sys::FD_SETSIZE as RawFd || fd >= N as RawFd { Err(ErrorKind::OutOfMemory)?; } From e545aa950952c5d036c194a7093d8682cae606d9 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Thu, 8 Aug 2024 20:36:38 +0000 Subject: [PATCH 3/5] Un-hardcode MAX_REGISTRATIONS on regular OSes --- src/reactor.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/reactor.rs b/src/reactor.rs index 90f8383..03bba4c 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -14,9 +14,13 @@ use libc as sys; use crate::{syscall, syscall_los, syscall_los_eagain}; +// For ESP-IDF sys::FDSETSIZE is currently wrongly set to 1024 in the `libc` crate +// Therefore, use a custom value for now +#[cfg(target_os = "espidf")] const MAX_REGISTRATIONS: usize = 20; -//const FD_SEGMENT: usize = sys::FD_SETSIZE / core::mem::size_of::(); +#[cfg(not(target_os = "espidf"))] +const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE as usize; #[derive(EnumSetType, Debug)] pub(crate) enum Event { From bf67584a3a44d7b24ea7f9f9b48a097c8e0466ce Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Thu, 8 Aug 2024 20:38:18 +0000 Subject: [PATCH 4/5] Clippy --- src/reactor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reactor.rs b/src/reactor.rs index 03bba4c..6ca9538 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -20,7 +20,7 @@ use crate::{syscall, syscall_los, syscall_los_eagain}; const MAX_REGISTRATIONS: usize = 20; #[cfg(not(target_os = "espidf"))] -const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE as usize; +const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE; #[derive(EnumSetType, Debug)] pub(crate) enum Event { From 737c07ba64b23bb143ee883909e1e2cb7638538f Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Fri, 9 Aug 2024 06:24:04 +0000 Subject: [PATCH 5/5] Better method names --- src/reactor.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index 6ca9538..6ccbd97 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -345,11 +345,11 @@ impl Reactor { } pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> { - self.lock_modify(|regs| regs.register(fd)) + self.modify(|regs| regs.register(fd)) } pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> { - self.lock_modify(|regs| regs.deregister(fd)) + self.modify(|regs| regs.deregister(fd)) } // pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> { @@ -357,11 +357,11 @@ impl Reactor { // } pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result { - self.lock_modify(|regs| regs.fetch(fd, event)) + self.modify(|regs| regs.fetch(fd, event)) } pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result { - self.lock_modify(|regs| { + self.modify(|regs| { if regs.fetch(fd, event)? { Ok(true) } else { @@ -383,7 +383,7 @@ impl Reactor { let mut update = false; let result = loop { - let max = self.lock_apply(|inner| { + let max = self.apply(|inner| { if !update { update = true; } else { @@ -427,7 +427,7 @@ impl Reactor { result } - fn lock_modify(&self, f: F) -> io::Result + fn modify(&self, f: F) -> io::Result where F: FnOnce(&mut Registrations) -> io::Result, { @@ -447,7 +447,7 @@ impl Reactor { }) } - fn lock_apply(&self, f: F) -> io::Result + fn apply(&self, f: F) -> io::Result where F: FnOnce(&mut Registrations) -> io::Result, {