Skip to content

Commit ad8eef7

Browse files
committed
changed enqueue and dequeue counters to be Cell usize rather than AtomicUsize
1 parent 7b956d2 commit ad8eef7

File tree

1 file changed

+54
-54
lines changed

1 file changed

+54
-54
lines changed

src/shardedringbuf.rs

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
use crossbeam_utils::CachePadded;
88
use fastrand::usize as frand;
99
use std::{
10-
cell::UnsafeCell,
10+
cell::{Cell, UnsafeCell},
1111
fmt::{Debug, Write},
1212
mem::MaybeUninit,
1313
ptr,
@@ -60,9 +60,9 @@ struct InnerRingBuffer<T> {
6060
/// Box containing the content of the buffer
6161
items: Box<[UnsafeCell<MaybeUninit<T>>]>,
6262
/// Where to enqueue at in the Box
63-
enqueue_index: AtomicUsize,
63+
enqueue_index: Cell<usize>,
6464
/// Where to dequeue at in the Box
65-
dequeue_index: AtomicUsize,
65+
dequeue_index: Cell<usize>,
6666
}
6767

6868
/// Implements the InnerRingBuffer functions
@@ -78,17 +78,17 @@ impl<T> InnerRingBuffer<T> {
7878
}
7979
vec.into_boxed_slice()
8080
},
81-
enqueue_index: AtomicUsize::new(0),
82-
dequeue_index: AtomicUsize::new(0),
81+
enqueue_index: Cell::new(0),
82+
dequeue_index: Cell::new(0),
8383
}
8484
}
8585

