From 16de6fb06af798d6ba466d0094340c18140c8b21 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sat, 19 Mar 2022 14:42:32 +0100 Subject: [PATCH 01/13] Writer now has custom yield The WriteHandle now stores a function pointer to a yield function, which will be called to avoid simply spinning while waiting for all the Readers to move on. This function can be specified when creating the WriteHandle and will default to std::thread::yield_now. --- src/lib.rs | 24 ++++++++++++++++++++++++ src/write.rs | 17 +++++++++++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e535198..de54fde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -277,6 +277,18 @@ where (w, r) } +/// TODO +pub fn new_from_empty_with_yield(t: T, yield_fn: fn()) -> (WriteHandle, ReadHandle) +where + T: Absorb + Clone, +{ + let epochs = Default::default(); + + let r = ReadHandle::new(t.clone(), Arc::clone(&epochs)); + let w = WriteHandle::new_with_yield(t, epochs, r.clone(), yield_fn); + (w, r) +} + /// Construct a new write and read handle pair from the data structure default. /// /// The type must implement `Default` so we can construct two empty instances. You must ensure that @@ -297,3 +309,15 @@ where let w = WriteHandle::new(T::default(), epochs, r.clone()); (w, r) } + +/// TODO +pub fn new_with_yield(yield_fn: fn()) -> (WriteHandle, ReadHandle) +where + T: Absorb + Default, +{ + let epochs = Default::default(); + + let r = ReadHandle::new(T::default(), Arc::clone(&epochs)); + let w = WriteHandle::new_with_yield(T::default(), epochs, r.clone(), yield_fn); + (w, r) +} diff --git a/src/write.rs b/src/write.rs index 747367e..eef8d52 100644 --- a/src/write.rs +++ b/src/write.rs @@ -3,12 +3,12 @@ use crate::Absorb; use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering}; use std::collections::VecDeque; +use std::fmt; use std::marker::PhantomData; use std::ops::DerefMut; use std::ptr::NonNull; #[cfg(test)] use std::sync::atomic::AtomicBool; -use std::{fmt, thread}; /// A writer handle to a left-right guarded data structure. /// @@ -42,6 +42,9 @@ where second: bool, /// If we call `Self::take` the drop needs to be different. taken: bool, + /// This function will be used to yield the current execution instead of just spinning while + /// waiting for all readers to move on + yield_fn: fn(), } // safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take @@ -213,6 +216,15 @@ where T: Absorb, { pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle) -> Self { + Self::new_with_yield(w_handle, epochs, r_handle, std::thread::yield_now) + } + + pub(crate) fn new_with_yield( + w_handle: T, + epochs: crate::Epochs, + r_handle: ReadHandle, + yield_fn: fn(), + ) -> Self { Self { epochs, // safety: Box is not null and covariant. 
@@ -228,6 +240,7 @@ where first: true, second: true, taken: false, + yield_fn, } } @@ -275,7 +288,7 @@ where if iter != 20 { iter += 1; } else { - thread::yield_now(); + (self.yield_fn)(); } } From f5dde6b06c7584b1c8a6dc15f50631017a0209e6 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sun, 20 Mar 2022 18:29:36 +0100 Subject: [PATCH 02/13] Improved the Documentation for the new methods --- src/lib.rs | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index de54fde..3fd4f68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -263,21 +263,23 @@ pub trait Absorb { fn sync_with(&mut self, first: &Self); } -/// Construct a new write and read handle pair from an empty data structure. +/// Construct a new write and read handle pair from an empty data structure with +/// [yield_now](std::thread::yield_now) as the yield function. /// -/// The type must implement `Clone` so we can construct the second copy from the first. +/// See [new_from_empty_with_yield] for a more detailed explaination pub fn new_from_empty(t: T) -> (WriteHandle, ReadHandle) where T: Absorb + Clone, { - let epochs = Default::default(); - - let r = ReadHandle::new(t.clone(), Arc::clone(&epochs)); - let w = WriteHandle::new(t, epochs, r.clone()); - (w, r) + new_from_empty_with_yield(t, std::thread::yield_now) } -/// TODO +/// Construct a new write and read handle pair from an empty data structure. +/// +/// The type must implement `Clone` so we can construct the second copy from the first. +/// +/// The `yield_fn` will be called by the writer while its waiting for all the readers to move on +/// after a refresh. This allows for more efficient waiting instead of simply spinning. pub fn new_from_empty_with_yield(t: T, yield_fn: fn()) -> (WriteHandle, ReadHandle) where T: Absorb + Clone, @@ -289,6 +291,17 @@ where (w, r) } +/// Construct a new write and read handle pair from the data structure default, with +/// [yield_now](std::thread::yield_now) as the yield function. +/// +/// See [new_with_yield] for a more detailed explaination +pub fn new() -> (WriteHandle, ReadHandle) +where + T: Absorb + Default, +{ + new_with_yield(std::thread::yield_now) +} + /// Construct a new write and read handle pair from the data structure default. /// /// The type must implement `Default` so we can construct two empty instances. You must ensure that @@ -299,18 +312,9 @@ where /// /// If your type's `Default` implementation does not guarantee this, you can use `new_from_empty`, /// which relies on `Clone` instead of `Default`. -pub fn new() -> (WriteHandle, ReadHandle) -where - T: Absorb + Default, -{ - let epochs = Default::default(); - - let r = ReadHandle::new(T::default(), Arc::clone(&epochs)); - let w = WriteHandle::new(T::default(), epochs, r.clone()); - (w, r) -} - -/// TODO +/// +/// The `yield_fn` will be called by the writer while its waiting for all the readers to move on +/// after a refresh. This allows for more efficient waiting instead of simply spinning. pub fn new_with_yield(yield_fn: fn()) -> (WriteHandle, ReadHandle) where T: Absorb + Default, From 328667bf4838a9c6c3f9fccb838096cf45c4d913 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sun, 20 Mar 2022 19:05:58 +0100 Subject: [PATCH 03/13] Added "std" feature and replaced most uses of "std" Added the "std" feature to the Crate, which is enabled by default, to allow for switching between no_std and std. 
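
For a downstream crate, opting into the no_std build then just means disabling default features in its own manifest (a sketch; the version requirement is illustrative, not taken from this series):

    [dependencies]
    left-right = { version = "0.11", default-features = false }
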
Then replaced most uses of "std" with either "core" or "alloc" depending on the Type, however it is still unclear how we deal with the Mutex/MutexGuard Types as these are currently needed but not supported in no_std directly --- Cargo.toml | 4 ++++ src/aliasing.rs | 29 ++++++++++++++++------------- src/lib.rs | 7 +++++++ src/read.rs | 9 +++++---- src/read/factory.rs | 2 +- src/read/guard.rs | 6 +++--- src/sync.rs | 6 ++++-- src/write.rs | 22 +++++++++------------- 8 files changed, 49 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 451e379..1425691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,5 +14,9 @@ categories = ["concurrency"] [dependencies] slab = "0.4" +[features] +default = ["std"] +std = [] + [target.'cfg(loom)'.dependencies] loom = "0.4.0" diff --git a/src/aliasing.rs b/src/aliasing.rs index 96a5ea7..6d4c225 100644 --- a/src/aliasing.rs +++ b/src/aliasing.rs @@ -128,9 +128,10 @@ //! //! But this warrants repeating: **your `D` types for `Aliased` _must_ be private**. -use std::marker::PhantomData; -use std::mem::MaybeUninit; -use std::ops::Deref; +use alloc::{boxed::Box, string::String, vec::Vec}; +use core::marker::PhantomData; +use core::mem::MaybeUninit; +use core::ops::Deref; // Just to make the doc comment linking work. #[allow(unused_imports)] @@ -183,7 +184,7 @@ where // a) the T is behind a MaybeUninit, and so will cannot be accessed safely; and // b) we only expose _either_ &T while aliased, or &mut after the aliasing ends. Aliased { - aliased: std::ptr::read(&self.aliased), + aliased: core::ptr::read(&self.aliased), drop_behavior: PhantomData, _no_auto_send: PhantomData, } @@ -211,7 +212,7 @@ where pub unsafe fn change_drop(self) -> Aliased { Aliased { // safety: - aliased: std::ptr::read(&self.aliased), + aliased: core::ptr::read(&self.aliased), drop_behavior: PhantomData, _no_auto_send: PhantomData, } @@ -247,7 +248,7 @@ where // That T has not been dropped (getting a Aliased is unsafe). // T is no longer aliased (by the safety assumption of getting a Aliased), // so we are allowed to re-take ownership of the T. - unsafe { std::ptr::drop_in_place(self.aliased.as_mut_ptr()) } + unsafe { core::ptr::drop_in_place(self.aliased.as_mut_ptr()) } } } } @@ -276,7 +277,7 @@ where } } -use std::hash::{Hash, Hasher}; +use core::hash::{Hash, Hasher}; impl Hash for Aliased where D: DropBehavior, @@ -290,7 +291,7 @@ where } } -use std::fmt; +use core::fmt; impl fmt::Debug for Aliased where D: DropBehavior, @@ -323,7 +324,7 @@ where D: DropBehavior, T: PartialOrd, { - fn partial_cmp(&self, other: &Self) -> Option { + fn partial_cmp(&self, other: &Self) -> Option { self.as_ref().partial_cmp(other.as_ref()) } @@ -349,12 +350,12 @@ where D: DropBehavior, T: Ord, { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { self.as_ref().cmp(other.as_ref()) } } -use std::borrow::Borrow; +use core::borrow::Borrow; impl Borrow for Aliased where D: DropBehavior, @@ -385,6 +386,8 @@ where self.as_ref() } } + +#[cfg(feature = "std")] impl Borrow for Aliased where D: DropBehavior, @@ -410,7 +413,7 @@ where self.as_ref() } } -impl Borrow for Aliased, D> +impl Borrow for Aliased, D> where T: ?Sized, D: DropBehavior, @@ -419,7 +422,7 @@ where self.as_ref() } } -impl Borrow for Aliased, D> +impl Borrow for Aliased, D> where T: ?Sized, D: DropBehavior, diff --git a/src/lib.rs b/src/lib.rs index 3fd4f68..bc9fd57 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![cfg_attr(not(feature = "std"), no_std)] //! 
A concurrency primitive for high concurrency reads over a single-writer data structure. //! //! The primitive keeps two copies of the backing data structure, one that is accessed by readers, @@ -174,9 +175,13 @@ )] #![allow(clippy::type_complexity)] +// Needed for no_std support +extern crate alloc; + mod sync; use crate::sync::{Arc, AtomicUsize, Mutex}; +use alloc::boxed::Box; type Epochs = Arc>>>; @@ -267,6 +272,7 @@ pub trait Absorb { /// [yield_now](std::thread::yield_now) as the yield function. /// /// See [new_from_empty_with_yield] for a more detailed explaination +#[cfg(feature = "std")] pub fn new_from_empty(t: T) -> (WriteHandle, ReadHandle) where T: Absorb + Clone, @@ -295,6 +301,7 @@ where /// [yield_now](std::thread::yield_now) as the yield function. /// /// See [new_with_yield] for a more detailed explaination +#[cfg(feature = "std")] pub fn new() -> (WriteHandle, ReadHandle) where T: Absorb + Default, diff --git a/src/read.rs b/src/read.rs index d38784f..a493449 100644 --- a/src/read.rs +++ b/src/read.rs @@ -1,8 +1,9 @@ use crate::sync::{fence, Arc, AtomicPtr, AtomicUsize, Ordering}; -use std::cell::Cell; -use std::fmt; -use std::marker::PhantomData; -use std::ptr::NonNull; +use alloc::boxed::Box; +use core::cell::Cell; +use core::fmt; +use core::marker::PhantomData; +use core::ptr::NonNull; // To make [`WriteHandle`] and friends work. #[cfg(doc)] diff --git a/src/read/factory.rs b/src/read/factory.rs index 147e6d2..9305d67 100644 --- a/src/read/factory.rs +++ b/src/read/factory.rs @@ -1,6 +1,6 @@ use super::ReadHandle; use crate::sync::{Arc, AtomicPtr}; -use std::fmt; +use core::fmt; /// A type that is both `Sync` and `Send` and lets you produce new [`ReadHandle`] instances. /// diff --git a/src/read/guard.rs b/src/read/guard.rs index 308ef62..4f2b40a 100644 --- a/src/read/guard.rs +++ b/src/read/guard.rs @@ -1,6 +1,6 @@ use crate::sync::{AtomicUsize, Ordering}; -use std::cell::Cell; -use std::mem; +use core::cell::Cell; +use core::mem; #[derive(Debug, Copy, Clone)] pub(super) struct ReadHandleState<'rh> { @@ -107,7 +107,7 @@ impl<'rh, T: ?Sized> AsRef for ReadGuard<'rh, T> { } } -impl<'rh, T: ?Sized> std::ops::Deref for ReadGuard<'rh, T> { +impl<'rh, T: ?Sized> core::ops::Deref for ReadGuard<'rh, T> { type Target = T; fn deref(&self) -> &Self::Target { self.t diff --git a/src/sync.rs b/src/sync.rs index 3a87edf..5463e66 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,6 +17,8 @@ pub(crate) fn fence(ord: Ordering) { } #[cfg(not(loom))] -pub(crate) use std::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; +pub(crate) use alloc::sync::Arc; #[cfg(not(loom))] -pub(crate) use std::sync::{Arc, Mutex, MutexGuard}; +pub(crate) use core::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; +#[cfg(not(loom))] +pub(crate) use std::sync::{Mutex, MutexGuard}; diff --git a/src/write.rs b/src/write.rs index eef8d52..03a6477 100644 --- a/src/write.rs +++ b/src/write.rs @@ -2,11 +2,11 @@ use crate::read::ReadHandle; use crate::Absorb; use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering}; -use std::collections::VecDeque; -use std::fmt; -use std::marker::PhantomData; -use std::ops::DerefMut; -use std::ptr::NonNull; +use alloc::{boxed::Box, collections::VecDeque, vec::Vec}; +use core::fmt; +use core::marker::PhantomData; +use core::ops::DerefMut; +use core::ptr::NonNull; #[cfg(test)] use std::sync::atomic::AtomicBool; @@ -86,7 +86,7 @@ pub struct Taken, O> { _marker: PhantomData, } -impl + std::fmt::Debug, O> std::fmt::Debug for Taken { +impl + core::fmt::Debug, O> 
core::fmt::Debug for Taken { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Taken") .field( @@ -148,7 +148,7 @@ where /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and /// returns the other copy as a [`Taken`] smart pointer. fn take_inner(&mut self) -> Option> { - use std::ptr; + use core::ptr; // Can only take inner once. if self.taken { return None; @@ -215,10 +215,6 @@ impl WriteHandle where T: Absorb, { - pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle) -> Self { - Self::new_with_yield(w_handle, epochs, r_handle, std::thread::yield_now) - } - pub(crate) fn new_with_yield( w_handle: T, epochs: crate::Epochs, @@ -422,7 +418,7 @@ where /// /// Its effects will not be exposed to readers until you call [`publish`](Self::publish). pub fn append(&mut self, op: O) -> &mut Self { - self.extend(std::iter::once(op)); + self.extend(core::iter::once(op)); self } @@ -455,7 +451,7 @@ where } // allow using write handle for reads -use std::ops::Deref; +use core::ops::Deref; impl Deref for WriteHandle where T: Absorb, From 1b8148efd00782cdcaadcecc01312477528a744b Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sat, 26 Mar 2022 22:23:46 +0100 Subject: [PATCH 04/13] Small changes from Review Made the small adjustments from the initial review on the Pull-Request --- src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bc9fd57..1dcc5d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![cfg_attr(not(feature = "std"), no_std)] //! A concurrency primitive for high concurrency reads over a single-writer data structure. //! //! The primitive keeps two copies of the backing data structure, one that is accessed by readers, @@ -167,6 +166,7 @@ //! closure instead. Instead, consider using [`ReadGuard::map`] and [`ReadGuard::try_map`], which //! (like `RefCell`'s [`Ref::map`](std::cell::Ref::map)) allow you to provide a guarded reference //! deeper into your data structure. +#![cfg_attr(not(feature = "std"), no_std)] #![warn( missing_docs, rust_2018_idioms, @@ -269,9 +269,9 @@ pub trait Absorb { } /// Construct a new write and read handle pair from an empty data structure with -/// [yield_now](std::thread::yield_now) as the yield function. +/// [`yield_now`](std::thread::yield_now) as the yield function. /// -/// See [new_from_empty_with_yield] for a more detailed explaination +/// See [`new_from_empty_with_yield`] for a more detailed explanation #[cfg(feature = "std")] pub fn new_from_empty(t: T) -> (WriteHandle, ReadHandle) where @@ -298,9 +298,9 @@ where } /// Construct a new write and read handle pair from the data structure default, with -/// [yield_now](std::thread::yield_now) as the yield function. +/// [`yield_now`](std::thread::yield_now) as the yield function. /// -/// See [new_with_yield] for a more detailed explaination +/// See [`new_with_yield`] for a more detailed explanation. 
 #[cfg(feature = "std")]
 pub fn new<T, O>() -> (WriteHandle<T, O>, ReadHandle<T>)
 where
     T: Absorb<O> + Default,

From 7200e93c1411b37ff27cb02730e40823037950b2 Mon Sep 17 00:00:00 2001
From: Lol3rrr
Date: Sat, 26 Mar 2022 23:09:40 +0100
Subject: [PATCH 05/13] Initial HandleList implementation

A basic implementation of a Lock-Free HandleList
---
 src/handle_list.rs | 163 +++++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs         |   2 +
 2 files changed, 165 insertions(+)
 create mode 100644 src/handle_list.rs

diff --git a/src/handle_list.rs b/src/handle_list.rs
new file mode 100644
index 0000000..5ea8237
--- /dev/null
+++ b/src/handle_list.rs
@@ -0,0 +1,163 @@
+use crate::sync::{Arc, AtomicPtr, AtomicUsize, Ordering};
+use alloc::boxed::Box;
+
+// TODO
+// * For now I'm just using Ordering::SeqCst, because I haven't really looked into what exactly we
+//   need for the Ordering, so this should probably be made more accurate in the future
+
+/// A Lock-Free List of Handles
+pub struct HandleList {
+    // The Head of the List
+    head: AtomicPtr<ListEntry>,
+}
+
+/// A Snapshot of the HandleList
+///
+/// Iterating over this Snapshot only yields the Entries that were present when this Snapshot was taken
+pub struct ListSnapshot {
+    // The Head-Ptr at the time of creation
+    head: *const ListEntry,
+}
+
+/// An Iterator over the Entries in a Snapshot
+pub struct SnapshotIter {
+    // A Pointer to the next Entry that will be yielded
+    current: *const ListEntry,
+}
+
+struct ListEntry {
+    data: Arc<AtomicUsize>,
+    // We can use a normal Ptr here because we never append or remove Entries and only add new Entries
+    // by changing the Head, so we never modify this Ptr and therefore don't need an AtomicPtr
+    next: *const Self,
+}
+
+impl HandleList {
+    /// Creates a new empty HandleList
+    pub const fn new() -> Self {
+        Self {
+            head: AtomicPtr::new(core::ptr::null_mut()),
+        }
+    }
+
+    /// Adds a new Entry to the List and returns the Counter for the Entry
+    pub fn new_entry(&self) -> Arc<AtomicUsize> {
+        let count = Arc::new(AtomicUsize::new(0));
+
+        let n_node = Box::new(ListEntry {
+            data: count.clone(),
+            next: core::ptr::null(),
+        });
+        let n_node_ptr = Box::into_raw(n_node);
+
+        let mut current_head = self.head.load(Ordering::SeqCst);
+        loop {
+            // Safety
+            // This is safe, because we have not stored the Ptr elsewhere, so we have exclusive
+            // access.
+            // The Ptr is also still valid, because we never free Entries on the List
+            unsafe { (*n_node_ptr).next = current_head };
+
+            // Attempt to add the Entry to the List by setting it as the new Head
+            match self.head.compare_exchange(
+                current_head,
+                n_node_ptr,
+                Ordering::SeqCst,
+                Ordering::SeqCst,
+            ) {
+                Ok(_) => return count,
+                Err(n_head) => {
+                    // Store the found Head-Ptr to avoid an extra load at the start of every loop
+                    current_head = n_head;
+                }
+            }
+        }
+    }
+
+    /// Creates a new Snapshot of the List at this Point in Time
+    pub fn snapshot(&self) -> ListSnapshot {
+        ListSnapshot {
+            head: self.head.load(Ordering::SeqCst),
+        }
+    }
+}
+
+impl Default for HandleList {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ListSnapshot {
+    /// Obtain an iterator over the Entries in this Snapshot
+    pub fn iter(&self) -> SnapshotIter {
+        SnapshotIter { current: self.head }
+    }
+}
+
+impl Iterator for SnapshotIter {
+    // TODO
+    // Maybe don't return an owned Value here
+    type Item = Arc<AtomicUsize>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.current.is_null() {
+            return None;
+        }
+
+        // Safety
+        // The Ptr is not null, because of the previous if-statement.
+        // The Data is also not freed, because we never free Entries on the List.
+        // We also have no one mutating Entries on the List and therefore we can access this without
+        // any extra synchronization needed.
+        let entry = unsafe { &*self.current };
+
+        self.current = entry.next;
+
+        Some(entry.data.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn create_list() {
+        let list = HandleList::new();
+        drop(list);
+    }
+
+    #[test]
+    fn empty_snapshot() {
+        let list = HandleList::new();
+
+        let snapshot = list.snapshot();
+
+        // Assert that the Iterator over the Snapshot is empty
+        assert_eq!(0, snapshot.iter().count());
+    }
+
+    #[test]
+    fn snapshots_and_entries() {
+        let list = HandleList::new();
+
+        let empty_snapshot = list.snapshot();
+        assert_eq!(0, empty_snapshot.iter().count());
+
+        let entry = list.new_entry();
+        entry.store(1, Ordering::SeqCst);
+
+        // Make sure that the Snapshot we got before adding a new Entry is still empty
+        assert_eq!(0, empty_snapshot.iter().count());
+
+        let second_snapshot = list.snapshot();
+        assert_eq!(1, second_snapshot.iter().count());
+
+        let snapshot_entry = second_snapshot.iter().next().unwrap();
+        assert_eq!(
+            entry.load(Ordering::SeqCst),
+            snapshot_entry.load(Ordering::SeqCst)
+        );
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 1dcc5d8..92a702c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -192,6 +192,8 @@ pub use crate::write::WriteHandle;
 mod read;
 pub use crate::read::{ReadGuard, ReadHandle, ReadHandleFactory};
 
+mod handle_list;
+
 pub mod aliasing;
 
 /// Types that can incorporate operations of type `O`.
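
The list above is append-only, and entries are never freed while the list is alive, which is what makes the unsynchronized reads in `SnapshotIter` sound: a snapshot pins the head pointer, so entries added later are invisible to it. A minimal sketch of that guarantee, written as if it were one more unit test in src/handle_list.rs (not part of the patch):

    #[test]
    fn snapshot_is_stable() {
        let list = HandleList::new();

        let before = list.snapshot(); // pins the current (empty) head
        let epoch = list.new_entry(); // CAS-prepends a new counter
        epoch.store(1, Ordering::SeqCst);

        // The old snapshot still starts at the old head and sees nothing,
        // while a fresh snapshot observes the prepended entry.
        assert_eq!(0, before.iter().count());
        assert_eq!(1, list.snapshot().iter().count());
    }
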
From 2725aa30a8e7ffa53912ba4b53afe39da384aa43 Mon Sep 17 00:00:00 2001
From: Lol3rrr
Date: Sun, 27 Mar 2022 00:29:43 +0100
Subject: [PATCH 06/13] Initial switch to HandleList

This includes all the basic changes needed to swap to the HandleList and
remove the Mutex. However, there are a lot of TODOs that still need to be
resolved.
---
 src/lib.rs          |  4 ++--
 src/read.rs         | 17 +++++++++-------
 src/read/factory.rs |  3 ++-
 src/sync.rs         |  2 --
 src/write.rs        | 47 ++++++++++++++++++++++++++++-----------------
 5 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 92a702c..17b6022 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -180,10 +180,10 @@ extern crate alloc;
 
 mod sync;
 
-use crate::sync::{Arc, AtomicUsize, Mutex};
+use crate::sync::Arc;
 use alloc::boxed::Box;
 
-type Epochs = Arc<Mutex<slab::Slab<Arc<AtomicUsize>>>>;
+type Epochs = Arc<handle_list::HandleList>;
 
 mod write;
 pub use crate::write::Taken;
diff --git a/src/read.rs b/src/read.rs
index a493449..de667b1 100644
--- a/src/read.rs
+++ b/src/read.rs
@@ -41,7 +41,6 @@ pub struct ReadHandle<T> {
     pub(crate) inner: Arc<AtomicPtr<T>>,
     pub(crate) epochs: crate::Epochs,
     epoch: Arc<AtomicUsize>,
-    epoch_i: usize,
     enters: Cell<usize>,
 
     // `ReadHandle` is _only_ Send if T is Sync. If T is !Sync, then it's not okay for us to expose
@@ -56,16 +55,23 @@ impl<T> Drop for ReadHandle<T> {
     fn drop(&mut self) {
         // epoch must already be even for us to have &mut self,
         // so okay to lock since we're not holding up the epoch anyway.
+ + // TODO + // * Update Documentation and also + /* let e = self.epochs.lock().unwrap().remove(self.epoch_i); assert!(Arc::ptr_eq(&e, &self.epoch)); assert_eq!(self.enters.get(), 0); + */ } } impl fmt::Debug for ReadHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadHandle") - .field("epochs", &self.epochs) + // TODO + // Figure out a Debug version for self.epochs + //.field("epochs", &self.epochs) .field("epoch", &self.epoch) .finish() } @@ -85,15 +91,12 @@ impl ReadHandle { } fn new_with_arc(inner: Arc>, epochs: crate::Epochs) -> Self { - // tell writer about our epoch tracker - let epoch = Arc::new(AtomicUsize::new(0)); - // okay to lock, since we're not holding up the epoch - let epoch_i = epochs.lock().unwrap().insert(Arc::clone(&epoch)); + // Obtain a new Epoch-Entry + let epoch = epochs.new_entry(); Self { epochs, epoch, - epoch_i, enters: Cell::new(0), inner, _unimpl_send: PhantomData, diff --git a/src/read/factory.rs b/src/read/factory.rs index 9305d67..ddb4c34 100644 --- a/src/read/factory.rs +++ b/src/read/factory.rs @@ -16,7 +16,8 @@ pub struct ReadHandleFactory { impl fmt::Debug for ReadHandleFactory { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadHandleFactory") - .field("epochs", &self.epochs) + // TODO + //.field("epochs", &self.epochs) .finish() } } diff --git a/src/sync.rs b/src/sync.rs index 5463e66..c5631bf 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -20,5 +20,3 @@ pub(crate) fn fence(ord: Ordering) { pub(crate) use alloc::sync::Arc; #[cfg(not(loom))] pub(crate) use core::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; -#[cfg(not(loom))] -pub(crate) use std::sync::{Mutex, MutexGuard}; diff --git a/src/write.rs b/src/write.rs index 03a6477..57d17c8 100644 --- a/src/write.rs +++ b/src/write.rs @@ -1,7 +1,8 @@ +use crate::handle_list::ListSnapshot; use crate::read::ReadHandle; use crate::Absorb; -use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering}; +use crate::sync::{fence, Arc, Ordering}; use alloc::{boxed::Box, collections::VecDeque, vec::Vec}; use core::fmt; use core::marker::PhantomData; @@ -66,7 +67,9 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WriteHandle") - .field("epochs", &self.epochs) + // TODO + // Figure out a way to implement Debug for Epochs + //.field("epochs", &self.epochs) .field("w_handle", &self.w_handle) .field("oplog", &self.oplog) .field("swap_index", &self.swap_index) @@ -171,9 +174,11 @@ where let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release); // now, wait for all readers to depart - let epochs = Arc::clone(&self.epochs); - let mut epochs = epochs.lock().unwrap(); - self.wait(&mut epochs); + //let epochs = Arc::clone(&self.epochs); + //let mut epochs = epochs.lock().unwrap(); + + let epoch_snapshot = self.epochs.snapshot(); + self.wait(&epoch_snapshot); // ensure that the subsequent epoch reads aren't re-ordered to before the swap fence(Ordering::SeqCst); @@ -240,7 +245,7 @@ where } } - fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab>>) { + fn wait(&mut self, epochs: &ListSnapshot) { let mut iter = 0; let mut starti = 0; @@ -248,11 +253,12 @@ where { self.is_waiting.store(true, Ordering::Relaxed); } - // we're over-estimating here, but slab doesn't expose its max index - self.last_epochs.resize(epochs.capacity(), 0); + + // make sure we have enough space for all the epochs in the current snapshot + self.last_epochs.resize(epochs.iter().count(), 0); 'retry: loop { // read all 
and see if all have changed (which is likely) - for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) { + for (ii, (ri, epoch)) in epochs.iter().enumerate().enumerate().skip(starti) { // if the reader's epoch was even last we read it (which was _after_ the swap), // then they either do not have the pointer, or must have read the pointer strictly // after the swap. in either case, they cannot be using the old pointer value (what @@ -316,10 +322,12 @@ where // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will // only block on pre-existing readers, and they are never waiting to push onto epochs // unless they have finished reading. - let epochs = Arc::clone(&self.epochs); - let mut epochs = epochs.lock().unwrap(); + //let epochs = Arc::clone(&self.epochs); + //let mut epochs = epochs.lock().unwrap(); + + let epoch_snapshot = self.epochs.snapshot(); - self.wait(&mut epochs); + self.wait(&epoch_snapshot); if !self.first { // all the readers have left! @@ -384,7 +392,7 @@ where // ensure that the subsequent epoch reads aren't re-ordered to before the swap fence(Ordering::SeqCst); - for (ri, epoch) in epochs.iter() { + for (ri, epoch) in epoch_snapshot.iter().enumerate() { self.last_epochs[ri] = epoch.load(Ordering::Acquire); } @@ -567,7 +575,7 @@ struct CheckWriteHandleSend; #[cfg(test)] mod tests { - use crate::sync::{AtomicUsize, Mutex, Ordering}; + use crate::sync::{AtomicUsize, Ordering}; use crate::Absorb; use slab::Slab; include!("./utilities.rs"); @@ -628,6 +636,8 @@ mod tests { assert_eq!(*w.take(), 2); } + // TODO + /* #[test] fn wait_test() { use std::sync::{Arc, Barrier}; @@ -636,9 +646,9 @@ mod tests { // Case 1: If epoch is set to default. let test_epochs: crate::Epochs = Default::default(); - let mut test_epochs = test_epochs.lock().unwrap(); + let test_snapshot = test_epochs.snapshot(); // since there is no epoch to waiting for, wait function will return immediately. - w.wait(&mut test_epochs); + w.wait(&test_snapshot); // Case 2: If one of the reader is still reading(epoch is odd and count is same as in last_epoch) // and wait has been called. @@ -662,8 +672,8 @@ mod tests { let test_epochs = Arc::new(Mutex::new(epochs_slab)); let wait_handle = thread::spawn(move || { barrier2.wait(); - let mut test_epochs = test_epochs.lock().unwrap(); - w.wait(&mut test_epochs); + let test_epochs = test_epochs.snapshot(); + w.wait(&test_epochs); }); barrier.wait(); @@ -679,6 +689,7 @@ mod tests { // of held_epoch. 
let _ = wait_handle.join(); } + */ #[test] fn flush_noblock() { From f881ff1de4e71c50de661b049b3c9276273e32f4 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sun, 27 Mar 2022 01:13:28 +0100 Subject: [PATCH 07/13] Fixed some more TODOs Fixed some more TODOs that were still open --- src/handle_list.rs | 27 +++++++++++++++++++++++++-- src/read.rs | 16 +++------------- src/read/factory.rs | 3 +-- src/write.rs | 24 +++++++++--------------- 4 files changed, 38 insertions(+), 32 deletions(-) diff --git a/src/handle_list.rs b/src/handle_list.rs index 5ea8237..b47afee 100644 --- a/src/handle_list.rs +++ b/src/handle_list.rs @@ -1,3 +1,5 @@ +use core::fmt::{Debug, Formatter}; + use crate::sync::{Arc, AtomicPtr, AtomicUsize, Ordering}; use alloc::boxed::Box; @@ -44,8 +46,12 @@ impl HandleList { pub fn new_entry(&self) -> Arc { let count = Arc::new(AtomicUsize::new(0)); + self.add_counter(count.clone()); + count + } + fn add_counter(&self, count: Arc) { let n_node = Box::new(ListEntry { - data: count.clone(), + data: count, next: core::ptr::null(), }); let n_node_ptr = Box::into_raw(n_node); @@ -65,7 +71,7 @@ impl HandleList { Ordering::SeqCst, Ordering::SeqCst, ) { - Ok(_) => return count, + Ok(_) => return, Err(n_head) => { // Store the found Head-Ptr to avoid an extra load at the start of every loop current_head = n_head; @@ -80,6 +86,16 @@ impl HandleList { head: self.head.load(Ordering::SeqCst), } } + + /// Inserts the Items of the Iterator, but in reverse order + pub fn extend(&self, iter: I) + where + I: IntoIterator>, + { + for item in iter.into_iter() { + self.add_counter(item); + } + } } impl Default for HandleList { @@ -87,6 +103,13 @@ impl Default for HandleList { Self::new() } } +impl Debug for HandleList { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + // TODO + // Figure out how exactly we want the Debug output to look + write!(f, "HandleList") + } +} impl ListSnapshot { /// Obtain an iterator over the Entries in this Snapshot diff --git a/src/read.rs b/src/read.rs index de667b1..ba6098b 100644 --- a/src/read.rs +++ b/src/read.rs @@ -53,25 +53,15 @@ unsafe impl Send for ReadHandle where T: Sync {} impl Drop for ReadHandle { fn drop(&mut self) { - // epoch must already be even for us to have &mut self, - // so okay to lock since we're not holding up the epoch anyway. 
- - // TODO - // * Update Documentation and also - /* - let e = self.epochs.lock().unwrap().remove(self.epoch_i); - assert!(Arc::ptr_eq(&e, &self.epoch)); - assert_eq!(self.enters.get(), 0); - */ + // We dont need a Drop implementation as of now, because the Epoch for this Handle will not + // be freed again } } impl fmt::Debug for ReadHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadHandle") - // TODO - // Figure out a Debug version for self.epochs - //.field("epochs", &self.epochs) + .field("epochs", &self.epochs) .field("epoch", &self.epoch) .finish() } diff --git a/src/read/factory.rs b/src/read/factory.rs index ddb4c34..9305d67 100644 --- a/src/read/factory.rs +++ b/src/read/factory.rs @@ -16,8 +16,7 @@ pub struct ReadHandleFactory { impl fmt::Debug for ReadHandleFactory { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadHandleFactory") - // TODO - //.field("epochs", &self.epochs) + .field("epochs", &self.epochs) .finish() } } diff --git a/src/write.rs b/src/write.rs index 57d17c8..6186598 100644 --- a/src/write.rs +++ b/src/write.rs @@ -67,9 +67,7 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WriteHandle") - // TODO - // Figure out a way to implement Debug for Epochs - //.field("epochs", &self.epochs) + .field("epochs", &self.epochs) .field("w_handle", &self.w_handle) .field("oplog", &self.oplog) .field("swap_index", &self.swap_index) @@ -318,12 +316,6 @@ where // we need to wait until all epochs have changed since the swaps *or* until a "finished" // flag has been observed to be on for two subsequent iterations (there still may be some // readers present since we did the previous refresh) - // - // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will - // only block on pre-existing readers, and they are never waiting to push onto epochs - // unless they have finished reading. - //let epochs = Arc::clone(&self.epochs); - //let mut epochs = epochs.lock().unwrap(); let epoch_snapshot = self.epochs.snapshot(); @@ -575,6 +567,7 @@ struct CheckWriteHandleSend; #[cfg(test)] mod tests { + use crate::sync::{AtomicUsize, Ordering}; use crate::Absorb; use slab::Slab; @@ -636,8 +629,6 @@ mod tests { assert_eq!(*w.take(), 2); } - // TODO - /* #[test] fn wait_test() { use std::sync::{Arc, Barrier}; @@ -669,11 +660,15 @@ mod tests { assert_eq!(false, is_waiting_v); let barrier2 = Arc::clone(&barrier); - let test_epochs = Arc::new(Mutex::new(epochs_slab)); + let test_epochs: crate::Epochs = Default::default(); + // We need to reverse the iterator here because when using `extend` it inserts the Items in reverse + test_epochs.extend(epochs_slab.into_iter().map(|(_, tmp)| tmp).rev()); + assert_eq!(3, test_epochs.snapshot().iter().count()); + let wait_handle = thread::spawn(move || { barrier2.wait(); - let test_epochs = test_epochs.snapshot(); - w.wait(&test_epochs); + let test_epochs_snapshot = test_epochs.snapshot(); + w.wait(&test_epochs_snapshot); }); barrier.wait(); @@ -689,7 +684,6 @@ mod tests { // of held_epoch. let _ = wait_handle.join(); } - */ #[test] fn flush_noblock() { From 9cf1237d3652ddec4b96c07bb96f0234b6c44b68 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sun, 27 Mar 2022 01:23:27 +0100 Subject: [PATCH 08/13] Updated tests for no_std Updated the Tests to now also work in a no_std environment, which mainly was just swapping out function names and using the core/alloc versions of std items. 
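
On a no_std target there is no std::thread::yield_now for the writer to fall
back to, so callers have to supply their own yield function. A no-op is enough
for the tests; a real no_std user would more likely hand in a spin hint (a
sketch; `spin_yield` is an illustrative name, not part of the crate):

    // core::hint::spin_loop is available without std and tells the CPU
    // that we are in a busy-wait loop.
    fn spin_yield() {
        core::hint::spin_loop();
    }

    // then, e.g.: let (w, r) = left_right::new_with_yield(spin_yield);
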
However the main doc test is still failing and Im not entirely sure on how to mitigate this Problem in an elegant way --- src/utilities.rs | 3 +++ src/write.rs | 23 +++++++++++++---------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/utilities.rs b/src/utilities.rs index 88520e8..29420f3 100644 --- a/src/utilities.rs +++ b/src/utilities.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +use alloc::boxed::Box; + #[cfg(test)] #[derive(Debug)] pub struct CounterAddOp(pub i32); diff --git a/src/write.rs b/src/write.rs index 6186598..dfd16cd 100644 --- a/src/write.rs +++ b/src/write.rs @@ -9,7 +9,7 @@ use core::marker::PhantomData; use core::ops::DerefMut; use core::ptr::NonNull; #[cfg(test)] -use std::sync::atomic::AtomicBool; +use core::sync::atomic::AtomicBool; /// A writer handle to a left-right guarded data structure. /// @@ -573,9 +573,11 @@ mod tests { use slab::Slab; include!("./utilities.rs"); + fn test_yield() {} + #[test] fn append_test() { - let (mut w, _r) = crate::new::(); + let (mut w, _r) = crate::new_with_yield::(test_yield); assert_eq!(w.first, true); w.append(CounterAddOp(1)); assert_eq!(w.oplog.len(), 0); @@ -590,7 +592,7 @@ mod tests { #[test] fn take_test() { // publish twice then take with no pending operations - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); w.append(CounterAddOp(1)); @@ -598,7 +600,7 @@ mod tests { assert_eq!(*w.take(), 4); // publish twice then pending operation published by take - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); w.append(CounterAddOp(1)); @@ -607,29 +609,30 @@ mod tests { assert_eq!(*w.take(), 6); // normal publish then pending operations published by take - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); w.append(CounterAddOp(1)); assert_eq!(*w.take(), 4); // pending operations published by take - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); assert_eq!(*w.take(), 3); // emptry op queue - let (mut w, _r) = crate::new_from_empty::(2); + let (mut w, _r) = crate::new_from_empty_with_yield::(2, test_yield); w.append(CounterAddOp(1)); w.publish(); assert_eq!(*w.take(), 3); // no operations - let (w, _r) = crate::new_from_empty::(2); + let (w, _r) = crate::new_from_empty_with_yield::(2, test_yield); assert_eq!(*w.take(), 2); } #[test] + #[cfg(feature = "std")] fn wait_test() { use std::sync::{Arc, Barrier}; use std::thread; @@ -687,7 +690,7 @@ mod tests { #[test] fn flush_noblock() { - let (mut w, r) = crate::new::(); + let (mut w, r) = crate::new_with_yield::(test_yield); w.append(CounterAddOp(42)); w.publish(); assert_eq!(*r.enter().unwrap(), 42); @@ -701,7 +704,7 @@ mod tests { #[test] fn flush_no_refresh() { - let (mut w, _) = crate::new::(); + let (mut w, _) = crate::new_with_yield::(test_yield); // Until we refresh, writes are written directly instead of going to the // oplog (because there can't be any readers on the w_handle table). 
From 72ae2c6b540835d28d0cffddf7d4267c9d059aa4 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sun, 27 Mar 2022 01:26:31 +0100 Subject: [PATCH 09/13] Added a github workflow to check for no_std compilation --- .github/workflows/nostd.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/nostd.yml diff --git a/.github/workflows/nostd.yml b/.github/workflows/nostd.yml new file mode 100644 index 0000000..d94ae5d --- /dev/null +++ b/.github/workflows/nostd.yml @@ -0,0 +1,24 @@ +on: + push: + branches: [main] + pull_request: +name: no-std +jobs: + nostd: + runs-on: ubuntu-latest + name: ${{ matrix.target }} + strategy: + matrix: + target: [thumbv7m-none-eabi, aarch64-unknown-none] + steps: + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + target: ${{ matrix.target }} + - uses: actions/checkout@v2 + - name: cargo check + uses: actions-rs/cargo@v1 + with: + command: check + args: --target ${{ matrix.target }} --no-default-features \ No newline at end of file From 0e0b406e39b692ea846cae74a6c1c14847b5f7ec Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Wed, 18 May 2022 15:22:12 +0200 Subject: [PATCH 10/13] Updated the HandleList to now free entries on Drop Reworked a little bit of the Structure for the new HandleList to now handle dropping the List correctly and free all the Entries of the List. This meant we need to create a new InnerList that is wrapped in an Arc, which then allows us to know that once InnerList is dropped, no one else can have access to it or any of its Entries anymore --- src/handle_list.rs | 51 ++++++++++++++++++++++++++++++++++++++++----- src/lib.rs | 11 +++++----- src/read.rs | 4 ++-- src/read/factory.rs | 4 ++-- src/write.rs | 4 +++- 5 files changed, 58 insertions(+), 16 deletions(-) diff --git a/src/handle_list.rs b/src/handle_list.rs index b47afee..690e220 100644 --- a/src/handle_list.rs +++ b/src/handle_list.rs @@ -9,6 +9,10 @@ use alloc::boxed::Box; /// A Lock-Free List of Handles pub struct HandleList { + inner: Arc, +} + +struct InnerList { // The Head of the List head: AtomicPtr, } @@ -19,6 +23,9 @@ pub struct HandleList { pub struct ListSnapshot { // The Head-Ptr at the time of creation head: *const ListEntry, + + // This entry exists to make sure that we keep the inner List alive and it wont be freed from under us + _list: Arc, } /// An Iterator over the Entries in a Snapshot @@ -36,9 +43,11 @@ struct ListEntry { impl HandleList { /// Creates a new empty HandleList - pub const fn new() -> Self { + pub fn new() -> Self { Self { - head: AtomicPtr::new(core::ptr::null_mut()), + inner: Arc::new(InnerList { + head: AtomicPtr::new(core::ptr::null_mut()), + }), } } @@ -56,7 +65,7 @@ impl HandleList { }); let n_node_ptr = Box::into_raw(n_node); - let mut current_head = self.head.load(Ordering::SeqCst); + let mut current_head = self.inner.head.load(Ordering::SeqCst); loop { // Safety // This is save, because we have not stored the Ptr elsewhere so we have exclusive @@ -65,7 +74,7 @@ impl HandleList { unsafe { (*n_node_ptr).next = current_head }; // Attempt to add the Entry to the List by setting it as the new Head - match self.head.compare_exchange( + match self.inner.head.compare_exchange( current_head, n_node_ptr, Ordering::SeqCst, @@ -83,11 +92,13 @@ impl HandleList { /// Creates a new Snapshot of the List at this Point in Time pub fn snapshot(&self) -> ListSnapshot { ListSnapshot { - head: self.head.load(Ordering::SeqCst), + head: self.inner.head.load(Ordering::SeqCst), + _list: 
self.inner.clone(), } } /// Inserts the Items of the Iterator, but in reverse order + #[cfg(test)] pub fn extend(&self, iter: I) where I: IntoIterator>, @@ -110,6 +121,13 @@ impl Debug for HandleList { write!(f, "HandleList") } } +impl Clone for HandleList { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} impl ListSnapshot { /// Obtain an iterator over the Entries in this Snapshot @@ -141,6 +159,29 @@ impl Iterator for SnapshotIter { } } +impl Drop for InnerList { + fn drop(&mut self) { + // We iterate over all the Entries of the List and free every Entry of the List + let mut current = self.head.load(Ordering::SeqCst); + while !current.is_null() { + // # Safety + // This is safe, because we only enter the loop body if the Pointer is not null and we + // also know that the Entry is not yet freed because we only free them once we are dropped + // and because we are now in Drop, noone before us has freed any Entry on the List + let current_r = unsafe { &*current }; + + let next = current_r.next as *mut ListEntry; + + // # Safety + // This is safe, because of the same garantuees detailed above for `current_r` + let entry = unsafe { Box::from_raw(current) }; + drop(entry); + + current = next; + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 17b6022..3859083 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,10 +180,9 @@ extern crate alloc; mod sync; -use crate::sync::Arc; use alloc::boxed::Box; -type Epochs = Arc; +type Epochs = handle_list::HandleList; mod write; pub use crate::write::Taken; @@ -292,9 +291,9 @@ pub fn new_from_empty_with_yield(t: T, yield_fn: fn()) -> (WriteHandle + Clone, { - let epochs = Default::default(); + let epochs: Epochs = Default::default(); - let r = ReadHandle::new(t.clone(), Arc::clone(&epochs)); + let r = ReadHandle::new(t.clone(), epochs.clone()); let w = WriteHandle::new_with_yield(t, epochs, r.clone(), yield_fn); (w, r) } @@ -328,9 +327,9 @@ pub fn new_with_yield(yield_fn: fn()) -> (WriteHandle, ReadHandle where T: Absorb + Default, { - let epochs = Default::default(); + let epochs: Epochs = Default::default(); - let r = ReadHandle::new(T::default(), Arc::clone(&epochs)); + let r = ReadHandle::new(T::default(), epochs.clone()); let w = WriteHandle::new_with_yield(T::default(), epochs, r.clone(), yield_fn); (w, r) } diff --git a/src/read.rs b/src/read.rs index ba6098b..a5c439d 100644 --- a/src/read.rs +++ b/src/read.rs @@ -69,7 +69,7 @@ impl fmt::Debug for ReadHandle { impl Clone for ReadHandle { fn clone(&self) -> Self { - ReadHandle::new_with_arc(Arc::clone(&self.inner), Arc::clone(&self.epochs)) + ReadHandle::new_with_arc(Arc::clone(&self.inner), self.epochs.clone()) } } @@ -98,7 +98,7 @@ impl ReadHandle { pub fn factory(&self) -> ReadHandleFactory { ReadHandleFactory { inner: Arc::clone(&self.inner), - epochs: Arc::clone(&self.epochs), + epochs: self.epochs.clone(), } } } diff --git a/src/read/factory.rs b/src/read/factory.rs index 9305d67..3db8471 100644 --- a/src/read/factory.rs +++ b/src/read/factory.rs @@ -25,7 +25,7 @@ impl Clone for ReadHandleFactory { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), - epochs: Arc::clone(&self.epochs), + epochs: self.epochs.clone(), } } } @@ -34,6 +34,6 @@ impl ReadHandleFactory { /// Produce a new [`ReadHandle`] to the same left-right data structure as this factory was /// originally produced from. 
pub fn handle(&self) -> ReadHandle { - ReadHandle::new_with_arc(Arc::clone(&self.inner), Arc::clone(&self.epochs)) + ReadHandle::new_with_arc(Arc::clone(&self.inner), self.epochs.clone()) } } diff --git a/src/write.rs b/src/write.rs index dfd16cd..31cfb10 100644 --- a/src/write.rs +++ b/src/write.rs @@ -2,7 +2,9 @@ use crate::handle_list::ListSnapshot; use crate::read::ReadHandle; use crate::Absorb; -use crate::sync::{fence, Arc, Ordering}; +#[cfg(test)] +use crate::sync::Arc; +use crate::sync::{fence, Ordering}; use alloc::{boxed::Box, collections::VecDeque, vec::Vec}; use core::fmt; use core::marker::PhantomData; From ba70de828d4d676c4ab259fbc8ff48fc62cebbf6 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Sun, 22 May 2022 00:09:33 +0200 Subject: [PATCH 11/13] Fixed most of the simpler Problems that had straight forward solutions Fixed most of the basic Problems picked out on the last PR comment, mostly the ones that had very straight forward solutions and didnt require any big changes or new ideas/features. --- src/handle_list.rs | 44 ++++++++++++++++++++++++++++---------------- src/write.rs | 14 ++++++-------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/handle_list.rs b/src/handle_list.rs index 690e220..9d416f3 100644 --- a/src/handle_list.rs +++ b/src/handle_list.rs @@ -1,4 +1,7 @@ -use core::fmt::{Debug, Formatter}; +use core::{ + fmt::{Debug, Formatter}, + marker::PhantomData, +}; use crate::sync::{Arc, AtomicPtr, AtomicUsize, Ordering}; use alloc::boxed::Box; @@ -29,9 +32,10 @@ pub struct ListSnapshot { } /// An Iterator over the Entries in a Snapshot -pub struct SnapshotIter { +pub struct SnapshotIter<'s> { // A Pointer to the next Entry that will be yielded current: *const ListEntry, + _marker: PhantomData<&'s ()>, } struct ListEntry { @@ -131,49 +135,57 @@ impl Clone for HandleList { impl ListSnapshot { /// Obtain an iterator over the Entries in this Snapshot - pub fn iter(&self) -> SnapshotIter { - SnapshotIter { current: self.head } + pub fn iter(&self) -> SnapshotIter<'_> { + SnapshotIter { + current: self.head, + _marker: PhantomData {}, + } } } -impl Iterator for SnapshotIter { +impl<'s> Iterator for SnapshotIter<'s> { // TODO // Maybe don't return an owned Value here - type Item = Arc; + type Item = &'s AtomicUsize; fn next(&mut self) -> Option { if self.current.is_null() { return None; } - // Safety + // # Safety // The Ptr is not null, because of the previous if-statement. - // The Data is also not freed, because we never free Entries on the List. - // We also have no one mutating Entries on the List and therefore we can access this without - // any extra synchronization needed. + // + // The Data is also not freed yet, because we know that at least one snapshot is still + // alive (because we bind to it through the lifetime) and as long as at least one + // snapshot exists, the InnerList will not be freed or dropped. 
This means that the entries + // in the List are also not yet freed and therefore its safe to still access them let entry = unsafe { &*self.current }; self.current = entry.next; - Some(entry.data.clone()) + Some(&entry.data) } } impl Drop for InnerList { fn drop(&mut self) { // We iterate over all the Entries of the List and free every Entry of the List - let mut current = self.head.load(Ordering::SeqCst); + let mut current = *self.head.get_mut(); while !current.is_null() { // # Safety // This is safe, because we only enter the loop body if the Pointer is not null and we // also know that the Entry is not yet freed because we only free them once we are dropped // and because we are now in Drop, noone before us has freed any Entry on the List - let current_r = unsafe { &*current }; - - let next = current_r.next as *mut ListEntry; + let next = unsafe { &*current }.next as *mut ListEntry; // # Safety - // This is safe, because of the same garantuees detailed above for `current_r` + // 1. We know that the Pointer was allocated using Box::new + // 2. We are the only ones to convert it back into a Box again, because we only ever do + // this when the InnerList is dropped (now) and then also free all the Entries so there + // is no chance of one entry surviving or still being stored somewhere for later use. + // 3. There is also no other reference to the Element, because otherwise the InnerList + // could not be dropped and we would not be in this section let entry = unsafe { Box::from_raw(current) }; drop(entry); diff --git a/src/write.rs b/src/write.rs index 31cfb10..5bf94fd 100644 --- a/src/write.rs +++ b/src/write.rs @@ -2,9 +2,7 @@ use crate::handle_list::ListSnapshot; use crate::read::ReadHandle; use crate::Absorb; -#[cfg(test)] -use crate::sync::Arc; -use crate::sync::{fence, Ordering}; +use crate::sync::{fence, Arc, Ordering}; use alloc::{boxed::Box, collections::VecDeque, vec::Vec}; use core::fmt; use core::marker::PhantomData; @@ -174,9 +172,6 @@ where let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release); // now, wait for all readers to depart - //let epochs = Arc::clone(&self.epochs); - //let mut epochs = epochs.lock().unwrap(); - let epoch_snapshot = self.epochs.snapshot(); self.wait(&epoch_snapshot); @@ -318,9 +313,12 @@ where // we need to wait until all epochs have changed since the swaps *or* until a "finished" // flag has been observed to be on for two subsequent iterations (there still may be some // readers present since we did the previous refresh) - + // + // NOTE: + // Here we take a Snapshot of the currently existing Readers and only consider these + // as all new readers will already be on the other copy, so there is no need to wait for + // them let epoch_snapshot = self.epochs.snapshot(); - self.wait(&epoch_snapshot); if !self.first { From f2952d5300898a2cf577bd832d6bc5ba4dbf1756 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Thu, 7 Sep 2023 21:29:21 +0200 Subject: [PATCH 12/13] Add better way to get length of epochs snapshot Every list entry now also stores the number of following entries, which is relatively cheap to do, as we only ever append new entries at the start. This allows us to easily get the current length of the list, as we can simply read the current head and read how many followers it has. 
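
Concretely, the invariant is that a node's follower count is fixed at
insertion time: the new head stores the old head's count plus one, so the
length of any snapshot falls out of a single read. A sketch restating
`ListSnapshot::len` from the patch below, with an Option in place of the raw
head pointer:

    // The pinned head already knows how many entries follow it, so a
    // snapshot's length never requires walking the list.
    fn snapshot_len(head: Option<&ListEntry>) -> usize {
        match head {
            None => 0,
            Some(entry) => entry.followers + 1,
        }
    }
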
--- src/handle_list.rs | 23 +++++++++++++++++++++++ src/write.rs | 3 ++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/handle_list.rs b/src/handle_list.rs index 9d416f3..bff2bf3 100644 --- a/src/handle_list.rs +++ b/src/handle_list.rs @@ -40,6 +40,8 @@ pub struct SnapshotIter<'s> { struct ListEntry { data: Arc, + // Stores the number of following entries in the list + followers: usize, // We can use a normal Ptr here because we never append or remove Entries and only add new Entries // by changing the Head, so we never modify this Ptr and therefore dont need an AtomicPtr next: *const Self, @@ -62,9 +64,12 @@ impl HandleList { self.add_counter(count.clone()); count } + + /// Adds a new Counter to the List of Entries, increasing the size of the List fn add_counter(&self, count: Arc) { let n_node = Box::new(ListEntry { data: count, + followers: 0, next: core::ptr::null(), }); let n_node_ptr = Box::into_raw(n_node); @@ -77,6 +82,15 @@ impl HandleList { // The Ptr is also still valid, because we never free Entries on the List unsafe { (*n_node_ptr).next = current_head }; + // Update the follower count of the new entry + if !current_head.is_null() { + // Safety + // This is save, because we know the Ptr is not null and we know that + // Entries will never be deallocated, so the ptr still refers to a valid + // entry. + unsafe { (*n_node_ptr).followers = (*current_head).followers + 1 }; + } + // Attempt to add the Entry to the List by setting it as the new Head match self.inner.head.compare_exchange( current_head, @@ -141,6 +155,15 @@ impl ListSnapshot { _marker: PhantomData {}, } } + + /// Get the Length of the current List of entries + pub fn len(&self) -> usize { + if self.head.is_null() { + return 0; + } + + unsafe { (*self.head).followers + 1 } + } } impl<'s> Iterator for SnapshotIter<'s> { diff --git a/src/write.rs b/src/write.rs index 5bf94fd..7a9f098 100644 --- a/src/write.rs +++ b/src/write.rs @@ -250,7 +250,8 @@ where } // make sure we have enough space for all the epochs in the current snapshot - self.last_epochs.resize(epochs.iter().count(), 0); + self.last_epochs.resize(epochs.len(), 0); + 'retry: loop { // read all and see if all have changed (which is likely) for (ii, (ri, epoch)) in epochs.iter().enumerate().enumerate().skip(starti) { From 490377ad60df8d6bec2c1b78797967617d0ea462 Mon Sep 17 00:00:00 2001 From: Lol3rrr Date: Thu, 7 Sep 2023 21:58:29 +0200 Subject: [PATCH 13/13] First Reuse impl Added a first iteration of the reuse of entries in the new HandleList --- src/handle_list.rs | 103 ++++++++++++++++++++++++++++++++++++++++++--- src/read.rs | 11 ++--- src/read/guard.rs | 5 ++- 3 files changed, 105 insertions(+), 14 deletions(-) diff --git a/src/handle_list.rs b/src/handle_list.rs index bff2bf3..3052c56 100644 --- a/src/handle_list.rs +++ b/src/handle_list.rs @@ -1,6 +1,7 @@ use core::{ fmt::{Debug, Formatter}, marker::PhantomData, + sync::atomic::AtomicBool, }; use crate::sync::{Arc, AtomicPtr, AtomicUsize, Ordering}; @@ -40,6 +41,7 @@ pub struct SnapshotIter<'s> { struct ListEntry { data: Arc, + used: AtomicBool, // Stores the number of following entries in the list followers: usize, // We can use a normal Ptr here because we never append or remove Entries and only add new Entries @@ -47,6 +49,13 @@ struct ListEntry { next: *const Self, } +/// The EntryHandle is needed to allow for reuse of entries, after a handle is dropped +#[derive(Debug)] +pub struct EntryHandle { + counter: Arc, + elem: *const ListEntry, +} + impl HandleList { /// Creates a new 
empty HandleList pub fn new() -> Self { @@ -57,18 +66,67 @@ impl HandleList { } } + fn len(&self) -> usize { + let head = self.inner.head.load(Ordering::SeqCst); + if head.is_null() { + return 0; + } + + // Safety + // The prt is not null and as entries are never deallocated, so the ptr should always be + // valid + unsafe { (*head).followers + 1 } + } + + /// Obtains a new Entry + pub fn get_entry(&self) -> EntryHandle { + if let Some(entry) = self.try_acquire() { + return entry; + } + + self.new_entry() + } + + fn try_acquire(&self) -> Option { + let mut current: *const ListEntry = self.inner.head.load(Ordering::SeqCst); + while !current.is_null() { + // Safety + // The ptr is not null and entries are never deallocated + let current_entry = unsafe { &*current }; + + if !current_entry.used.load(Ordering::SeqCst) { + if current_entry + .used + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + return Some(EntryHandle { + counter: current_entry.data.clone(), + elem: current, + }); + } + } + + current = current_entry.next; + } + + None + } + /// Adds a new Entry to the List and returns the Counter for the Entry - pub fn new_entry(&self) -> Arc { + fn new_entry(&self) -> EntryHandle { let count = Arc::new(AtomicUsize::new(0)); - self.add_counter(count.clone()); - count + self.add_counter(count) } /// Adds a new Counter to the List of Entries, increasing the size of the List - fn add_counter(&self, count: Arc) { + fn add_counter(&self, count: Arc) -> EntryHandle { + let counter = count.clone(); + let n_node = Box::new(ListEntry { data: count, + used: AtomicBool::new(true), followers: 0, next: core::ptr::null(), }); @@ -98,7 +156,12 @@ impl HandleList { Ordering::SeqCst, Ordering::SeqCst, ) { - Ok(_) => return, + Ok(_) => { + return EntryHandle { + counter, + elem: n_node_ptr, + } + } Err(n_head) => { // Store the found Head-Ptr to avoid an extra load at the start of every loop current_head = n_head; @@ -217,6 +280,18 @@ impl Drop for InnerList { } } +impl EntryHandle { + pub fn counter(&self) -> &AtomicUsize { + &self.counter + } +} +impl Drop for EntryHandle { + fn drop(&mut self) { + let elem = unsafe { &*self.elem }; + elem.used.store(false, Ordering::SeqCst); + } +} + #[cfg(test)] mod tests { use super::*; @@ -245,7 +320,7 @@ mod tests { assert_eq!(0, empty_snapshot.iter().count()); let entry = list.new_entry(); - entry.store(1, Ordering::SeqCst); + entry.counter().store(1, Ordering::SeqCst); // Make sure that the Snapshot we got before adding a new Entry is still empty assert_eq!(0, empty_snapshot.iter().count()); @@ -255,8 +330,22 @@ mod tests { let snapshot_entry = second_snapshot.iter().next().unwrap(); assert_eq!( - entry.load(Ordering::SeqCst), + entry.counter().load(Ordering::SeqCst), snapshot_entry.load(Ordering::SeqCst) ); } + + #[test] + fn entry_reuse() { + let list = HandleList::new(); + + assert_eq!(0, list.len()); + + let entry1 = list.get_entry(); + assert_eq!(1, list.len()); + drop(entry1); + + let entry2 = list.get_entry(); + assert_eq!(1, list.len()); + } } diff --git a/src/read.rs b/src/read.rs index a5c439d..fefbcfe 100644 --- a/src/read.rs +++ b/src/read.rs @@ -40,7 +40,8 @@ pub use factory::ReadHandleFactory; pub struct ReadHandle { pub(crate) inner: Arc>, pub(crate) epochs: crate::Epochs, - epoch: Arc, + // epoch: Arc, + epoch: crate::handle_list::EntryHandle, enters: Cell, // `ReadHandle` is _only_ Send if T is Sync. 
If T is !Sync, then it's not okay for us to expose @@ -62,7 +63,7 @@ impl fmt::Debug for ReadHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadHandle") .field("epochs", &self.epochs) - .field("epoch", &self.epoch) + .field("epoch", &self.epoch.counter()) .finish() } } @@ -82,7 +83,7 @@ impl ReadHandle { fn new_with_arc(inner: Arc>, epochs: crate::Epochs) -> Self { // Obtain a new Epoch-Entry - let epoch = epochs.new_entry(); + let epoch = epochs.get_entry(); Self { epochs, @@ -159,7 +160,7 @@ impl ReadHandle { // in all cases, using a pointer we read *after* updating our epoch is safe. // so, update our epoch tracker. - self.epoch.fetch_add(1, Ordering::AcqRel); + self.epoch.counter().fetch_add(1, Ordering::AcqRel); // ensure that the pointer read happens strictly after updating the epoch fence(Ordering::SeqCst); @@ -181,7 +182,7 @@ impl ReadHandle { } else { // the writehandle has been dropped, and so has both copies, // so restore parity and return None - self.epoch.fetch_add(1, Ordering::AcqRel); + self.epoch.counter().fetch_add(1, Ordering::AcqRel); None } } diff --git a/src/read/guard.rs b/src/read/guard.rs index 4f2b40a..f268499 100644 --- a/src/read/guard.rs +++ b/src/read/guard.rs @@ -4,7 +4,8 @@ use core::mem; #[derive(Debug, Copy, Clone)] pub(super) struct ReadHandleState<'rh> { - pub(super) epoch: &'rh AtomicUsize, + // pub(super) epoch: &'rh AtomicUsize, + pub(super) epoch: &'rh crate::handle_list::EntryHandle, pub(super) enters: &'rh Cell, } @@ -120,7 +121,7 @@ impl<'rh, T: ?Sized> Drop for ReadGuard<'rh, T> { self.handle.enters.set(enters); if enters == 0 { // We are the last guard to be dropped -- now release our epoch. - self.handle.epoch.fetch_add(1, Ordering::AcqRel); + self.handle.epoch.counter().fetch_add(1, Ordering::AcqRel); } } }
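
Taken together, the series leaves the crate with yield-parameterized
constructors that also work without std. An end-to-end sketch of the resulting
API, assuming the crate is built with default features; `AddOp` and
`spin_yield` are illustrative names, not part of the crate:

    use left_right::Absorb;

    struct AddOp(i32);

    impl Absorb<AddOp> for i32 {
        fn absorb_first(&mut self, op: &mut AddOp, _other: &Self) {
            *self += op.0;
        }
        fn absorb_second(&mut self, op: AddOp, _other: &Self) {
            *self += op.0;
        }
        fn sync_with(&mut self, first: &Self) {
            *self = *first;
        }
    }

    fn main() {
        // The yield function the writer calls while waiting for readers.
        fn spin_yield() {
            core::hint::spin_loop();
        }

        let (mut w, r) = left_right::new_with_yield::<i32, AddOp>(spin_yield);

        w.append(AddOp(2));
        w.publish(); // waits for readers, calling spin_yield if it has to spin

        assert_eq!(*r.enter().unwrap(), 2);
    }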