Skip to content

Commit f8a59f1

Browse files
authored
Add WriteHandle::take (#97)
1 parent a8f42a8 commit f8a59f1

File tree

2 files changed

+159
-7
lines changed

2 files changed

+159
-7
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ use crate::sync::{Arc, AtomicUsize, Mutex};
181181
type Epochs = Arc<Mutex<slab::Slab<Arc<AtomicUsize>>>>;
182182

183183
mod write;
184+
pub use crate::write::Taken;
184185
pub use crate::write::WriteHandle;
185186

186187
mod read;

src/write.rs

Lines changed: 158 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
use super::Absorb;
21
use crate::read::ReadHandle;
2+
use crate::Absorb;
33

44
use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
55
use std::collections::VecDeque;
6+
use std::marker::PhantomData;
7+
use std::ops::DerefMut;
68
use std::ptr::NonNull;
79
#[cfg(test)]
810
use std::sync::atomic::AtomicBool;
@@ -38,6 +40,8 @@ where
3840
first: bool,
3941
/// A publish has happened, but the two copies have not been synchronized yet.
4042
second: bool,
43+
/// If we call `Self::take` the drop needs to be different.
44+
taken: bool,
4145
}
4246

4347
// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
@@ -70,16 +74,89 @@ where
7074
}
7175
}
7276

73-
impl<T, O> Drop for WriteHandle<T, O>
77+
/// A **smart pointer** to an owned backing data structure. This makes sure that the
78+
/// data is dropped correctly (using [`Absorb::drop_second`]).
79+
///
80+
/// Additionally it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
81+
pub struct Taken<T: Absorb<O>, O> {
82+
inner: Option<Box<T>>,
83+
_marker: PhantomData<O>,
84+
}
85+
86+
impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
87+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88+
f.debug_struct("Taken")
89+
.field(
90+
"inner",
91+
self.inner
92+
.as_ref()
93+
.expect("inner is only taken in `into_box` which drops self"),
94+
)
95+
.finish()
96+
}
97+
}
98+
99+
impl<T: Absorb<O>, O> Deref for Taken<T, O> {
100+
type Target = T;
101+
102+
fn deref(&self) -> &Self::Target {
103+
self.inner
104+
.as_ref()
105+
.expect("inner is only taken in `into_box` which drops self")
106+
}
107+
}
108+
109+
impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
110+
fn deref_mut(&mut self) -> &mut Self::Target {
111+
self.inner
112+
.as_mut()
113+
.expect("inner is only taken in `into_box` which drops self")
114+
}
115+
}
116+
117+
impl<T: Absorb<O>, O> Taken<T, O> {
118+
/// This is unsafe because you must call [`Absorb::drop_second`] in
119+
/// case just dropping `T` is not safe and sufficient.
120+
///
121+
/// If you used the default implementation of [`Absorb::drop_second`] (which just calls [`drop`](Drop::drop))
122+
/// you don't need to call [`Absorb::drop_second`].
123+
pub unsafe fn into_box(mut self) -> Box<T> {
124+
self.inner
125+
.take()
126+
.expect("inner is only taken here then self is dropped")
127+
}
128+
}
129+
130+
impl<T: Absorb<O>, O> Drop for Taken<T, O> {
131+
fn drop(&mut self) {
132+
if let Some(inner) = self.inner.take() {
133+
T::drop_second(inner);
134+
}
135+
}
136+
}
137+
138+
impl<T, O> WriteHandle<T, O>
74139
where
75140
T: Absorb<O>,
76141
{
77-
fn drop(&mut self) {
142+
/// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
143+
///
144+
/// Makes sure that all the pending operations are applied and waits till all the read handles
145+
/// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
146+
/// returns the other copy as a [`Taken`] smart pointer.
147+
fn take_inner(&mut self) -> Option<Taken<T, O>> {
78148
use std::ptr;
149+
// Can only take inner once.
150+
if self.taken {
151+
return None;
152+
}
153+
154+
// Disallow taking again.
155+
self.taken = true;
79156

80157
// first, ensure both copies are up to date
81-
// (otherwise safely dropping deduplicated data is a pain)
82-
if !self.oplog.is_empty() {
158+
// (otherwise safely dropping the possibly duplicated w_handle data is a pain)
159+
if self.first || !self.oplog.is_empty() {
83160
self.publish();
84161
}
85162
if !self.oplog.is_empty() {
@@ -105,12 +182,29 @@ where
105182
// safety: w_handle was initially crated from a `Box`, and is no longer aliased.
106183
Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });
107184

108-
// next we drop the r_handle.
185+
// next we take the r_handle and return it as a boxed value.
186+
//
109187
// this is safe, since we know that no readers are using this pointer
110188
// anymore (due to the .wait() following swapping the pointer with NULL).
111189
//
112190
// safety: r_handle was initially crated from a `Box`, and is no longer aliased.
113-
Absorb::drop_second(unsafe { Box::from_raw(r_handle) });
191+
let boxed_r_handle = unsafe { Box::from_raw(r_handle) };
192+
193+
Some(Taken {
194+
inner: Some(boxed_r_handle),
195+
_marker: PhantomData,
196+
})
197+
}
198+
}
199+
200+
impl<T, O> Drop for WriteHandle<T, O>
201+
where
202+
T: Absorb<O>,
203+
{
204+
fn drop(&mut self) {
205+
if let Some(inner) = self.take_inner() {
206+
drop(inner);
207+
}
114208
}
115209
}
116210

@@ -133,6 +227,7 @@ where
133227
refreshes: 0,
134228
first: true,
135229
second: true,
230+
taken: false,
136231
}
137232
}
138233