8686
/// Helper function to see if a given index inside this buffer does
8787
/// indeed contain a valid item. Used in Drop Trait.
8888
#[inline(always)]
8989
fn is_item_in_shard(&self, item_ind: usize) -> bool {
90-
let enqueue_ind = self.enqueue_index.load(Ordering::Relaxed) % self.items.len();
91-
let dequeue_ind = self.dequeue_index.load(Ordering::Relaxed) % self.items.len();
90+
let enqueue_ind = self.enqueue_index.get() % self.items.len();
91+
let dequeue_ind = self.dequeue_index.get() % self.items.len();
9292

9393
if enqueue_ind > dequeue_ind {
9494
item_ind < enqueue_ind && item_ind >= dequeue_ind
@@ -371,10 +371,9 @@ impl<T> ShardedRingBuf<T> {
371371
#[inline(always)]
372372
fn enqueue_in_shard(&self, shard_ind: usize, item: T) {
373373
let inner = &self.inner_rb[shard_ind];
374-
// we use fetch add here because we want to obtain the previous value
375-
// to dequeue while also incrementing this counter (separate load and store
376-
// incurs more cost)
377-
let enqueue_index = inner.enqueue_index.fetch_add(1, Ordering::Relaxed) % inner.items.len();
374+
375+
let enqueue_index = inner.enqueue_index.get() % inner.items.len();
376+
inner.enqueue_index.set(inner.enqueue_index.get() + 1);
378377
let item_cell = inner.items[enqueue_index].get();
379378
// SAFETY: Only one thread will perform this operation and write to this
380379
// item cell
@@ -416,13 +415,13 @@ impl<T> ShardedRingBuf<T> {
416415
}
417416
loop {
418417
let inner = &self.inner_rb[0];
419-
let enq_counter = inner.enqueue_index.load(Ordering::Relaxed);
420-
let deq_counter = inner.dequeue_index.load(Ordering::Relaxed);
418+
let enq_counter = inner.enqueue_index.get();
419+
let deq_counter = inner.dequeue_index.get();
421420
let jobs = enq_counter.wrapping_sub(deq_counter);
422421

423422
if jobs != inner.items.len() {
424-
let enqueue_index =
425-
inner.enqueue_index.fetch_add(1, Ordering::Relaxed) % inner.items.len();
423+
let enqueue_index = inner.enqueue_index.get() % inner.items.len();
424+
inner.enqueue_index.set(inner.enqueue_index.get() + 1);
426425
let item_cell = inner.items[enqueue_index].get();
427426
unsafe {
428427
(*item_cell).write(item);
@@ -459,7 +458,8 @@ impl<T> ShardedRingBuf<T> {
459458
// we use fetch add here because we want to obtain the previous value
460459
// to dequeue while also incrementing this counter (separate load and store
461460
// incurs more cost)
462-
let dequeue_index = inner.dequeue_index.fetch_add(1, Ordering::Relaxed) % inner.items.len();
461+
let dequeue_index = inner.dequeue_index.get() % inner.items.len();
462+
inner.dequeue_index.set(inner.dequeue_index.get() + 1);
463463

464464
let item_cell = inner.items[dequeue_index].get();
465465

@@ -505,14 +505,14 @@ impl<T> ShardedRingBuf<T> {
505505
}
506506

507507
let inner = &self.inner_rb[0];
508-
let enq_counter = inner.enqueue_index.load(Ordering::Relaxed);
509-
let deq_counter = inner.dequeue_index.load(Ordering::Relaxed);
508+
let enq_counter = inner.enqueue_index.get();
509+
let deq_counter = inner.dequeue_index.get();
510510

511511
let jobs = enq_counter.wrapping_sub(deq_counter);
512512

513513
if jobs != 0 {
514-
let dequeue_index =
515-
inner.dequeue_index.fetch_add(1, Ordering::Relaxed) % inner.items.len();
514+
let dequeue_index = inner.dequeue_index.get() % inner.items.len();
515+
inner.dequeue_index.set(inner.dequeue_index.get() + 1);
516516
// SAFETY: Only one thread will claim this slot and perform this operation
517517
// And it's guaranteed that an item will exist here
518518
let item = unsafe { (*inner.items[dequeue_index].get()).assume_init_read() };
@@ -574,13 +574,13 @@ impl<T> ShardedRingBuf<T> {
574574
if self.poisoned.load(Ordering::Relaxed) && self.is_empty() {
575575
return None;
576576
}
577-
let enq_counter = inner.enqueue_index.load(Ordering::Relaxed);
578-
let deq_counter = inner.dequeue_index.load(Ordering::Relaxed);
577+
let enq_counter = inner.enqueue_index.get();
578+
let deq_counter = inner.dequeue_index.get();
579579
let jobs = enq_counter.wrapping_sub(deq_counter);
580580

581581
if jobs != 0 {
582-
let dequeue_index =
583-
inner.dequeue_index.fetch_add(1, Ordering::Relaxed) % inner.items.len();
582+
let dequeue_index = inner.dequeue_index.get() % inner.items.len();
583+
inner.dequeue_index.set(inner.dequeue_index.get() + 1);
584584
// SAFETY: Only one thread will claim this slot and perform this operation
585585
// And it's guaranteed that an item will exist here
586586
let item =
@@ -686,8 +686,8 @@ impl<T> ShardedRingBuf<T> {
686686
// reset each shard's inner ring buffer
687687
for shard in 0..self.shard_locks.len() {
688688
let inner = &self.inner_rb[shard];
689-
let mut drop_index = inner.dequeue_index.load(Ordering::Relaxed) % inner.items.len();
690-
let stop_index = inner.enqueue_index.load(Ordering::Relaxed) % inner.items.len();
689+
let mut drop_index = inner.dequeue_index.get() % inner.items.len();
690+
let stop_index = inner.enqueue_index.get() % inner.items.len();
691691
while drop_index != stop_index {
692692
// SAFETY: This will only clear out initialized values that have not
693693
// been dequeued. Note here that this method uses Relaxed loads.
@@ -698,10 +698,10 @@ impl<T> ShardedRingBuf<T> {
698698
}
699699
self.inner_rb[shard]
700700
.enqueue_index
701-
.store(0, Ordering::Relaxed);
701+
.set(0);
702702
self.inner_rb[shard]
703703
.dequeue_index
704-
.store(0, Ordering::Relaxed);
704+
.set(0);
705705
}
706706
}
707707

@@ -727,8 +727,8 @@ impl<T> ShardedRingBuf<T> {
727727
// reset each shard's inner ring buffer
728728
for (shard_ind, _guard) in guards.into_iter().enumerate() {
729729
let inner = &self.inner_rb[shard_ind];
730-
let mut drop_index = inner.dequeue_index.load(Ordering::Acquire) % inner.items.len();
731-
let stop_index = inner.enqueue_index.load(Ordering::Acquire) % inner.items.len();
730+
let mut drop_index = inner.dequeue_index.get() % inner.items.len();
731+
let stop_index = inner.enqueue_index.get() % inner.items.len();
732732
while drop_index != stop_index {
733733
// SAFETY: This will only clear out initialized values that have not
734734
// been dequeued.
@@ -741,10 +741,10 @@ impl<T> ShardedRingBuf<T> {
741741
}
742742
self.inner_rb[shard_ind]
743743
.enqueue_index
744-
.store(0, Ordering::Release);
744+
.set(0);
745745
self.inner_rb[shard_ind]
746746
.dequeue_index
747-
.store(0, Ordering::Release);
747+
.set(0);
748748
}
749749
}
750750

@@ -799,8 +799,8 @@ impl<T> ShardedRingBuf<T> {
799799
let inner = &self.inner_rb[shard_ind];
800800
// use these values as monotonic counter than indices
801801
let (enq_ind, deq_ind) = (
802-
inner.enqueue_index.load(Ordering::Relaxed),
803-
inner.dequeue_index.load(Ordering::Relaxed),
802+
inner.enqueue_index.get(),
803+
inner.dequeue_index.get(),
804804
);
805805
let jobs = enq_ind.wrapping_sub(deq_ind);
806806
jobs == 0
@@ -819,8 +819,8 @@ impl<T> ShardedRingBuf<T> {
819819
let inner = &self.inner_rb[shard_ind];
820820
// use these values as monotonic counter than indices
821821
let (enq_ind, deq_ind) = (
822-
inner.enqueue_index.load(Ordering::Relaxed),
823-
inner.dequeue_index.load(Ordering::Relaxed),
822+
inner.enqueue_index.get(),
823+
inner.dequeue_index.get(),
824824
);
825825
let jobs = enq_ind.wrapping_sub(deq_ind);
826826
jobs == 0
@@ -880,8 +880,8 @@ impl<T> ShardedRingBuf<T> {
880880
let item_len = inner.items.len();
881881
// use these values as monotonic counter than indices
882882
let (enq_ind, deq_ind) = (
883-
inner.enqueue_index.load(Ordering::Relaxed),
884-
inner.dequeue_index.load(Ordering::Relaxed),
883+
inner.enqueue_index.get(),
884+
inner.dequeue_index.get(),
885885
);
886886
let jobs = enq_ind.wrapping_sub(deq_ind);
887887
jobs == item_len
@@ -901,8 +901,8 @@ impl<T> ShardedRingBuf<T> {
901901
let item_len = inner.items.len();
902902
// use these values as monotonic counter than indices
903903
let (enq_ind, deq_ind) = (
904-
inner.enqueue_index.load(Ordering::Relaxed),
905-
inner.dequeue_index.load(Ordering::Relaxed),
904+
inner.enqueue_index.get(),
905+
inner.dequeue_index.get(),
906906
);
907907
let jobs = enq_ind.wrapping_sub(deq_ind);
908908
jobs == item_len
@@ -925,7 +925,7 @@ impl<T> ShardedRingBuf<T> {
925925

926926
// grab enq val
927927
let inner = &self.inner_rb[shard_ind];
928-
let enq_ind = inner.enqueue_index.load(Ordering::Relaxed) % inner.items.len();
928+
let enq_ind = inner.enqueue_index.get() % inner.items.len();
929929

930930
Some(enq_ind)
931931
}
@@ -947,7 +947,7 @@ impl<T> ShardedRingBuf<T> {
947947

948948
// grab enq val
949949
let inner = &self.inner_rb[shard_ind];
950-
let enq_ind = inner.enqueue_index.load(Ordering::Relaxed) % inner.items.len();
950+
let enq_ind = inner.enqueue_index.get() % inner.items.len();
951951

952952
Some(enq_ind)
953953
}
@@ -969,7 +969,7 @@ impl<T> ShardedRingBuf<T> {
969969

970970
// grab deq ind val
971971
let inner = &self.inner_rb[shard_ind];
972-
let deq_ind = inner.dequeue_index.load(Ordering::Relaxed) % inner.items.len();
972+
let deq_ind = inner.dequeue_index.get() % inner.items.len();
973973

974974
Some(deq_ind)
975975
}
@@ -991,7 +991,7 @@ impl<T> ShardedRingBuf<T> {
991991

992992
// grab deq ind val
993993
let inner = &self.inner_rb[shard_ind];
994-
let deq_ind = inner.dequeue_index.load(Ordering::Relaxed) % inner.items.len();
994+
let deq_ind = inner.dequeue_index.get() % inner.items.len();
995995

996996
Some(deq_ind)
997997
}
@@ -1010,8 +1010,8 @@ impl<T> ShardedRingBuf<T> {
10101010

10111011
let inner = &self.inner_rb[shard_ind];
10121012
let (enq_count, deq_count) = (
1013-
inner.enqueue_index.load(Ordering::Relaxed),
1014-
inner.dequeue_index.load(Ordering::Relaxed),
1013+
inner.enqueue_index.get(),
1014+
inner.dequeue_index.get(),
10151015
);
10161016
let jobs = enq_count.wrapping_sub(deq_count);
10171017
Some(jobs)
@@ -1034,8 +1034,8 @@ impl<T> ShardedRingBuf<T> {
10341034

10351035
let inner = &self.inner_rb[shard_ind];
10361036
let (enq_count, deq_count) = (
1037-
inner.enqueue_index.load(Ordering::Relaxed),
1038-
inner.dequeue_index.load(Ordering::Relaxed),
1037+
inner.enqueue_index.get(),
1038+
inner.dequeue_index.get(),
10391039
);
10401040
let jobs = enq_count.wrapping_sub(deq_count);
10411041
Some(jobs)
@@ -1052,8 +1052,8 @@ impl<T> ShardedRingBuf<T> {
10521052

10531053
for shard in &self.inner_rb {
10541054
let (enq_count, deq_count) = (
1055-
shard.enqueue_index.load(Ordering::Relaxed),
1056-
shard.dequeue_index.load(Ordering::Relaxed),
1055+
shard.enqueue_index.get(),
1056+
shard.dequeue_index.get(),
10571057
);
10581058
let jobs = enq_count.wrapping_sub(deq_count);
10591059
count.push(jobs);
@@ -1082,8 +1082,8 @@ impl<T> ShardedRingBuf<T> {
10821082
for (shard_ind, _guard) in guards.into_iter().enumerate() {
10831083
let shard = &self.inner_rb[shard_ind];
10841084
let (enq_count, deq_count) = (
1085-
shard.enqueue_index.load(Ordering::Relaxed),
1086-
shard.dequeue_index.load(Ordering::Relaxed),
1085+
shard.enqueue_index.get(),
1086+
shard.dequeue_index.get(),
10871087
);
10881088
let jobs = enq_count.wrapping_sub(deq_count);
10891089
count.push(jobs);
@@ -1096,8 +1096,8 @@ impl<T> ShardedRingBuf<T> {
10961096
#[inline(always)]
10971097
fn is_item_in_shard(&self, item_ind: usize, shard_ind: usize) -> bool {
10981098
let inner = &self.inner_rb[shard_ind];
1099-
let enqueue_ind = inner.enqueue_index.load(Ordering::Relaxed) % inner.items.len();
1100-
let dequeue_ind = inner.dequeue_index.load(Ordering::Relaxed) % inner.items.len();
1099+
let enqueue_ind = inner.enqueue_index.get() % inner.items.len();
1100+
let dequeue_ind = inner.dequeue_index.get() % inner.items.len();
11011101

11021102
if enqueue_ind > dequeue_ind {
11031103
item_ind < enqueue_ind && item_ind >= dequeue_ind

0 commit comments

Comments
 (0)