Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,45 @@ where
}
}

/// Try to publish once without waiting.
///
/// This performs a single, non-blocking check of reader epochs. If all current readers have
/// advanced since the last swap, it performs a publish and returns `true`. If any reader may
/// still be accessing the old copy, it does nothing and returns `false`.
///
/// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
/// paths where skipping a publish is preferable to blocking; call again later or fall back to
/// [`publish`](Self::publish) if you must ensure visibility.
///
/// Returns `true` if a publish occurred, `false` otherwise.
pub fn try_publish(&mut self) -> bool {
let epochs = Arc::clone(&self.epochs);
let mut epochs = epochs.lock().unwrap();

// This wait loop is exactly like the one in wait, except that if we find a reader that
// has not observed the latest swap, we return rather than spin-and-retry.
self.last_epochs.resize(epochs.capacity(), 0);
for (ri, epoch) in epochs.iter() {
if self.last_epochs[ri] % 2 == 0 {
continue;
}

let now = epoch.load(Ordering::Acquire);
if now != self.last_epochs[ri] {
continue;
} else {
return false;
}
}
#[cfg(test)]
{
self.is_waiting.store(false, Ordering::Relaxed);
}
self.update_and_swap(&mut epochs);

true
}

/// Publish all operations append to the log to reads.
///
/// This method needs to wait for all readers to move to the "other" copy of the data so that
Expand All @@ -312,6 +351,16 @@ where

self.wait(&mut epochs);

self.update_and_swap(&mut epochs)
}

/// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`.
///
/// This method must only be called when all readers have exited `w_handle` (e.g., after `wait`)
fn update_and_swap(
&mut self,
epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>,
) -> &mut Self {
if !self.first {
// all the readers have left!
// safety: we haven't freed the Box, and no readers are accessing the w_handle
Expand Down Expand Up @@ -558,8 +607,8 @@ struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
use crate::sync::{AtomicUsize, Mutex, Ordering};
use crate::Absorb;
use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
use crate::{read, Absorb};
use slab::Slab;
include!("./utilities.rs");

Expand Down Expand Up @@ -713,4 +762,28 @@ mod tests {
w.publish();
assert_eq!(w.refreshes, 4);
}

#[test]
fn try_publish() {
let (mut w, _r) = crate::new::<i32, _>();

// Case 1: A reader has not advanced (odd and unchanged) -> returns false
let mut epochs_slab = Slab::new();
let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read"
// Ensure last_epochs sees this reader as odd and unchanged
w.last_epochs = vec![0; epochs_slab.capacity()];
w.last_epochs[idx] = 1;
w.epochs = Arc::new(Mutex::new(epochs_slab));
assert_eq!(w.try_publish(), false);

// Case 2: All readers have advanced since last swap -> returns true and publishes
let mut epochs_slab_ok = Slab::new();
let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
w.last_epochs = vec![0; epochs_slab_ok.capacity()];
w.last_epochs[idx_ok] = 1; // previously odd
w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
let before = w.refreshes;
assert_eq!(w.try_publish(), true);
assert_eq!(w.refreshes, before + 1);
}
}
2 changes: 1 addition & 1 deletion tests/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn deque() {
expect(&r, &[1, 2, 3]);

w.append(Op::PushBack(mkval(4)));
w.publish();
assert!(w.try_publish());

registry.expect(4);
expect(&r, &[1, 2, 3, 4]);
Expand Down
2 changes: 1 addition & 1 deletion tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod loom_tests {
let (mut w, r) = left_right::new::<i32, _>();

w.append(CounterAddOp(1));
w.publish();
assert!(w.try_publish());

let jh = thread::spawn(move || *r.enter().unwrap());

Expand Down
Loading