Skip to content

Commit ecbcdc8

Browse files
committed
Added try_publish to writer
1 parent 754478b commit ecbcdc8

File tree

1 file changed

+64
-2
lines changed

1 file changed

+64
-2
lines changed

src/write.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,36 @@ where
293293
}
294294
}
295295

296+
/// Publishes if all writers have moved to other side without spinning.
297+
///
298+
/// This method needs is difftent then publish to only check the readers once and publish
299+
/// if all readers have moved. Returns true if it was sucessfully
300+
301+
pub fn try_publish(&mut self) -> bool {
302+
let epochs = Arc::clone(&self.epochs);
303+
let mut epochs = epochs.lock().unwrap();
304+
305+
// we're over-estimating here, but slab doesn't expose its max index
306+
self.last_epochs.resize(epochs.capacity(), 0);
307+
for (ri, epoch) in epochs.iter() {
308+
if self.last_epochs[ri] % 2 == 0 {
309+
continue;
310+
}
311+
312+
let now = epoch.load(Ordering::Acquire);
313+
if now != self.last_epochs[ri] {
314+
// reader must have seen the last swap, since they have done at least one
315+
// operation since we last looked at their epoch, which _must_ mean that they
316+
// are no longer using the old pointer value.
317+
continue;
318+
} else {
319+
return false;
320+
}
321+
}
322+
self.do_publish(&mut epochs);
323+
return true;
324+
}
325+
296326
/// Publish all operations append to the log to reads.
297327
///
298328
/// This method needs to wait for all readers to move to the "other" copy of the data so that
@@ -312,6 +342,14 @@ where
312342

313343
self.wait(&mut epochs);
314344

345+
self.do_publish(&mut epochs)
346+
}
347+
348+
/// Actual doing the publishing
349+
fn do_publish(
350+
&mut self,
351+
epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>,
352+
) -> &mut Self {
315353
if !self.first {
316354
// all the readers have left!
317355
// safety: we haven't freed the Box, and no readers are accessing the w_handle
@@ -558,8 +596,8 @@ struct CheckWriteHandleSend;
558596

559597
#[cfg(test)]
560598
mod tests {
561-
use crate::sync::{AtomicUsize, Mutex, Ordering};
562-
use crate::Absorb;
599+
use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
600+
use crate::{read, Absorb};
563601
use slab::Slab;
564602
include!("./utilities.rs");
565603

@@ -713,4 +751,28 @@ mod tests {
713751
w.publish();
714752
assert_eq!(w.refreshes, 4);
715753
}
754+
755+
#[test]
756+
fn try_publish() {
757+
let (mut w, _r) = crate::new::<i32, _>();
758+
759+
// Case 1: A reader has not advanced (odd and unchanged) -> returns false
760+
let mut epochs_slab = Slab::new();
761+
let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read"
762+
// Ensure last_epochs sees this reader as odd and unchanged
763+
w.last_epochs = vec![0; epochs_slab.capacity()];
764+
w.last_epochs[idx] = 1;
765+
w.epochs = Arc::new(Mutex::new(epochs_slab));
766+
assert_eq!(w.try_publish(), false);
767+
768+
// Case 2: All readers have advanced since last swap -> returns true and publishes
769+
let mut epochs_slab_ok = Slab::new();
770+
let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
771+
w.last_epochs = vec![0; epochs_slab_ok.capacity()];
772+
w.last_epochs[idx_ok] = 1; // previously odd
773+
w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
774+
let before = w.refreshes;
775+
assert_eq!(w.try_publish(), true);
776+
assert_eq!(w.refreshes, before + 1);
777+
}
716778
}

0 commit comments

Comments
 (0)