diff --git a/src/write.rs b/src/write.rs index 747367e..f0fcdc5 100644 --- a/src/write.rs +++ b/src/write.rs @@ -293,6 +293,45 @@ where } } + /// Try to publish once without waiting. + /// + /// This performs a single, non-blocking check of reader epochs. If all current readers have + /// advanced since the last swap, it performs a publish and returns `true`. If any reader may + /// still be accessing the old copy, it does nothing and returns `false`. + /// + /// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive + /// paths where skipping a publish is preferable to blocking; call again later or fall back to + /// [`publish`](Self::publish) if you must ensure visibility. + /// + /// Returns `true` if a publish occurred, `false` otherwise. + pub fn try_publish(&mut self) -> bool { + let epochs = Arc::clone(&self.epochs); + let mut epochs = epochs.lock().unwrap(); + + // This wait loop is exactly like the one in wait, except that if we find a reader that + // has not observed the latest swap, we return rather than spin-and-retry. + self.last_epochs.resize(epochs.capacity(), 0); + for (ri, epoch) in epochs.iter() { + if self.last_epochs[ri] % 2 == 0 { + continue; + } + + let now = epoch.load(Ordering::Acquire); + if now != self.last_epochs[ri] { + continue; + } else { + return false; + } + } + #[cfg(test)] + { + self.is_waiting.store(false, Ordering::Relaxed); + } + self.update_and_swap(&mut epochs); + + true + } + /// Publish all operations append to the log to reads. /// /// This method needs to wait for all readers to move to the "other" copy of the data so that @@ -312,6 +351,17 @@ where self.wait(&mut epochs); + self.update_and_swap(&mut epochs) + } + + /// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`. + /// + /// This method must only be called when all readers have exited `w_handle` (e.g., after + /// `wait`). + fn update_and_swap( + &mut self, + epochs: &mut MutexGuard<'_, slab::Slab>>, + ) -> &mut Self { if !self.first { // all the readers have left! // safety: we haven't freed the Box, and no readers are accessing the w_handle @@ -558,8 +608,8 @@ struct CheckWriteHandleSend; #[cfg(test)] mod tests { - use crate::sync::{AtomicUsize, Mutex, Ordering}; - use crate::Absorb; + use crate::sync::{Arc, AtomicUsize, Mutex, Ordering}; + use crate::{read, Absorb}; use slab::Slab; include!("./utilities.rs"); @@ -713,4 +763,28 @@ mod tests { w.publish(); assert_eq!(w.refreshes, 4); } + + #[test] + fn try_publish() { + let (mut w, _r) = crate::new::(); + + // Case 1: A reader has not advanced (odd and unchanged) -> returns false + let mut epochs_slab = Slab::new(); + let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read" + // Ensure last_epochs sees this reader as odd and unchanged + w.last_epochs = vec![0; epochs_slab.capacity()]; + w.last_epochs[idx] = 1; + w.epochs = Arc::new(Mutex::new(epochs_slab)); + assert_eq!(w.try_publish(), false); + + // Case 2: All readers have advanced since last swap -> returns true and publishes + let mut epochs_slab_ok = Slab::new(); + let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced + w.last_epochs = vec![0; epochs_slab_ok.capacity()]; + w.last_epochs[idx_ok] = 1; // previously odd + w.epochs = Arc::new(Mutex::new(epochs_slab_ok)); + let before = w.refreshes; + assert_eq!(w.try_publish(), true); + assert_eq!(w.refreshes, before + 1); + } } diff --git a/tests/deque.rs b/tests/deque.rs index 8ef5333..bfbebae 100644 --- a/tests/deque.rs +++ b/tests/deque.rs @@ -135,7 +135,7 @@ fn deque() { expect(&r, &[1, 2, 3]); w.append(Op::PushBack(mkval(4))); - w.publish(); + assert!(w.try_publish()); registry.expect(4); expect(&r, &[1, 2, 3, 4]); diff --git a/tests/loom.rs b/tests/loom.rs index 68fa136..d01c9eb 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -14,7 +14,7 @@ mod loom_tests { let (mut w, r) = left_right::new::(); w.append(CounterAddOp(1)); - w.publish(); + assert!(w.try_publish()); let jh = thread::spawn(move || *r.enter().unwrap());