diff --git a/Cargo.toml b/Cargo.toml
index 451e379..bbcb1fc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,5 +14,15 @@ categories = ["concurrency"]
 [dependencies]
 slab = "0.4"
 
+[dev-dependencies]
+rand = "0.8.4"
+quickcheck = "1.0.3"
+quickcheck_macros = "1.0.0"
+criterion = "0.3"
+
+[[bench]]
+name = "benchmark"
+harness = false
+
 [target.'cfg(loom)'.dependencies]
 loom = "0.4.0"
diff --git a/benches/benchmark.rs b/benches/benchmark.rs
new file mode 100644
index 0000000..9f5a941
--- /dev/null
+++ b/benches/benchmark.rs
@@ -0,0 +1,457 @@
+use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
+use left_right::*;
+use rand::{distributions::Uniform, Rng};
+use std::collections::VecDeque;
+
+macro_rules! bench_instance {
+    (
+        name: $name: ident,
+        range: $range: literal,
+        delays: {
+            absorb_set: $absorb_set: literal,
+            absorb_clear: $absorb_clear: literal,
+            compress_set: $compress_set: literal,
+            compress_clear: $compress_clear: literal,
+        },
+        ops: {
+            key_bits: $key_bits: literal,
+            clear_bits: $clear_bits: literal,
+        },
+        iteration: {
+            len: $len: literal,
+            chunk_len: $chunk_len: literal,
+            publish_len: $publish_len: literal,
+        }
+    ) => {
+        fn $name(c: &mut Criterion) {
+            run::<$range>(
+                c,
+                stringify!($name),
+                $key_bits,
+                $clear_bits,
+                $absorb_set,
+                $absorb_clear,
+                $compress_set,
+                $compress_clear,
+                $len,
+                $chunk_len,
+                $publish_len,
+            )
+        }
+    };
+}
+
+/* # About the values used:
+ * delays: number of iterations inside a black-box spin-loop
+ *   absorb_set: Higher values MASSIVELY benefit compression. 500 is about a lookup in a small Map (n ~= 64, Hash ~= BTree) followed by a non-trivial String operation.
+ *   absorb_clear: Higher values MASSIVELY benefit compression. 12000 is about clearing a small Map of Strings (n ~= 64).
+ *   compress_set: Lower values benefit compression. 125 is about a non-trivial String operation.
+ *   compress_clear: Lower values benefit compression. Slightly lower than compress_set because we just discard a String instead of combining two.
+ *   !compress_not: There is no delay setting for the Independent/Dependent cases, because even 0 would be far greater than a simple key-inequality check and would grind compression to a screeching halt.
+ * ops:
+ *   key_bits: Lower values MASSIVELY benefit high-range compression. 6 is equivalent to 64 entries: (very) low for big Maps, but high for non-map values.
+ *   clear_bits: Lower values MASSIVELY benefit low-range compression. 11 is equivalent to clearing a Map every 2048 operations: (VERY) low for big Maps, maybe reasonable for an arena of some kind?
+ * iteration:
+ *   len: No performance benefit either way.
+ *   chunk_len: Higher values slightly benefit high-range compression by amortizing the linear removal of Nones. TODO: Maybe implement buffered appends?
+ *   publish_len: Higher values greatly benefit compression's memory savings, but shouldn't have a noticeable performance impact either way.
+ */ + +bench_instance!( + name: none_favorable, + range: 0, + delays: { + absorb_set: 1000, + absorb_clear: 15000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 6, + clear_bits: 11, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: max_favorable, + range: 0xFFFFFFFFFFFFFFFF, + delays: { + absorb_set: 1000, + absorb_clear: 15000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 6, + clear_bits: 11, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r1_favorable, + range: 1, + delays: { + absorb_set: 1000, + absorb_clear: 15000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 6, + clear_bits: 11, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r16_favorable, + range: 16, + delays: { + absorb_set: 1000, + absorb_clear: 15000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 6, + clear_bits: 11, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r64_favorable, + range: 64, + delays: { + absorb_set: 1000, + absorb_clear: 15000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 6, + clear_bits: 11, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); + +bench_instance!( + name: none_unfavorable, + range: 0, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 14, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: max_unfavorable, + range: 0xFFFFFFFFFFFFFFFF, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 14, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r1_unfavorable, + range: 1, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 14, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r16_unfavorable, + range: 16, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 14, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r64_unfavorable, + range: 64, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 14, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); + +bench_instance!( + name: none_no_clear, + range: 0, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 63, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); +bench_instance!( + name: r1_no_clear, + range: 1, + delays: { + absorb_set: 500, + absorb_clear: 10000, + compress_set: 125, + compress_clear: 100, + }, + ops: { + key_bits: 8, + clear_bits: 63, + }, + iteration: { + len: 0x20000, + chunk_len: 0x80, + publish_len: 0x800, + } +); + +criterion_group!( + benches, + none_favorable, + max_favorable, + r1_favorable, + 
+    r16_favorable,
+    r64_favorable,
+    none_unfavorable,
+    max_unfavorable,
+    r1_unfavorable,
+    r16_unfavorable,
+    r64_unfavorable,
+    none_no_clear,
+    r1_no_clear
+);
+criterion_main!(benches);
+
+fn run<const RANGE: usize>(
+    c: &mut Criterion,
+    name: &str,
+    key_bits: u8,
+    clear_bits: u8,
+    absorb_set: u16,
+    absorb_clear: u16,
+    compress_set: u8,
+    compress_clear: u8,
+    len: usize,
+    chunk_len: usize,
+    publish_len: usize,
+) {
+    c.bench_function(name, |b| {
+        b.iter_batched(
+            || {
+                let ops = random_ops(key_bits, clear_bits, compress_set, compress_clear, len);
+                let (w, _) = new_from_empty::<_, FakeMapOp>(FakeMap::<RANGE> {
+                    absorb_set,
+                    absorb_clear,
+                });
+                (ops, w)
+            },
+            |(mut ops, mut w)| {
+                let mut log_len = 0;
+                while !ops.is_empty() {
+                    w.extend(ops.drain(0..black_box(chunk_len)));
+                    log_len += chunk_len;
+                    if log_len >= publish_len {
+                        log_len -= publish_len;
+                        w.publish();
+                    }
+                }
+            },
+            BatchSize::LargeInput,
+        )
+    });
+}
+
+pub(crate) fn random_ops(
+    key_bits: u8,
+    clear_bits: u8,
+    compress_set: u8,
+    compress_clear: u8,
+    len: usize,
+) -> VecDeque<FakeMapOp> {
+    let rng = rand::thread_rng();
+    let dist = Uniform::new(0, usize::MAX);
+    rng.sample_iter(&dist)
+        .take(len)
+        .map(|x| {
+            let key = (x & !((!0) << key_bits)) as u16;
+            if x & !((!0) << clear_bits) == 0 {
+                FakeMapOp::Clear {
+                    compress_set,
+                    compress_clear,
+                }
+            } else {
+                FakeMapOp::Set {
+                    key,
+                    compress_set,
+                    compress_clear,
+                }
+            }
+        })
+        .collect()
+}
+
+fn black_box_spin(spin_count: usize) {
+    let mut counter = 1;
+    for spin_count in 0..black_box(spin_count) {
+        counter += black_box(spin_count);
+    }
+    if black_box(counter) > 0 {
+        Some(())
+    } else {
+        None
+    }
+    .unwrap()
+}
+
+pub(crate) enum FakeMapOp {
+    Set {
+        key: u16,
+        compress_set: u8,
+        compress_clear: u8,
+    },
+    Clear {
+        compress_set: u8,
+        compress_clear: u8,
+    },
+}
+#[derive(Clone, Debug, Default)]
+pub(crate) struct FakeMap<const RANGE: usize> {
+    absorb_set: u16,
+    absorb_clear: u16,
+}
+impl<const RANGE: usize> Absorb<FakeMapOp> for FakeMap<RANGE> {
+    fn absorb_first(&mut self, operation: &mut FakeMapOp, _: &Self) {
+        black_box_spin(match operation {
+            FakeMapOp::Set { .. } => self.absorb_set,
+            FakeMapOp::Clear { .. } => self.absorb_clear,
+        } as usize);
+    }
+    fn sync_with(&mut self, first: &Self) {
+        *self = first.clone();
+    }
+
+    const MAX_COMPRESS_RANGE: usize = RANGE;
+    fn try_compress(mut prev: &mut FakeMapOp, next: FakeMapOp) -> TryCompressResult<FakeMapOp> {
+        match (&mut prev, next) {
+            (
+                FakeMapOp::Set { key: prev_key, .. },
+                FakeMapOp::Set {
+                    key,
+                    compress_set,
+                    compress_clear,
+                },
+            ) => {
+                if *prev_key == key {
+                    black_box_spin(compress_set as usize);
+                    TryCompressResult::Compressed
+                } else {
+                    TryCompressResult::Independent(FakeMapOp::Set {
+                        key,
+                        compress_set,
+                        compress_clear,
+                    })
+                }
+            }
+            (
+                _,
+                FakeMapOp::Clear {
+                    compress_set,
+                    compress_clear,
+                },
+            ) => {
+                black_box_spin(compress_clear as usize);
+                *prev = FakeMapOp::Clear {
+                    compress_set,
+                    compress_clear,
+                };
+                TryCompressResult::Compressed
+            }
+            (FakeMapOp::Clear { .. }, next @ FakeMapOp::Set { .. }) => {
+                TryCompressResult::Dependent(next)
+            }
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index e535198..96d1bb2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -189,6 +189,29 @@ pub use crate::read::{ReadGuard, ReadHandle, ReadHandleFactory};
 
 pub mod aliasing;
 
+/// The result of calling [`Absorb::try_compress`](Absorb::try_compress).
+#[derive(Debug)]
+pub enum TryCompressResult<O> {
+    /// Returned when [`try_compress`](Absorb::try_compress) was successful.
+    ///
+    /// The expectation is that the `prev` argument to `try_compress` now represents the combined operation after consuming `next`.
+    ///
+    /// Compression will continue by attempting to combine the new `prev` with its predecessors.
+    Compressed,
+    /// The two operations passed to [`try_compress`](Absorb::try_compress) were not combined, but they commute.
+    ///
+    /// Since `prev` and `next` commute, compression will continue attempting to merge `next` with operations that precede `prev` in the oplog, potentially changing the relative ordering of `prev` and `next`.
+    ///
+    /// Returns ownership of `next` so that compression can continue (or so that `next` can be restored).
+    Independent(O),
+    /// The two operations passed to [`try_compress`](Absorb::try_compress) were not combined and do not commute.
+    ///
+    /// Since `prev` and `next` do not commute, `next` cannot be moved past `prev` and must therefore be left in its current location.
+    ///
+    /// Returns ownership of `next` so that it can be put back into the oplog (after `prev`).
+    Dependent(O),
+}
+
 /// Types that can incorporate operations of type `O`.
 ///
 /// This trait allows `left-right` to keep the two copies of the underlying data structure (see the
@@ -261,6 +284,27 @@ pub trait Absorb<O> {
     /// subtly affect results like the `RandomState` of a `HashMap` which can change iteration
     /// order.
     fn sync_with(&mut self, first: &Self);
+
+    /// How far back [`WriteHandle`] searches the oplog when trying to compress a newly appended op; the remaining range is reset each time a compression succeeds.
+    ///
+    /// Limiting this range avoids insertion into the oplog becoming O(oplog.len * ops.len) when the oplog is filled with mainly independent ops.
+    ///
+    /// Defaults to `0`, which disables compression and allows an efficient non-compressing fallback to be used.
+    const MAX_COMPRESS_RANGE: usize = 0;
+
+    /// Try to compress two ops into a single op and return a [`TryCompressResult`].
+    ///
+    /// Used to optimize the oplog while extending it: `prev` is the target op already inside the oplog, `next` is the op being inserted.
+    ///
+    /// The default implementation returns [`TryCompressResult::Dependent`], which also disables compression, but sub-optimally:
+    /// setting [`Self::MAX_COMPRESS_RANGE`](Absorb::MAX_COMPRESS_RANGE) to (or leaving it at) its default of `0` is a vastly more efficient way to achieve that.
+    fn try_compress(prev: &mut O, next: O) -> TryCompressResult<O> {
+        // The match is not strictly necessary, but it keeps `prev` from being an unused
+        // parameter and matches the mental model of 'all ops are dependent'.
+        match prev {
+            _ => TryCompressResult::Dependent(next),
+        }
+    }
 }
 
 /// Construct a new write and read handle pair from an empty data structure.
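To make the new hooks concrete, here is a minimal sketch (not part of this diff) of an implementor that opts into compression. `Map` and `MapOp` are hypothetical types invented for this illustration; besides the two new items, the impl only provides `absorb_first` and `sync_with` and leaves the other `Absorb` methods at their defaults, mirroring what the `FakeMap` benchmark type and the `CompressibleCounterOp` test type elsewhere in this diff do.

```rust
use left_right::{Absorb, TryCompressResult};
use std::collections::HashMap;

// Hypothetical op and data types, used only for this sketch.
enum MapOp {
    Insert(u64, String),
    Clear,
}

#[derive(Clone, Default)]
struct Map(HashMap<u64, String>);

impl Absorb<MapOp> for Map {
    fn absorb_first(&mut self, op: &mut MapOp, _: &Self) {
        match op {
            MapOp::Insert(k, v) => {
                self.0.insert(*k, v.clone());
            }
            MapOp::Clear => self.0.clear(),
        }
    }

    fn sync_with(&mut self, first: &Self) {
        *self = first.clone();
    }

    // Opt in to compression: search at most 32 ops back per inserted op.
    const MAX_COMPRESS_RANGE: usize = 32;

    fn try_compress(prev: &mut MapOp, next: MapOp) -> TryCompressResult<MapOp> {
        match (prev, next) {
            // A later insert to the same key supersedes the earlier one.
            (MapOp::Insert(pk, pv), MapOp::Insert(k, v)) if *pk == k => {
                *pv = v;
                TryCompressResult::Compressed
            }
            // A clear makes every earlier op irrelevant.
            (prev, MapOp::Clear) => {
                *prev = MapOp::Clear;
                TryCompressResult::Compressed
            }
            // Nothing may be reordered past a clear.
            (MapOp::Clear, next) => TryCompressResult::Dependent(next),
            // Inserts to different keys commute, so the search may continue further back.
            (MapOp::Insert(..), next @ MapOp::Insert(..)) => TryCompressResult::Independent(next),
        }
    }
}
```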
diff --git a/src/utilities.rs b/src/utilities.rs
index 88520e8..8f632f8 100644
--- a/src/utilities.rs
+++ b/src/utilities.rs
@@ -8,13 +8,63 @@ impl Absorb<CounterAddOp> for i32 {
         *self += operation.0;
     }
 
-    fn absorb_second(&mut self, operation: CounterAddOp, _: &Self) {
-        *self += operation.0;
+    fn sync_with(&mut self, first: &Self) {
+        *self = *first
     }
+}
+
+#[cfg(test)]
+#[derive(Debug, Eq, PartialEq)]
+pub enum CompressibleCounterOp<const MAX_COMPRESS_RANGE: usize> {
+    Set(i32),
+    Add(i32),
+    Sub(i32),
+}
 
-    fn drop_first(self: Box<Self>) {}
+#[cfg(test)]
+impl<const MAX_COMPRESS_RANGE: usize> Absorb<CompressibleCounterOp<MAX_COMPRESS_RANGE>> for i32 {
+    fn absorb_first(
+        &mut self,
+        operation: &mut CompressibleCounterOp<MAX_COMPRESS_RANGE>,
+        _: &Self,
+    ) {
+        match operation {
+            CompressibleCounterOp::Set(v) => *self = *v,
+            CompressibleCounterOp::Add(v) => *self += *v,
+            CompressibleCounterOp::Sub(v) => *self -= *v,
+        }
+    }
 
     fn sync_with(&mut self, first: &Self) {
         *self = *first
     }
+
+    const MAX_COMPRESS_RANGE: usize = MAX_COMPRESS_RANGE;
+
+    fn try_compress(
+        prev: &mut CompressibleCounterOp<MAX_COMPRESS_RANGE>,
+        next: CompressibleCounterOp<MAX_COMPRESS_RANGE>,
+    ) -> TryCompressResult<CompressibleCounterOp<MAX_COMPRESS_RANGE>> {
+        match (prev, next) {
+            (CompressibleCounterOp::Add(prev), CompressibleCounterOp::Add(next)) => {
+                *prev += next;
+                TryCompressResult::Compressed
+            }
+            (CompressibleCounterOp::Sub(prev), CompressibleCounterOp::Sub(next)) => {
+                *prev += next;
+                TryCompressResult::Compressed
+            }
+            (CompressibleCounterOp::Add(_), next @ CompressibleCounterOp::Sub(_)) => {
+                TryCompressResult::Independent(next)
+            }
+            (CompressibleCounterOp::Sub(_), CompressibleCounterOp::Add(next)) => {
+                TryCompressResult::Independent(CompressibleCounterOp::Add(next))
+            }
+            (CompressibleCounterOp::Set(_), next) => TryCompressResult::Dependent(next),
+            (prev, CompressibleCounterOp::Set(next)) => {
+                *prev = CompressibleCounterOp::Set(next);
+                TryCompressResult::Compressed
+            }
+        }
+    }
 }
diff --git a/src/write.rs b/src/write.rs
index 747367e..55f5a68 100644
--- a/src/write.rs
+++ b/src/write.rs
@@ -4,7 +4,7 @@ use crate::Absorb;
 use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
 use std::collections::VecDeque;
 use std::marker::PhantomData;
-use std::ops::DerefMut;
+use std::ops::{DerefMut, Range};
 use std::ptr::NonNull;
 #[cfg(test)]
 use std::sync::atomic::AtomicBool;
@@ -28,7 +28,7 @@ where
 {
     epochs: crate::Epochs,
     w_handle: NonNull<T>,
-    oplog: VecDeque<O>,
+    oplog: VecDeque<Option<O>>,
     swap_index: usize,
     r_handle: ReadHandle<T>,
     last_epochs: Vec<usize>,
@@ -337,13 +337,21 @@
             // we can drain out the operations that only the w_handle copy needs
             //
             // NOTE: the if above is because drain(0..0) would remove 0
-            for op in self.oplog.drain(0..self.swap_index) {
+            for op in self
+                .oplog
+                .drain(0..self.swap_index)
+                .map(|opt| opt.expect("Nones are always temporary"))
+            {
                 T::absorb_second(w_handle, op, r_handle);
             }
         }
         // we cannot give owned operations to absorb_first
         // since they'll also be needed by the r_handle copy
-        for op in self.oplog.iter_mut() {
+        for op in self
+            .oplog
+            .iter_mut()
+            .map(|op| op.as_mut().expect("Nones are always temporary"))
+        {
             T::absorb_first(w_handle, op, r_handle);
         }
         // the w_handle copy is about to become the r_handle, and can ignore the oplog
@@ -464,6 +472,7 @@
     where
        I: IntoIterator<Item = O>,
    {
+        // Until the first publish, ops can be applied directly instead of buffered (see the safety note below).
        if self.first {
            // Safety: we know there are no outstanding w_handle readers, since we haven't
            // refreshed ever before, so we can modify it directly!
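Why the oplog becomes a `VecDeque<Option<O>>`: a successful merge consumes an op from the middle of the log, leaving a temporary `None` hole that is compacted away after the batch of appends. The following standalone sketch models that take-merge-compact cycle with a hypothetical `Op` type; it is an illustration only — the real implementation further down works in place on the `VecDeque`, reuses emptied slots, and additionally tracks a dirty range and the `MAX_COMPRESS_RANGE` budget.

```rust
// Standalone model of the compression pass (illustration only, not the crate's code).
#[derive(Debug, PartialEq)]
enum Op {
    Set(i32),
    Add(i32),
}

fn insert_compressed(oplog: &mut Vec<Option<Op>>, mut next: Op) {
    let mut free_slot: Option<usize> = None;
    // Walk the log back to front, merging `next` into it where possible.
    for idx in (0..oplog.len()).rev() {
        let Some(prev) = oplog[idx].take() else {
            free_slot = Some(idx);
            continue;
        };
        match (prev, next) {
            // Two adds merge into one; keep searching further back with the merged op.
            (Op::Add(p), Op::Add(n)) => {
                next = Op::Add(p + n);
                free_slot = Some(idx); // temporary hole
            }
            // A later set makes the earlier op irrelevant; keep searching with the set.
            (_, Op::Set(n)) => {
                next = Op::Set(n);
                free_slot = Some(idx); // temporary hole
            }
            // An add cannot move past an earlier set: put the set back and stop.
            (prev @ Op::Set(_), n) => {
                oplog[idx] = Some(prev);
                next = n;
                break;
            }
        }
    }
    // Re-insert the (possibly merged) op, preferring a hole left by a merge.
    match free_slot {
        Some(idx) => oplog[idx] = Some(next),
        None => oplog.push(Some(next)),
    }
    // Compact the temporary holes, roughly what oplog_retain_some does in the real code.
    oplog.retain(Option::is_some);
}

fn main() {
    let mut oplog = vec![Some(Op::Add(1)), Some(Op::Set(5)), Some(Op::Add(2))];
    insert_compressed(&mut oplog, Op::Add(3));
    // Add(3) merged with Add(2) but could not move past Set(5).
    assert_eq!(
        oplog,
        vec![Some(Op::Add(1)), Some(Op::Set(5)), Some(Op::Add(5))]
    );
}
```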
@@ -475,12 +484,181 @@ where
             for op in ops {
                 Absorb::absorb_second(w_inner, op, &*r_handle);
             }
+        } else if T::MAX_COMPRESS_RANGE == 0 {
+            // If compression is disabled, use the efficient, non-compressing fallback.
+            self.oplog.extend(ops.into_iter().map(|op| Some(op)));
         } else {
-            self.oplog.extend(ops);
+            // Compression is enabled.
+            // rev_dirty_range is used to avoid walking more of the oplog than necessary.
+            let mut rev_dirty_range = 0usize..0;
+            for next in ops {
+                self.compress_insert_op(next, &mut rev_dirty_range);
+            }
+            self.oplog_retain_some(rev_dirty_range);
         }
     }
 }
 
+impl<T: Absorb<O>, O> WriteHandle<T, O> {
+    /// Rev-iterate over all ops appended since the last publish, attempting to combine each with the incoming op;
+    /// the walk is cut short when an attempt hits a dependency (e.g. clear then set) or the compression range runs out.
+    fn compress_insert_op(&mut self, mut next: O, rev_dirty_range: &mut Range<usize>) {
+        // While debugging, make extra sure rev_dirty_range.start is correct.
+        debug_assert!(
+            self.oplog
+                .iter()
+                .rev()
+                .take(rev_dirty_range.start)
+                .find(|loc| loc.is_some())
+                .is_none(),
+            "We never skip over any Some."
+        );
+        debug_assert!(
+            self.oplog
+                .iter()
+                .rev()
+                .skip(rev_dirty_range.start)
+                .next()
+                .map(Option::is_some)
+                .unwrap_or(true),
+            "We start on the first Some if it exists."
+        );
+        // used to avoid linear insertion time in case of predominantly independent ops.
+        let mut range_remaining = T::MAX_COMPRESS_RANGE;
+        // used to insert next more efficiently if possible
+        let mut none: Option<(usize, &mut Option<O>)> = None;
+        // rev-iterate all unpublished and potentially non-none ops already in the oplog
+        for (prev_rev_idx, prev_loc) in {
+            self.oplog
+                .iter_mut()
+                .skip(self.swap_index) // only consider the fresh part of the oplog
+                .rev() // We need to walk it in reverse
+                .enumerate() // we need the reverse index for the rev_dirty_range optimization
+                .skip(rev_dirty_range.start.saturating_sub(1)) // skip nones at the back (except one for efficient insertion)
+        } {
+            if let Some(prev) = prev_loc.as_mut() {
+                match T::try_compress(prev, next) {
+                    // The ops were successfully compressed, take prev as the new next
+                    crate::TryCompressResult::Compressed => {
+                        // We successfully compressed ops and therefore take the combined op as the new next,...
+                        next = prev_loc
+                            .take()
+                            .expect("We just checked that prev_loc is Some.");
+                        // ...remember the empty loc for efficient insertion,...
+                        none.replace((prev_rev_idx, prev_loc));
+                        // ...and reset our range.
+                        range_remaining = T::MAX_COMPRESS_RANGE;
+                        // If the now empty loc is at the back of the non-none oplog we can increment rev_dirty_range.start.
+                        if prev_rev_idx == rev_dirty_range.start {
+                            rev_dirty_range.start += 1;
+                        }
+                        // If the now empty loc is before the front of the non-none oplog we need to increase rev_dirty_range.end.
+                        if prev_rev_idx >= rev_dirty_range.end {
+                            rev_dirty_range.end = prev_rev_idx + 1;
+                        }
+                    }
+                    // The ops are independent of each other, restore next and continue
+                    crate::TryCompressResult::Independent(re_next) => {
+                        next = re_next;
+                        // We consumed one step of our range and need to check whether to break or continue.
+ range_remaining -= 1; + if range_remaining == 0 { + break; + } else { + continue; + } + } + // prev must precede next: restore next then break + crate::TryCompressResult::Dependent(re_next) => { + next = re_next; + break; + } + } + } else { + // Remember empty loc for efficient insertion + none.replace((prev_rev_idx, prev_loc)); + // If the empty loc is at the back of the non-none oplog we can increment rev_dirty_range.start. + if prev_rev_idx == rev_dirty_range.start { + rev_dirty_range.start += 1; + } + } + } + // found nothing to combine with / encountered dependency + // See if we found an empty loc during iteration, else push + if let Some((none_rev_idx, none_loc)) = none { + if let Some(_supposed_none) = none_loc.replace(next) { + unreachable!("cached None location held Some(_)"); + } + // If we inserted before the end of the non-none oplog we need to decrease rev_dirty_range.start. + if none_rev_idx < rev_dirty_range.start { + rev_dirty_range.start = none_rev_idx; + } + // If we inserted at the front of the all-some oplog we can decrement rev_dirty_range.end. + if none_rev_idx + 1 == rev_dirty_range.end { + rev_dirty_range.end -= 1; + } + } else { + debug_assert!(rev_dirty_range.start == 0, "We only push if we found no nones and we always find at least one none at the back if it is there."); + self.oplog.push_back(Some(next)); + } + } + /// stably remove temporary nones from the oplog + fn oplog_retain_some(&mut self, rev_dirty_range: Range) { + let some_len = { + // some_range is an un-inverted rev_dirty_range + let mut some_range = { + let len = self.oplog.len(); + len - rev_dirty_range.end..len - rev_dirty_range.start + }; + // Find the first none, ok to skip at least one, because we start on Some. + let mut none_range = some_range.clone(); + for none_idx in &mut none_range { + if self.oplog[none_idx].is_none() { + break; + } + } + // Use some_idx to find a none + 'find_none: for some_idx in &mut some_range { + if self.oplog[some_idx].is_none() { + // Now use none_idx to find a some + for none_idx in &mut none_range { + if self.oplog[none_idx].is_some() { + // some_idx is none, none_idx is some => swap + self.oplog.swap(some_idx, none_idx); + continue 'find_none; + } + } + // No some to swap with found => done + // Need to reverse last increment of some_range + some_range.start = some_idx; + break 'find_none; + } + } + // some_range.start either stops on the first none or is oplog.len, meaning we can now use it as a len to truncate the oplog. + some_range.start + }; + debug_assert!( + self.oplog + .iter() + .skip(some_len) + .find(|op| op.is_some()) + .is_none(), + "We never truncate off any Some." + ); + debug_assert!( + self.oplog + .iter() + .skip(self.swap_index) + .take(some_len - self.swap_index) + .find(|op| op.is_none()) + .is_none(), + "We never leave behind any None." 
+        );
+        // some_len is either the index of the first remaining None or oplog.len()
+        self.oplog.truncate(some_len);
+    }
+}
+
 /// `WriteHandle` can be sent across thread boundaries:
 ///
 /// ```
@@ -558,8 +736,11 @@ struct CheckWriteHandleSend;
 
 #[cfg(test)]
 mod tests {
+    use std::iter::once;
+
     use crate::sync::{AtomicUsize, Mutex, Ordering};
-    use crate::Absorb;
+    use crate::{Absorb, TryCompressResult};
+    use quickcheck_macros::quickcheck;
     use slab::Slab;
 
     include!("./utilities.rs");
@@ -576,6 +757,42 @@
         w.append(CounterAddOp(3));
         assert_eq!(w.oplog.len(), 2);
     }
+    #[test]
+    fn append_test_compress() {
+        type Op = CompressibleCounterOp<{ usize::MAX }>;
+        let (mut w, r) = crate::new::<i32, Op>();
+        // Get the first-publish optimization out of the picture
+        assert_eq!(w.first, true);
+        w.append(Op::Add(8));
+        assert_eq!(w.oplog.len(), 0);
+        assert_eq!(w.first, true);
+        w.publish();
+        assert_eq!(*r.enter().unwrap(), 8);
+        assert_eq!(w.first, false);
+        // Adds will combine
+        w.append(Op::Add(7));
+        w.append(Op::Add(6));
+        assert_eq!(w.oplog.len(), 1);
+        // All Subs will combine; the Add combines with the one from above
+        w.append(Op::Sub(5));
+        w.extend([Op::Sub(4), Op::Add(3), Op::Sub(2)]);
+        assert_eq!(w.oplog.len(), 2);
+        // Set will clear the oplog, the Adds will combine
+        w.extend([Op::Set(1), Op::Add(2), Op::Sub(1), Op::Add(3)]);
+        assert_eq!(w.oplog.len(), 3);
+        w.publish();
+        assert_eq!(*r.enter().unwrap(), 5);
+        // full len still 3 because only the first absorb has run
+        assert_eq!(w.oplog.len(), 3);
+        assert_eq!(w.oplog.len(), w.swap_index);
+        assert_eq!(*r.enter().unwrap(), 5);
+        w.publish();
+        assert_eq!(*r.enter().unwrap(), 5);
+        // now also the second absorb => len == 0
+        assert_eq!(w.oplog.len(), 0);
+        assert_eq!(w.oplog.len(), w.swap_index);
+        assert_eq!(*r.enter().unwrap(), 5);
+    }
 
     #[test]
     fn take_test() {
@@ -615,7 +832,49 @@
         assert_eq!(*w.take(), 3);
 
         // no operations
-        let (w, _r) = crate::new_from_empty::<i32, _>(2);
+        let (w, _r) = crate::new_from_empty::<i32, CounterAddOp>(2);
+        assert_eq!(*w.take(), 2);
+    }
+    #[test]
+    fn take_test_compress_equiv() {
+        type Op = CompressibleCounterOp<{ usize::MAX }>;
+        // publish twice then take with no pending operations
+        let (mut w, _r) = crate::new_from_empty::<i32, Op>(2);
+        w.append(Op::Add(1));
+        w.publish();
+        w.append(Op::Add(1));
+        w.publish();
+        assert_eq!(*w.take(), 4);
+
+        // publish twice then pending operation published by take
+        let (mut w, _r) = crate::new_from_empty::<i32, Op>(2);
+        w.append(Op::Add(1));
+        w.publish();
+        w.append(Op::Add(1));
+        w.publish();
+        w.append(Op::Add(2));
+        assert_eq!(*w.take(), 6);
+
+        // normal publish then pending operations published by take
+        let (mut w, _r) = crate::new_from_empty::<i32, Op>(2);
+        w.append(Op::Add(1));
+        w.publish();
+        w.append(Op::Add(1));
+        assert_eq!(*w.take(), 4);
+
+        // pending operations published by take
+        let (mut w, _r) = crate::new_from_empty::<i32, Op>(2);
+        w.append(Op::Add(1));
+        assert_eq!(*w.take(), 3);
+
+        // empty op queue
+        let (mut w, _r) = crate::new_from_empty::<i32, Op>(2);
+        w.append(Op::Add(1));
+        w.publish();
+        assert_eq!(*w.take(), 3);
+
+        // no operations
+        let (w, _r) = crate::new_from_empty::<i32, Op>(2);
         assert_eq!(*w.take(), 2);
     }
 
@@ -623,7 +882,7 @@
     fn wait_test() {
         use std::sync::{Arc, Barrier};
         use std::thread;
-        let (mut w, _r) = crate::new::<i32, _>();
+        let (mut w, _r) = crate::new::<i32, CounterAddOp>();
 
         // Case 1: If epoch is set to default.
let test_epochs: crate::Epochs = Default::default(); @@ -684,6 +943,20 @@ mod tests { assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0); assert!(!w.has_pending_operations()); } + #[test] + fn flush_noblock_compress_equiv() { + type Op = CompressibleCounterOp<{ usize::MAX }>; + let (mut w, r) = crate::new::(); + w.append(Op::Add(42)); + w.publish(); + assert_eq!(*r.enter().unwrap(), 42); + + // pin the epoch + let _count = r.enter(); + // refresh would hang here + assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0); + assert!(!w.has_pending_operations()); + } #[test] fn flush_no_refresh() { @@ -713,4 +986,191 @@ mod tests { w.publish(); assert_eq!(w.refreshes, 4); } + #[test] + fn flush_no_refresh_compress_equiv() { + type Op = CompressibleCounterOp<{ usize::MAX }>; + let (mut w, _) = crate::new::(); + + // Until we refresh, writes are written directly instead of going to the + // oplog (because there can't be any readers on the w_handle table). + assert!(!w.has_pending_operations()); + w.publish(); + assert!(!w.has_pending_operations()); + assert_eq!(w.refreshes, 1); + + w.append(Op::Add(42)); + assert!(w.has_pending_operations()); + w.publish(); + assert!(!w.has_pending_operations()); + assert_eq!(w.refreshes, 2); + + w.append(Op::Add(42)); + assert!(w.has_pending_operations()); + w.publish(); + assert!(!w.has_pending_operations()); + assert_eq!(w.refreshes, 3); + + // Sanity check that a refresh would have been visible + assert!(!w.has_pending_operations()); + w.publish(); + assert_eq!(w.refreshes, 4); + } + #[test] + fn try_compress_default() { + assert!( + { + let mut prev = CounterAddOp(1); + match i32::try_compress(&mut prev, CounterAddOp(2)) { + TryCompressResult::Dependent(CounterAddOp(2)) => prev.0 == 1, + _ => false, + } + }, + "Default return value of Absorb::try_compress differs from a no-op TryCompressResult::Dependent" + ); + } + #[test] + fn limited_compress_range() { + type Op = CompressibleCounterOp<1>; + let (mut w, r) = crate::new::(); + // Get first optimization out of the picture + w.publish(); + assert_eq!(*r.enter().unwrap(), 0); + assert_eq!(w.first, false); + // Both Adds will combine + w.append(Op::Add(7)); + w.append(Op::Add(6)); + assert_eq!(w.oplog.len(), 1); + // First Sub will combine, Add and second Sub get stopped by range before finding others. + w.append(Op::Sub(5)); + w.extend([Op::Sub(4), Op::Add(3), Op::Sub(2)]); + assert_eq!(w.oplog.len(), 4); + // Set still consumes everything because the range keeps resetting, Add and Sub block each other like above. + w.extend([Op::Set(1), Op::Add(2), Op::Sub(1), Op::Add(3)]); + assert_eq!(w.oplog.len(), 4); + w.publish(); + assert_eq!(*r.enter().unwrap(), 5); + // full len still 4 because only first absorb + assert_eq!(w.oplog.len(), 4); + assert_eq!(w.oplog.len(), w.swap_index); + assert_eq!(*r.enter().unwrap(), 5); + w.publish(); + assert_eq!(*r.enter().unwrap(), 5); + // now also second absorb => len == 0 + assert_eq!(w.oplog.len(), 0); + assert_eq!(w.oplog.len(), w.swap_index); + assert_eq!(*r.enter().unwrap(), 5); + } + #[test] + fn rev_dirty_range_start_exploit_new_none_bridge() { + type Op = CompressibleCounterOp<{ usize::MAX }>; + let (mut w, _r) = crate::new::(); + // Get first optimization out of the picture + w.publish(); + assert_eq!(w.first, false); + // Force contrived oplog, causes Sub of second extend to remove the first Sub during compression, + // bridging the gap between Nones, which rev_dirty_range.start is able to exploit. 
+ w.oplog.extend([ + Some(Op::Add(3)), + Some(Op::Add(2)), + Some(Op::Sub(1)), + Some(Op::Add(1)), + ]); + let mut rev_dirty_range = 0..0; + for op in [Op::Add(1), Op::Sub(1), Op::Add(1)] { + w.compress_insert_op(op, &mut rev_dirty_range); + } + // rev_dirty_range.end is 3 instead of 2 because when inserting last Add at rev_idx: 3 it can't know about Sub at rev_idx: 2. + assert_eq!(rev_dirty_range, 2..3); + w.oplog + .iter() + .zip([Some(Op::Add(8)), Some(Op::Sub(2)), None, None]) + .for_each(|(op, expected)| assert_eq!(*op, expected)); + w.oplog_retain_some(rev_dirty_range); + w.oplog + .iter() + .zip([Some(Op::Add(8)), Some(Op::Sub(2))]) + .for_each(|(op, expected)| assert_eq!(*op, expected)); + } + #[test] + fn oplog_remove_nones_early_stop() { + type Op = CompressibleCounterOp<{ usize::MAX }>; + let (mut w, _r) = crate::new::(); + // Get first optimization out of the picture + w.publish(); + assert_eq!(w.first, false); + // Force contrived oplog which causes none removal to stop early after failing to find a Some to swap a None with. + w.oplog + .extend([Some(Op::Add(3)), Some(Op::Sub(1)), None, Some(Op::Sub(1))]); + w.append(Op::Add(1)); + w.oplog + .iter() + .zip([Some(Op::Add(4)), Some(Op::Sub(1)), Some(Op::Sub(1))]) + .for_each(|(op, expected)| assert_eq!(*op, expected)); + } + #[quickcheck] + fn compress_correct(input: (Vec, Vec<(u8, bool, bool)>)) -> bool { + // !IMPORTANT!: `input: Vec<(Vec, bool, bool)>` would be more thorough, convenient and concise, but completely blows up miri. + type Op = CompressibleCounterOp<2>; + let (mut w, _) = crate::new::(); + // Get non-compressing first optimization out of the picture + w.publish(); + assert_eq!(w.first, false); + + // Map numbers to Ops, insert and publish them + let mut remaining = input.0.len(); + let mut ops = input.0.into_iter(); + let mut chunks = once((0, true, true)).chain(input.1.into_iter()).cycle(); + let mut expected = 0; + + while remaining > 0 { + let (len, compress, publish) = chunks.next().unwrap(); + // To have no empty and more extends per test run + let len = (len as usize & 0xF) + 1; + remaining = remaining.saturating_sub(len); + let chunk = (&mut ops).take(len).map(|x| { + let x = x as i32; + if x > 0 { + expected += x; + Op::Add(x) + } else if x < 0 { + expected += x; + Op::Sub(-x) + } else { + expected = 0; + Op::Set(0) + } + }); + if compress { + w.extend(chunk); + } else { + // Occasionally not compressing covers more corner cases + w.oplog.extend(chunk.map(|op| Some(op))); + } + if publish { + w.publish(); + } + } + assert_eq!(ops.next(), None); + let val = *w.take(); + // Check if value correct + val == expected + } + #[test] + fn oplog_retain_some_none_range_start_some() { + type Op = CompressibleCounterOp<2>; + let (mut w, _) = crate::new::(); + // Get non-compressing first optimization out of the picture + w.publish(); + assert_eq!(w.first, false); + w.oplog.extend([ + Some(Op::Sub(1)), + Some(Op::Add(1)), + Some(Op::Sub(1)), + Some(Op::Sub(1)), + Some(Op::Add(1)), + ]); + // Causes none_range to start on a some due to rev_dirty_range.end not jumping over first add after re-inserting first sub. + // Makes sure it can cope and doesn't panic. + w.extend([Op::Sub(1), Op::Sub(1)]); + } } diff --git a/tests/loom.rs b/tests/loom.rs index 68fa136..6e83164 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -1,9 +1,8 @@ -#[cfg(loom)] -#[cfg(test)] +#[cfg(all(test, loom))] mod loom_tests { // Evil hack to share CounterAddOp between // unit tests and integration tests. 
- use left_right::Absorb; + use left_right::{Absorb, TryCompressResult}; include!("../src/utilities.rs"); use loom::thread;
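For reference, a minimal end-to-end sketch of the feature as introduced by this diff (hypothetical `Counter`/`CounterOp` types; `new`, `extend`, `publish`, and `enter` are the crate's existing API, while `MAX_COMPRESS_RANGE`, `try_compress`, and `TryCompressResult` are the items added above):

```rust
use left_right::{Absorb, TryCompressResult};

// Hypothetical data and op types for this sketch.
#[derive(Clone, Default, PartialEq, Debug)]
struct Counter(i32);

enum CounterOp {
    Add(i32),
    Set(i32),
}

impl Absorb<CounterOp> for Counter {
    fn absorb_first(&mut self, op: &mut CounterOp, _: &Self) {
        match op {
            CounterOp::Add(n) => self.0 += *n,
            CounterOp::Set(n) => self.0 = *n,
        }
    }

    fn sync_with(&mut self, first: &Self) {
        *self = first.clone();
    }

    // Enable compression with a bounded backwards search.
    const MAX_COMPRESS_RANGE: usize = 8;

    fn try_compress(prev: &mut CounterOp, next: CounterOp) -> TryCompressResult<CounterOp> {
        match (prev, next) {
            // Two adds merge into one.
            (CounterOp::Add(p), CounterOp::Add(n)) => {
                *p += n;
                TryCompressResult::Compressed
            }
            // A later set supersedes anything before it.
            (prev, CounterOp::Set(n)) => {
                *prev = CounterOp::Set(n);
                TryCompressResult::Compressed
            }
            // An add may not be reordered past an earlier set.
            (CounterOp::Set(_), next) => TryCompressResult::Dependent(next),
        }
    }
}

fn main() {
    let (mut w, r) = left_right::new::<Counter, CounterOp>();
    w.publish(); // get past the first-publish fast path so ops go through the oplog

    // The set consumes the op before it, and the two trailing adds merge,
    // so only two ops remain in the oplog before they are absorbed.
    w.extend([
        CounterOp::Add(1),
        CounterOp::Set(10),
        CounterOp::Add(2),
        CounterOp::Add(3),
    ]);
    w.publish();

    assert_eq!(r.enter().map(|c| c.0), Some(15));
}
```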