Skip to content

Commit 73a020f

Browse files
authored
Bugfix: wait for the reactor to confirm the modification (#3)
* Bugfix: wait for the reactor to confirm the modification * Report out of memory on FDs bigger than N * Un-hardcode MAX_REGISTRATIONS on regular OSes * Clippy * Better method names
1 parent b28e2c7 commit 73a020f

File tree

1 file changed

+81
-34
lines changed

1 file changed

+81
-34
lines changed

src/reactor.rs

Lines changed: 81 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use core::task::Waker;
44

55
use std::io::{self, ErrorKind};
66
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
7+
use std::sync::MutexGuard;
78

89
use enumset::{EnumSet, EnumSetType};
910

@@ -13,9 +14,13 @@ use libc as sys;
1314

1415
use crate::{syscall, syscall_los, syscall_los_eagain};
1516

17+
// For ESP-IDF sys::FDSETSIZE is currently wrongly set to 1024 in the `libc` crate
18+
// Therefore, use a custom value for now
19+
#[cfg(target_os = "espidf")]
1620
const MAX_REGISTRATIONS: usize = 20;
1721

18-
//const FD_SEGMENT: usize = sys::FD_SETSIZE / core::mem::size_of::<sys::fd_set>();
22+
#[cfg(not(target_os = "espidf"))]
23+
const MAX_REGISTRATIONS: usize = sys::FD_SETSIZE;
1924

2025
#[derive(EnumSetType, Debug)]
2126
pub(crate) enum Event {
@@ -82,13 +87,15 @@ struct Registration {
8287
struct Registrations<const N: usize> {
8388
vec: heapless::Vec<Registration, N>,
8489
event_fd: Option<OwnedFd>,
90+
waiting: usize,
8591
}
8692

8793
impl<const N: usize> Registrations<N> {
8894
const fn new() -> Self {
8995
Self {
9096
vec: heapless::Vec::new(),
9197
event_fd: None,
98+
waiting: 0,
9299
}
93100
}
94101

@@ -103,7 +110,7 @@ impl<const N: usize> Registrations<N> {
103110
Err(ErrorKind::InvalidInput)?;
104111
}
105112

106-
if fd >= sys::FD_SETSIZE as RawFd {
113+
if fd >= sys::FD_SETSIZE as RawFd || fd >= N as RawFd {
107114
Err(ErrorKind::OutOfMemory)?;
108115
}
109116

@@ -129,8 +136,6 @@ impl<const N: usize> Registrations<N> {
129136

130137
self.vec.swap_remove(index);
131138

132-
self.notify()?;
133-
134139
Ok(())
135140
}
136141

@@ -147,8 +152,6 @@ impl<const N: usize> Registrations<N> {
147152
}
148153
}
149154

150-
self.notify()?;
151-
152155
Ok(())
153156
}
154157

@@ -312,13 +315,15 @@ impl<const N: usize> Registrations<N> {
312315

313316
pub struct Reactor<const N: usize> {
314317
registrations: std::sync::Mutex<Registrations<N>>,
318+
condvar: std::sync::Condvar,
315319
started: AtomicBool,
316320
}
317321

318322
impl<const N: usize> Reactor<N> {
319323
const fn new() -> Self {
320324
Self {
321325
registrations: std::sync::Mutex::new(Registrations::new()),
326+
condvar: std::sync::Condvar::new(),
322327
started: AtomicBool::new(false),
323328
}
324329
}
@@ -340,23 +345,23 @@ impl<const N: usize> Reactor<N> {
340345
}
341346

342347
pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> {
343-
self.lock(|regs| regs.register(fd))
348+
self.modify(|regs| regs.register(fd))
344349
}
345350

346351
pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> {
347-
self.lock(|regs| regs.deregister(fd))
352+
self.modify(|regs| regs.deregister(fd))
348353
}
349354

350355
// pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> {
351356
// self.lock(|regs| regs.set(fd, event, waker))
352357
// }
353358

354359
pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result<bool> {
355-
self.lock(|regs| regs.fetch(fd, event))
360+
self.modify(|regs| regs.fetch(fd, event))
356361
}
357362

358363
pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<bool> {
359-
self.lock(|regs| {
364+
self.modify(|regs| {
360365
if regs.fetch(fd, event)? {
361366
Ok(true)
362367
} else {
@@ -368,58 +373,100 @@ impl<const N: usize> Reactor<N> {
368373
}
369374

370375
fn run(&self) -> io::Result<()> {
371-
if !self.lock(Registrations::create_notification)? {
376+
if !self.lock(|mut guard| guard.create_notification())? {
372377
Err(ErrorKind::AlreadyExists)?;
373378
}
374379

375380
debug!("Running");
376381

382+
let mut fds = Fds::new();
383+
let mut update = false;
384+
377385
let result = loop {
378-
let result = self.wait();
386+
let max = self.apply(|inner| {
387+
if !update {
388+
update = true;
389+
} else {
390+
inner.update_events(&fds)?;
391+
}
392+
393+
inner.set_fds(&mut fds)
394+
});
395+
396+
let result = match max {
397+
Err(err) => Err(err),
398+
Ok(None) => unreachable!("EventFD is not there?"),
399+
Ok(Some(max)) => {
400+
debug!("Start select");
401+
402+
let result = syscall_los!(unsafe {
403+
sys::select(
404+
max + 1,
405+
fds.read.assume_init_mut(),
406+
fds.write.assume_init_mut(),
407+
fds.except.assume_init_mut(),
408+
core::ptr::null_mut(),
409+
)
410+
});
411+
412+
debug!("End select");
413+
414+
result.map(|_| ())
415+
}
416+
};
379417

380418
if result.is_err() {
381419
break result;
382420
}
383421
};
384422

385-
if !self.lock(Registrations::destroy_notification)? {
423+
if !self.lock(|mut guard| guard.destroy_notification())? {
386424
Err(ErrorKind::NotFound)?;
387425
}
388426

389427
result
390428
}
391429

392-
fn wait(&self) -> io::Result<()> {
393-
let mut fds = Fds::new();
394-
395-
if let Some(max) = self.lock(|inner| inner.set_fds(&mut fds))? {
396-
debug!("Start select");
430+
fn modify<F, R>(&self, f: F) -> io::Result<R>
431+
where
432+
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
433+
{
434+
self.lock(|mut guard| {
435+
guard.waiting += 1;
397436

398-
syscall_los!(unsafe {
399-
sys::select(
400-
max + 1,
401-
fds.read.assume_init_mut(),
402-
fds.write.assume_init_mut(),
403-
fds.except.assume_init_mut(),
404-
core::ptr::null_mut(),
405-
)
406-
})?;
437+
let result = f(&mut guard);
407438

408-
debug!("End select");
439+
guard.notify()?;
409440

410-
self.lock(|inner| inner.update_events(&fds))?;
411-
}
441+
let _guard = self
442+
.condvar
443+
.wait_while(guard, |registrations| registrations.waiting > 0)
444+
.unwrap();
412445

413-
Ok(())
446+
result
447+
})
414448
}
415449

416-
fn lock<F, R>(&self, f: F) -> io::Result<R>
450+
fn apply<F, R>(&self, f: F) -> io::Result<R>
417451
where
418452
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
419453
{
420-
let mut inner = self.registrations.lock().unwrap();
454+
self.lock(|mut guard| {
455+
let result = f(&mut guard);
456+
457+
guard.waiting = 0;
421458

422-
f(&mut inner)
459+
self.condvar.notify_all();
460+
461+
result
462+
})
463+
}
464+
465+
fn lock<F, R>(&self, f: F) -> io::Result<R>
466+
where
467+
F: FnOnce(MutexGuard<Registrations<N>>) -> io::Result<R>,
468+
{
469+
f(self.registrations.lock().unwrap())
423470
}
424471
}
425472

0 commit comments

Comments
 (0)