Skip to content

Commit ee96ae2

Browse files
committed
Bugfix: wait for the reactor to confirm the modification
1 parent b28e2c7 commit ee96ae2

File tree

1 file changed

+75
-32
lines changed

1 file changed

+75
-32
lines changed

src/reactor.rs

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use core::task::Waker;
44

55
use std::io::{self, ErrorKind};
66
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
7+
use std::sync::MutexGuard;
78

89
use enumset::{EnumSet, EnumSetType};
910

@@ -82,13 +83,15 @@ struct Registration {
8283
struct Registrations<const N: usize> {
8384
vec: heapless::Vec<Registration, N>,
8485
event_fd: Option<OwnedFd>,
86+
waiting: usize,
8587
}
8688

8789
impl<const N: usize> Registrations<N> {
8890
const fn new() -> Self {
8991
Self {
9092
vec: heapless::Vec::new(),
9193
event_fd: None,
94+
waiting: 0,
9295
}
9396
}
9497

@@ -129,8 +132,6 @@ impl<const N: usize> Registrations<N> {
129132

130133
self.vec.swap_remove(index);
131134

132-
self.notify()?;
133-
134135
Ok(())
135136
}
136137

@@ -147,8 +148,6 @@ impl<const N: usize> Registrations<N> {
147148
}
148149
}
149150

150-
self.notify()?;
151-
152151
Ok(())
153152
}
154153

@@ -312,13 +311,15 @@ impl<const N: usize> Registrations<N> {
312311

313312
pub struct Reactor<const N: usize> {
314313
registrations: std::sync::Mutex<Registrations<N>>,
314+
condvar: std::sync::Condvar,
315315
started: AtomicBool,
316316
}
317317

318318
impl<const N: usize> Reactor<N> {
319319
const fn new() -> Self {
320320
Self {
321321
registrations: std::sync::Mutex::new(Registrations::new()),
322+
condvar: std::sync::Condvar::new(),
322323
started: AtomicBool::new(false),
323324
}
324325
}
@@ -340,23 +341,23 @@ impl<const N: usize> Reactor<N> {
340341
}
341342

342343
pub(crate) fn register(&self, fd: RawFd) -> io::Result<()> {
343-
self.lock(|regs| regs.register(fd))
344+
self.lock_modify(|regs| regs.register(fd))
344345
}
345346

346347
pub(crate) fn deregister(&self, fd: RawFd) -> io::Result<()> {
347-
self.lock(|regs| regs.deregister(fd))
348+
self.lock_modify(|regs| regs.deregister(fd))
348349
}
349350

350351
// pub(crate) fn set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<()> {
351352
// self.lock(|regs| regs.set(fd, event, waker))
352353
// }
353354

354355
pub(crate) fn fetch(&self, fd: RawFd, event: Event) -> io::Result<bool> {
355-
self.lock(|regs| regs.fetch(fd, event))
356+
self.lock_modify(|regs| regs.fetch(fd, event))
356357
}
357358

358359
pub(crate) fn fetch_or_set(&self, fd: RawFd, event: Event, waker: &Waker) -> io::Result<bool> {
359-
self.lock(|regs| {
360+
self.lock_modify(|regs| {
360361
if regs.fetch(fd, event)? {
361362
Ok(true)
362363
} else {
@@ -368,58 +369,100 @@ impl<const N: usize> Reactor<N> {
368369
}
369370

370371
fn run(&self) -> io::Result<()> {
371-
if !self.lock(Registrations::create_notification)? {
372+
if !self.lock(|mut guard| guard.create_notification())? {
372373
Err(ErrorKind::AlreadyExists)?;
373374
}
374375

375376
debug!("Running");
376377

378+
let mut fds = Fds::new();
379+
let mut update = false;
380+
377381
let result = loop {
378-
let result = self.wait();
382+
let max = self.lock_apply(|inner| {
383+
if !update {
384+
update = true;
385+
} else {
386+
inner.update_events(&fds)?;
387+
}
388+
389+
inner.set_fds(&mut fds)
390+
});
391+
392+
let result = match max {
393+
Err(err) => Err(err),
394+
Ok(None) => unreachable!("EventFD is not there?"),
395+
Ok(Some(max)) => {
396+
debug!("Start select");
397+
398+
let result = syscall_los!(unsafe {
399+
sys::select(
400+
max + 1,
401+
fds.read.assume_init_mut(),
402+
fds.write.assume_init_mut(),
403+
fds.except.assume_init_mut(),
404+
core::ptr::null_mut(),
405+
)
406+
});
407+
408+
debug!("End select");
409+
410+
result.map(|_| ())
411+
}
412+
};
379413

380414
if result.is_err() {
381415
break result;
382416
}
383417
};
384418

385-
if !self.lock(Registrations::destroy_notification)? {
419+
if !self.lock(|mut guard| guard.destroy_notification())? {
386420
Err(ErrorKind::NotFound)?;
387421
}
388422

389423
result
390424
}
391425

392-
fn wait(&self) -> io::Result<()> {
393-
let mut fds = Fds::new();
394-
395-
if let Some(max) = self.lock(|inner| inner.set_fds(&mut fds))? {
396-
debug!("Start select");
426+
fn lock_modify<F, R>(&self, f: F) -> io::Result<R>
427+
where
428+
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
429+
{
430+
self.lock(|mut guard| {
431+
guard.waiting += 1;
397432

398-
syscall_los!(unsafe {
399-
sys::select(
400-
max + 1,
401-
fds.read.assume_init_mut(),
402-
fds.write.assume_init_mut(),
403-
fds.except.assume_init_mut(),
404-
core::ptr::null_mut(),
405-
)
406-
})?;
433+
let result = f(&mut guard);
407434

408-
debug!("End select");
435+
guard.notify()?;
409436

410-
self.lock(|inner| inner.update_events(&fds))?;
411-
}
437+
let _guard = self
438+
.condvar
439+
.wait_while(guard, |registrations| registrations.waiting > 0)
440+
.unwrap();
412441

413-
Ok(())
442+
result
443+
})
414444
}
415445

416-
fn lock<F, R>(&self, f: F) -> io::Result<R>
446+
fn lock_apply<F, R>(&self, f: F) -> io::Result<R>
417447
where
418448
F: FnOnce(&mut Registrations<N>) -> io::Result<R>,
419449
{
420-
let mut inner = self.registrations.lock().unwrap();
450+
self.lock(|mut guard| {
451+
let result = f(&mut guard);
452+
453+
guard.waiting = 0;
421454

422-
f(&mut inner)
455+
self.condvar.notify_all();
456+
457+
result
458+
})
459+
}
460+
461+
fn lock<F, R>(&self, f: F) -> io::Result<R>
462+
where
463+
F: FnOnce(MutexGuard<Registrations<N>>) -> io::Result<R>,
464+
{
465+
f(self.registrations.lock().unwrap())
423466
}
424467
}
425468

0 commit comments

Comments
 (0)