
Commit 90b9e7b

avoid leaking recursive calls to retire when dropping collector
1 parent 3aa927d commit 90b9e7b

4 files changed: +138 -23 lines changed

src/collector.rs

Lines changed: 22 additions & 0 deletions

@@ -262,6 +262,28 @@ impl Collector {
         unsafe { self.raw.add(ptr, reclaim, Thread::current()) }
     }
 
+    /// Reclaim any values that have been retired.
+    ///
+    /// This method reclaims any objects that have been retired across *all* threads.
+    /// After calling this method, any values that were previously retired, or retired
+    /// recursively on the current thread during this call, will have been reclaimed.
+    ///
+    /// # Safety
+    ///
+    /// This function is **extremely unsafe** to call. It is only sound when no threads are
+    /// currently active, whether accessing values that have been retired or accessing the
+    /// collector through any type of guard. This is akin to having a unique reference to the
+    /// collector. However, this method takes a shared reference, as reclaimers to be run by this
+    /// thread are allowed to access the collector recursively.
+    ///
+    /// # Notes
+    ///
+    /// Note that if reclaimers initialize guards across threads, or initialize owned guards,
+    /// objects retired through those guards may not be reclaimed.
+    pub unsafe fn reclaim_all(&self) {
+        unsafe { self.raw.reclaim_all() };
+    }
+
     pub(crate) fn ptr_eq(this: &Collector, other: &Collector) -> bool {
         ptr::eq(this.unique, other.unique)
     }
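For context, a minimal single-threaded usage sketch of the new API (assuming this is the `seize` crate whose `Collector`, `Linked`, and `reclaim::boxed` items appear elsewhere in this diff; this example is not part of the commit):

use seize::{reclaim, Collector, Linked};

fn main() {
    let collector = Collector::new().batch_size(2);

    // retire a few boxed values; with a small batch size, some may still
    // be sitting in this thread's local batch afterwards
    for i in 0..8usize {
        let ptr: *mut Linked<usize> = collector.link_boxed(i);
        unsafe { collector.retire(ptr, reclaim::boxed::<Linked<usize>>) };
    }

    // no other threads exist and no guards are held, so it is sound to
    // force every outstanding batch to be reclaimed right now
    unsafe { collector.reclaim_all() };
}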

src/deferred.rs

Lines changed: 7 additions & 7 deletions

@@ -17,7 +17,7 @@ use crate::{AsLink, Collector, Link};
 /// concurrently. It is not meant to be used to amortize the cost of retirement, which is done
 /// through thread-local batches controlled with [`Collector::batch_size`], as access from a single-thread
 /// can be more expensive than is required. Deferred batches are useful when you need to control when
-/// a batch of nodes is retired directly, a relatively rare use case.
+/// a batch of objects is retired directly, a relatively rare use case.
 ///
 /// # Examples
 ///
@@ -31,13 +31,13 @@ use crate::{AsLink, Collector, Link};
 ///     .map(|i| AtomicPtr::new(collector.link_boxed(i)))
 ///     .collect::<Arc<[_]>>();
 ///
-/// // create a batch of nodes to retire
+/// // create a batch of objects to retire
 /// let mut batch = Deferred::new();
 ///
 /// for item in items.iter() {
 ///     // make the item unreachable with an atomic swap
 ///     let old = item.swap(std::ptr::null_mut(), Ordering::AcqRel);
-///     // don't retire just yet, add the node to the batch
+///     // don't retire just yet, add the object to the batch
 ///     unsafe { batch.defer(old) };
 /// }
 ///
@@ -51,7 +51,7 @@ pub struct Deferred {
 }
 
 impl Deferred {
-    /// Create a new batch of deferred nodes.
+    /// Create a new batch of deferred objects.
     pub const fn new() -> Deferred {
         Deferred {
             head: AtomicPtr::new(ptr::null_mut()),
@@ -98,14 +98,14 @@ impl Deferred {
     }
 
     /// Retires a batch of values, running `reclaim` when no threads hold a reference to any
-    /// nodes in the batch.
+    /// objects in the batch.
     ///
     /// Note that this method is disconnected from any guards on the current thread,
     /// so the pointers may be reclaimed immediately.
     ///
     /// # Safety
     ///
-    /// The safety requirements of [`Collector::retire`] apply to each node in the batch.
+    /// The safety requirements of [`Collector::retire`] apply to each object in the batch.
     ///
     /// [`Collector::retire`]: crate::Collector::retire
     pub unsafe fn retire_all(&mut self, collector: &Collector, reclaim: unsafe fn(*mut Link)) {
@@ -114,7 +114,7 @@ impl Deferred {
         unsafe { collector.raw.add_batch(self, reclaim, Thread::current()) }
     }
 
-    /// Run a function for each node in the batch.
+    /// Run a function for each object in the batch.
     ///
     /// This function does not consume the batch and can be called multiple
     /// times, **before retirement**.
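The doc example above stops at `batch.defer(old)`; here is a hedged sketch of the full flow ending in `retire_all`, assuming `Deferred` is exported at the crate root alongside `Collector` (not taken verbatim from the crate's docs):

use std::sync::atomic::{AtomicPtr, Ordering};
use seize::{reclaim, Collector, Deferred, Linked};

fn main() {
    let collector = Collector::new();

    let items: Vec<_> = (0..10usize)
        .map(|i| AtomicPtr::new(collector.link_boxed(i)))
        .collect();

    // gather unlinked objects into a deferred batch instead of retiring
    // them one at a time through the thread-local batch
    let mut batch = Deferred::new();
    for item in items.iter() {
        let old = item.swap(std::ptr::null_mut(), Ordering::AcqRel);
        unsafe { batch.defer(old) };
    }

    // retire the whole batch at once; because this call is not tied to a
    // guard on the current thread, reclamation may happen immediately
    unsafe { batch.retire_all(&collector, reclaim::boxed::<Linked<usize>>) };
}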

src/raw.rs

Lines changed: 49 additions & 16 deletions

@@ -255,6 +255,12 @@ impl Collector {
         // safety: local batches are only accessed by the current thread
         let batch = unsafe { (*local_batch).get_or_init(self.batch_size) };
 
+        // if we are in a recursive call during drop, reclaim immediately
+        if batch == LocalBatch::DROP {
+            unsafe { reclaim(ptr.cast::<Link>()) }
+            return;
+        }
+
         // `ptr` is guaranteed to be a valid pointer that can be cast to a node (`T: AsLink`)
         //
         // any other thread with a reference to the pointer only has a shared
@@ -301,6 +307,12 @@ impl Collector {
         // safety: local batches are only accessed by the current thread
         let batch = unsafe { (*local_batch).get_or_init(self.batch_size) };
 
+        // if we are in a recursive call during drop, reclaim immediately
+        if batch == LocalBatch::DROP {
+            deferred.for_each(|ptr| unsafe { reclaim(ptr.cast::<Link>()) });
+            return;
+        }
+
         // keep track of the oldest node in the batch
         let min_epoch = *deferred.min_epoch.get_mut();
 
@@ -368,7 +380,7 @@ impl Collector {
         let batch = unsafe { (*local_batch).batch };
 
         // there is nothing to retire
-        if batch.is_null() {
+        if batch.is_null() || batch == LocalBatch::DROP {
             return;
         }
 
@@ -536,6 +548,35 @@ impl Collector {
         }
     }
 
+    // Reclaim all values in the collector, including recursive calls to retire.
+    //
+    // # Safety
+    //
+    // No threads may be accessing the collector or any values that have been retired.
+    pub unsafe fn reclaim_all(&self) {
+        for local_batch in self.batches.iter() {
+            let local_batch = local_batch.value.get();
+
+            unsafe {
+                let batch = (*local_batch).batch;
+
+                // there is nothing to reclaim
+                if batch.is_null() {
+                    continue;
+                }
+
+                // tell any recursive calls to `retire` to reclaim immediately
+                (*local_batch).batch = LocalBatch::DROP;
+
+                // safety: we have &mut self and the batch is non-null
+                Collector::free_batch(batch);
+
+                // reset the batch
+                (*local_batch).batch = ptr::null_mut();
+            }
+        }
+    }
+
     // Free a reservation list.
     //
     // # Safety
@@ -556,20 +597,8 @@ impl Collector {
 
 impl Drop for Collector {
     fn drop(&mut self) {
-        for batch in self.batches.iter() {
-            unsafe {
-                // safety: we have &mut self
-                let batch = (*batch.value.get()).batch;
-
-                // there is nothing to reclaim
-                if batch.is_null() {
-                    continue;
-                }
-
-                // safety: we have &mut self and the batch is non-null
-                Collector::free_batch(batch);
-            }
-        }
+        // safety: we have &mut self
+        unsafe { self.reclaim_all() };
     }
 }
 
@@ -651,7 +680,7 @@ impl Entry {
     //
     // While null indicates an empty list, INACTIVE indicates the thread has no active
    // guards and is not accessing any objects.
-    pub const INACTIVE: *mut Entry = -1_isize as usize as _;
+    pub const INACTIVE: *mut Entry = usize::MAX as _;
 }
 
 // A pointer to a batch, unique to the current thread.
@@ -668,6 +697,10 @@ impl Default for LocalBatch {
 }
 
 impl LocalBatch {
+    // This is set during a call to reclaim_all, signalling recursive calls to retire
+    // to reclaim immediately.
+    const DROP: *mut Batch = usize::MAX as _;
+
     // Return a pointer to the batch, initializing it if the batch was null.
     fn get_or_init(&mut self, capacity: usize) -> *mut Batch {
         if self.batch.is_null() {
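Both `Entry::INACTIVE` and the new `LocalBatch::DROP` rely on the same trick: `usize::MAX as _` yields the all-ones address (the same value the old `-1_isize as usize as _` produced) that no real allocation can occupy, so it works as an in-band marker stored in an ordinary pointer field. A standalone, hypothetical sketch of the resulting three-state pointer pattern (none of these names are part of the crate):

fn main() {
    // the thread-local batch pointer effectively has three states:
    //   null            -> no batch has been created yet
    //   DRAINING (MAX)  -> reclaim_all is draining it; retire frees immediately
    //   anything else   -> a live, heap-allocated batch
    const DRAINING: *mut u32 = usize::MAX as *mut u32;

    let states: [*mut u32; 2] = [std::ptr::null_mut(), DRAINING];

    for batch in states {
        if batch.is_null() {
            println!("no batch: nothing to retire yet");
        } else if batch == DRAINING {
            println!("draining: a recursive retire reclaims immediately");
        } else {
            println!("live batch: defer reclamation as usual");
        }
    }
}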

tests/lib.rs

Lines changed: 60 additions & 0 deletions

@@ -177,6 +177,66 @@ fn recursive_retire() {
     }
 }
 
+#[test]
+fn reclaim_all() {
+    let collector = Collector::new().batch_size(2);
+
+    for _ in 0..cfg::ITER {
+        let dropped = Arc::new(AtomicUsize::new(0));
+
+        let items = (0..cfg::ITEMS)
+            .map(|_| AtomicPtr::new(collector.link_boxed(DropTrack(dropped.clone()))))
+            .collect::<Vec<_>>();
+
+        for item in items {
+            unsafe {
+                collector.retire(
+                    item.load(Ordering::Relaxed),
+                    reclaim::boxed::<Linked<DropTrack>>,
+                )
+            };
+        }
+
+        unsafe { collector.reclaim_all() };
+        assert_eq!(dropped.load(Ordering::Relaxed), cfg::ITEMS);
+    }
+}
+
+#[test]
+fn recursive_retire_reclaim_all() {
+    struct Recursive {
+        _value: usize,
+        collector: *mut Collector,
+        pointers: Vec<*mut Linked<DropTrack>>,
+    }
+
+    unsafe {
+        // make sure retire runs in drop, not immediately
+        let collector = Box::into_raw(Box::new(Collector::new().batch_size(cfg::ITEMS * 2)));
+        let dropped = Arc::new(AtomicUsize::new(0));
+
+        let ptr = (*collector).link_boxed(Recursive {
+            _value: 0,
+            collector,
+            pointers: (0..cfg::ITEMS)
+                .map(|_| (*collector).link_boxed(DropTrack(dropped.clone())))
+                .collect(),
+        });
+
+        (*collector).retire(ptr, |link| {
+            let value = Box::from_raw(link.cast::<Linked<Recursive>>());
+            let collector = value.value.collector;
+            for pointer in value.value.pointers {
+                (*collector).retire(pointer, reclaim::boxed::<Linked<DropTrack>>);
+            }
+        });
+
+        (*collector).reclaim_all();
+        assert_eq!(dropped.load(Ordering::Relaxed), cfg::ITEMS);
+        let _ = Box::from_raw(collector);
+    }
+}
+
 #[test]
 fn deferred() {
     let collector = Arc::new(Collector::new().batch_size(2));