@@ -330,6 +425,20 @@ where
330425
pub fn raw_write_handle(&mut self) -> NonNull<T> {
331426
self.w_handle
332427
}
428+
429+
/// Returns the backing data structure.
430+
///
431+
/// Makes sure that all the pending operations are applied and waits till all the read handles
432+
/// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
433+
/// returns the other copy as a [`Taken`] smart pointer.
434+
pub fn take(mut self) -> Taken<T, O> {
435+
// It is always safe to `expect` here because `take_inner` is private
436+
// and it is only called here and in the drop impl. Since we have an owned
437+
// `self` we know the drop has not yet been called. And every first call of
438+
// `take_inner` returns `Some`
439+
self.take_inner()
440+
.expect("inner is only taken here then self is dropped")
441+
}
333442
}
334443

335444
// allow using write handle for reads
@@ -468,6 +577,48 @@ mod tests {
468577
assert_eq!(w.oplog.len(), 2);
469578
}
470579

580+
#[test]
581+
fn take_test() {
582+
// publish twice then take with no pending operations
583+
let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
584+
w.append(CounterAddOp(1));
585+
w.publish();
586+
w.append(CounterAddOp(1));
587+
w.publish();
588+
assert_eq!(*w.take(), 4);
589+
590+
// publish twice then pending operation published by take
591+
let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
592+
w.append(CounterAddOp(1));
593+
w.publish();
594+
w.append(CounterAddOp(1));
595+
w.publish();
596+
w.append(CounterAddOp(2));
597+
assert_eq!(*w.take(), 6);
598+
599+
// normal publish then pending operations published by take
600+
let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
601+
w.append(CounterAddOp(1));
602+
w.publish();
603+
w.append(CounterAddOp(1));
604+
assert_eq!(*w.take(), 4);
605+
606+
// pending operations published by take
607+
let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
608+
w.append(CounterAddOp(1));
609+
assert_eq!(*w.take(), 3);
610+
611+
// emptry op queue
612+
let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
613+
w.append(CounterAddOp(1));
614+
w.publish();
615+
assert_eq!(*w.take(), 3);
616+
617+
// no operations
618+
let (w, _r) = crate::new_from_empty::<i32, _>(2);
619+
assert_eq!(*w.take(), 2);
620+
}
621+
471622
#[test]
472623
fn wait_test() {
473624
use std::sync::{Arc, Barrier};

0 commit comments

Comments
 (0)