Skip to content

Commit d5f31dd

Browse files
authored
Added try_publish to writer (#120)
1 parent 754478b commit d5f31dd

File tree

3 files changed

+78
-4
lines changed

3 files changed

+78
-4
lines changed

src/write.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,45 @@ where
293293
}
294294
}
295295

296+
/// Try to publish once without waiting.
297+
///
298+
/// This performs a single, non-blocking check of reader epochs. If all current readers have
299+
/// advanced since the last swap, it performs a publish and returns `true`. If any reader may
300+
/// still be accessing the old copy, it does nothing and returns `false`.
301+
///
302+
/// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
303+
/// paths where skipping a publish is preferable to blocking; call again later or fall back to
304+
/// [`publish`](Self::publish) if you must ensure visibility.
305+
///
306+
/// Returns `true` if a publish occurred, `false` otherwise.
307+
pub fn try_publish(&mut self) -> bool {
308+
let epochs = Arc::clone(&self.epochs);
309+
let mut epochs = epochs.lock().unwrap();
310+
311+
// This wait loop is exactly like the one in wait, except that if we find a reader that
312+
// has not observed the latest swap, we return rather than spin-and-retry.
313+
self.last_epochs.resize(epochs.capacity(), 0);
314+
for (ri, epoch) in epochs.iter() {
315+
if self.last_epochs[ri] % 2 == 0 {
316+
continue;
317+
}
318+
319+
let now = epoch.load(Ordering::Acquire);
320+
if now != self.last_epochs[ri] {
321+
continue;
322+
} else {
323+
return false;
324+
}
325+
}
326+
#[cfg(test)]
327+
{
328+
self.is_waiting.store(false, Ordering::Relaxed);
329+
}
330+
self.update_and_swap(&mut epochs);
331+
332+
true
333+
}
334+
296335
/// Publish all operations append to the log to reads.
297336
///
298337
/// This method needs to wait for all readers to move to the "other" copy of the data so that
@@ -312,6 +351,17 @@ where
312351

313352
self.wait(&mut epochs);
314353

354+
self.update_and_swap(&mut epochs)
355+
}
356+
357+
/// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`.
358+
///
359+
/// This method must only be called when all readers have exited `w_handle` (e.g., after
360+
/// `wait`).
361+
fn update_and_swap(
362+
&mut self,
363+
epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>,
364+
) -> &mut Self {
315365
if !self.first {
316366
// all the readers have left!
317367
// safety: we haven't freed the Box, and no readers are accessing the w_handle
@@ -558,8 +608,8 @@ struct CheckWriteHandleSend;
558608

559609
#[cfg(test)]
560610
mod tests {
561-
use crate::sync::{AtomicUsize, Mutex, Ordering};
562-
use crate::Absorb;
611+
use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
612+
use crate::{read, Absorb};
563613
use slab::Slab;
564614
include!("./utilities.rs");
565615

@@ -713,4 +763,28 @@ mod tests {
713763
w.publish();
714764
assert_eq!(w.refreshes, 4);
715765
}
766+
767+
#[test]
768+
fn try_publish() {
769+
let (mut w, _r) = crate::new::<i32, _>();
770+
771+
// Case 1: A reader has not advanced (odd and unchanged) -> returns false
772+
let mut epochs_slab = Slab::new();
773+
let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read"
774+
// Ensure last_epochs sees this reader as odd and unchanged
775+
w.last_epochs = vec![0; epochs_slab.capacity()];
776+
w.last_epochs[idx] = 1;
777+
w.epochs = Arc::new(Mutex::new(epochs_slab));
778+
assert_eq!(w.try_publish(), false);
779+
780+
// Case 2: All readers have advanced since last swap -> returns true and publishes
781+
let mut epochs_slab_ok = Slab::new();
782+
let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
783+
w.last_epochs = vec![0; epochs_slab_ok.capacity()];
784+
w.last_epochs[idx_ok] = 1; // previously odd
785+
w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
786+
let before = w.refreshes;
787+
assert_eq!(w.try_publish(), true);
788+
assert_eq!(w.refreshes, before + 1);
789+
}
716790
}

tests/deque.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ fn deque() {
135135
expect(&r, &[1, 2, 3]);
136136

137137
w.append(Op::PushBack(mkval(4)));
138-
w.publish();
138+
assert!(w.try_publish());
139139

140140
registry.expect(4);
141141
expect(&r, &[1, 2, 3, 4]);

tests/loom.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ mod loom_tests {
1414
let (mut w, r) = left_right::new::<i32, _>();
1515

1616
w.append(CounterAddOp(1));
17-
w.publish();
17+
assert!(w.try_publish());
1818

1919
let jh = thread::spawn(move || *r.enter().unwrap());
2020

0 commit comments

Comments
 (0)