-
Notifications
You must be signed in to change notification settings - Fork 96
Allow for compression of the oplog in case of mergeable operations. #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 24 commits
480edca
393255e
e238b4f
0c430ec
04adefa
abcf063
1ca8a2a
4f5d9fe
5bc3699
0aa5900
722c318
f22dc65
6c7bfd7
3a37d0d
c6aa598
f0c9966
cde672f
f74cadd
000d512
cec99e0
0382332
f49adca
13c380e
5f58376
4b61b26
1ca7bbc
e3ff872
5138829
b852b9c
6ca3be6
b6cbabe
14c2c9c
0016728
e2a5b82
1524b18
f512288
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| use std::collections::{BTreeMap, HashMap}; | ||
|
|
||
| use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; | ||
| mod utilities; | ||
| use left_right::*; | ||
| use utilities::*; | ||
|
|
||
// Number of ops to insert/publish in total.
const LEN: usize = 1 << 16;
// Number of ops per extend. NOTE(review): the benchmark loops drain exactly
// CHUNK_LEN ops per iteration, so LEN is assumed to be a multiple of CHUNK_LEN.
const CHUNK_LEN: usize = 1 << 6;
// Number of ops between publishes.
const FREQ: usize = 1 << 10;
|
|
||
| fn hash_max(c: &mut Criterion) { | ||
| c.bench_function("hash_max", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<HashMap<_, _>, MapOp<{ usize::MAX }>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
|
|
||
| fn btree_max(c: &mut Criterion) { | ||
| c.bench_function("btree_max", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<BTreeMap<_, _>, MapOp<{ usize::MAX }>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
| fn hash_1(c: &mut Criterion) { | ||
| c.bench_function("hash_1", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<HashMap<_, _>, MapOp<1>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
|
|
||
| fn btree_1(c: &mut Criterion) { | ||
| c.bench_function("btree_1", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<BTreeMap<_, _>, MapOp<1>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
| fn hash_16(c: &mut Criterion) { | ||
| c.bench_function("hash_16", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<HashMap<_, _>, MapOp<16>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
|
|
||
| fn btree_16(c: &mut Criterion) { | ||
| c.bench_function("btree_16", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<BTreeMap<_, _>, MapOp<16>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
| fn hash_none(c: &mut Criterion) { | ||
| c.bench_function("hash_none", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<HashMap<_, _>, MapOp<0>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
|
|
||
| fn btree_none(c: &mut Criterion) { | ||
| c.bench_function("btree_none", |b| { | ||
| b.iter_batched( | ||
| || { | ||
| let ops = random_ops(LEN); | ||
| let (w, _) = new::<BTreeMap<_, _>, MapOp<0>>(); | ||
| (ops, w) | ||
| }, | ||
| |(mut ops, mut w)| { | ||
| let mut log_len = 0; | ||
| while !ops.is_empty() { | ||
| w.extend(ops.drain(0..CHUNK_LEN)); | ||
| log_len += CHUNK_LEN; | ||
| if log_len >= FREQ { | ||
| log_len -= FREQ; | ||
| w.publish(); | ||
| } | ||
| } | ||
| }, | ||
| BatchSize::LargeInput, | ||
| ) | ||
| }); | ||
| } | ||
|
|
||
// Register all variants, ordered from heaviest compression range (max) to
// none, for both backing map types.
criterion_group!(
    benches, btree_max, btree_16, btree_1, btree_none, hash_max, hash_16, hash_1, hash_none,
);
criterion_main!(benches);
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| #![allow(dead_code)] | ||
| use left_right::*; | ||
| use rand::{distributions::Uniform, Rng}; | ||
| use std::collections::{BTreeMap, HashMap, VecDeque}; | ||
|
|
||
| pub(crate) fn random_ops<const RANGE: usize>(len: usize) -> VecDeque<MapOp<RANGE>> { | ||
| let rng = rand::thread_rng(); | ||
| let dist = Uniform::new(0, usize::MAX); | ||
| rng.sample_iter(&dist) | ||
| .take(len) | ||
| .map(|x| { | ||
| // 64 keys, low(current) favors heavy compression, higher favors low/no compression | ||
| let key = x & !((!1) << 6); | ||
| // rest value | ||
| let value = x >> 6; | ||
| // One in 1024 is MapOp::Clear, low(current) favors heavy compression, higher favors low/no compression | ||
| if x & !((!1) << 10) == 0 { | ||
| MapOp::Clear | ||
| } else { | ||
| // We are using a Map of Strings to have non-trivial operations. | ||
| MapOp::Set(key, format!("value of {:?} is: {:?}", key, value)) | ||
| } | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
/// A single operation against the benchmark maps.
///
/// `RANGE` is forwarded to `Absorb::MAX_COMPRESS_RANGE` by the impls below so
/// benchmarks can vary how far back in the oplog compression is attempted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum MapOp<const RANGE: usize> {
    /// Set the key (`.0`) to the value (`.1`).
    Set(usize, String),
    /// Remove every entry from the map.
    Clear,
}
| impl<const RANGE: usize> Absorb<MapOp<RANGE>> for HashMap<usize, String> { | ||
| fn absorb_first(&mut self, operation: &mut MapOp<RANGE>, _: &Self) { | ||
| match operation { | ||
| MapOp::Set(key, value) => { | ||
| if let Some(loc) = self.get_mut(key) { | ||
| *loc = value.clone(); | ||
| } else { | ||
| self.insert(*key, value.clone()); | ||
| } | ||
| } | ||
| MapOp::Clear => { | ||
| self.clear(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn sync_with(&mut self, first: &Self) { | ||
| *self = first.clone(); | ||
| } | ||
|
|
||
| const MAX_COMPRESS_RANGE: usize = RANGE; | ||
| fn try_compress( | ||
| mut prev: &mut MapOp<RANGE>, | ||
| next: MapOp<RANGE>, | ||
| ) -> TryCompressResult<MapOp<RANGE>> { | ||
| match (&mut prev, next) { | ||
| (MapOp::Set(prev_key, prev_value), MapOp::Set(key, value)) => { | ||
| if *prev_key == key { | ||
| *prev_value = value; | ||
| TryCompressResult::Compressed | ||
| } else { | ||
| TryCompressResult::Independent(MapOp::Set(key, value)) | ||
| } | ||
| } | ||
| (_, MapOp::Clear) => { | ||
| *prev = MapOp::Clear; | ||
| TryCompressResult::Compressed | ||
| } | ||
| (MapOp::Clear, next @ MapOp::Set(_, _)) => TryCompressResult::Dependent(next), | ||
| } | ||
| } | ||
| } | ||
| impl<const RANGE: usize> Absorb<MapOp<RANGE>> for BTreeMap<usize, String> { | ||
| fn absorb_first(&mut self, operation: &mut MapOp<RANGE>, _: &Self) { | ||
| match operation { | ||
| MapOp::Set(key, value) => { | ||
| if let Some(loc) = self.get_mut(key) { | ||
| *loc = value.clone(); | ||
| } else { | ||
| self.insert(*key, value.clone()); | ||
| } | ||
| } | ||
| MapOp::Clear => { | ||
| self.clear(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn sync_with(&mut self, first: &Self) { | ||
| *self = first.clone(); | ||
| } | ||
|
|
||
| const MAX_COMPRESS_RANGE: usize = RANGE; | ||
| fn try_compress( | ||
| mut prev: &mut MapOp<RANGE>, | ||
| next: MapOp<RANGE>, | ||
| ) -> TryCompressResult<MapOp<RANGE>> { | ||
| match (&mut prev, next) { | ||
| (MapOp::Set(prev_key, prev_value), MapOp::Set(key, value)) => { | ||
| if *prev_key == key { | ||
| *prev_value = value; | ||
| TryCompressResult::Compressed | ||
| } else { | ||
| TryCompressResult::Independent(MapOp::Set(key, value)) | ||
| } | ||
| } | ||
| (_, MapOp::Clear) => { | ||
| *prev = MapOp::Clear; | ||
| TryCompressResult::Compressed | ||
| } | ||
| (MapOp::Clear, next @ MapOp::Set(_, _)) => TryCompressResult::Dependent(next), | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -189,6 +189,22 @@ pub use crate::read::{ReadGuard, ReadHandle, ReadHandleFactory}; | |
|
|
||
| pub mod aliasing; | ||
|
|
||
/// The result of calling [`Absorb::try_compress`](Absorb::try_compress).
#[derive(Debug)]
pub enum TryCompressResult<O> {
    /// Returned when [`try_compress`](Absorb::try_compress) was successful
    /// and `prev` is now the combined operation after consuming `next`,
    /// and can be used as the new `next` to continue the current attempt at
    /// compression with the next `prev`.
    Compressed,
    /// Returned when [`try_compress`](Absorb::try_compress) failed because `prev` and
    /// `next` are independent of each other and can't be compressed together, though
    /// `next` may precede `prev` in the oplog, meaning compression can resume with the
    /// next `prev`. Contains `next`.
    Independent(O),
    /// Returned when [`try_compress`](Absorb::try_compress) failed because `prev` must
    /// precede `next`, halting any further attempt to compress `next` before its
    /// insertion. Contains `next`.
    Dependent(O),
}
|
|
||
| /// Types that can incorporate operations of type `O`. | ||
| /// | ||
| /// This trait allows `left-right` to keep the two copies of the underlying data structure (see the | ||
|
|
@@ -261,6 +277,31 @@ pub trait Absorb<O> { | |
| /// subtly affect results like the `RandomState` of a `HashMap` which can change iteration | ||
| /// order. | ||
| fn sync_with(&mut self, first: &Self); | ||
|
|
||
| /// Range at which [`WriteHandle`] tries to compress the oplog, reset each time a compression succeeds. | ||
| /// | ||
| /// Can be used to avoid having insertion into the oplog be O(oplog.len * ops.len) if it is filled with mainly independent ops. | ||
| /// | ||
| /// Defaults to `0`, which disables compression and allows the usage of an efficient fallback. | ||
| const MAX_COMPRESS_RANGE: usize = 0; | ||
|
|
||
| /// Try to compress two ops into a single op to optimize the oplog. | ||
| /// | ||
| /// `prev` is the target of the compression and temporarily removed from the oplog, `next` is the op to be inserted. | ||
| /// | ||
| /// A return value of [`TryCompressResult::Compressed`] means the ops were successfully compressed, | ||
| /// [`TryCompressResult::Independent`] that while the ops can't be compressed, `next` can safely precede `prev`, | ||
| /// and [`TryCompressResult::Dependent`] that they can not be compressed, and `prev` must precede `next`. | ||
| /// | ||
| /// Defaults to [`TryCompressResult::Dependent`], which sub-optimally disables compression. | ||
| /// Setting [`Self::MAX_COMPRESS_RANGE`](Absorb::MAX_COMPRESS_RANGE) to or leaving it at it's default of `0` is vastly more efficient for that. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think rustfmt may complain about the line wrapping of this comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have vscode configured to automatically run |
||
| fn try_compress(prev: &mut O, next: O) -> TryCompressResult<O> { | ||
| // yes, unnecessary, but: makes it so that prev is not an unused variable | ||
| // and really matches the mental model of 'all ops are dependent'. | ||
| match prev { | ||
| _ => TryCompressResult::Dependent(next), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Construct a new write and read handle pair from an empty data structure. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you post the results from these once you feel like they're in a decent place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do. Done for today though. In their current state compression with unlimited range against no compression is about 20% faster for btreemap and about 35% faster for hashmap, with limited compression falling somewhere in between. Considering that due to the higher lookup-cost btreemap should be the one benefitting more there definitely is something fishy going on.