Skip to content

Commit 41fca91

Browse files
committed
mostly updated mlf_shardedringbuf
1 parent 352551e commit 41fca91

File tree

5 files changed

+129
-63
lines changed

5 files changed

+129
-63
lines changed

benches/kanal_async.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ async fn kanal_async_with_msg_vec(
158158
fn benchmark_kanal_async(c: &mut Criterion) {
159159
const MAX_THREADS: [usize; 1] = [8];
160160
const CAPACITY: usize = 128;
161-
const TASKS: [usize; 1] = [100000];
161+
const TASKS: [usize; 1] = [100];
162162

163163
for thread_num in MAX_THREADS {
164164
let runtime = tokio::runtime::Builder::new_multi_thread()

benches/srb_pin.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,15 +195,44 @@ async fn mlfsrb_pin(capacity: usize, shards: usize, task_count: usize) {
195195
// );
196196
// enq_tasks.push(handle);
197197
// }
198+
// for i in 0..task_count {
199+
// let handle = mlf_spawn_enqueuer_with_iterator(rb.clone(), i, 0..1);
200+
// enq_tasks.push(handle);
201+
// }
202+
203+
// for i in 0..shards {
204+
// let handle = mlf_spawn_dequeuer_unbounded(rb.clone(), i, |x| {
205+
// // test_func(x as u128);
206+
// // println!("{:?}", x);
207+
// });
208+
// deq_tasks.push(handle);
209+
// }
210+
198211
for i in 0..task_count {
199-
let handle = mlf_spawn_enqueuer_with_iterator(rb.clone(), i, 0..1);
212+
let handle = tokio::spawn({
213+
let rb_clone = rb.clone();
214+
async move {
215+
rb_clone.enqueue(i).await;
216+
}
217+
});
200218
enq_tasks.push(handle);
201219
}
202220

203221
for i in 0..shards {
204-
let handle = mlf_spawn_dequeuer_unbounded(rb.clone(), i, |x| {
205-
// test_func(x as u128);
206-
// println!("{:?}", x);
222+
let handle = tokio::spawn({
223+
let rb_clone = rb.clone();
224+
async move {
225+
loop {
226+
// let item = rb_clone.dequeue().await;
227+
let item = rb_clone.dequeue_in_shard(i).await;
228+
match item {
229+
None => break,
230+
Some(val) => {
231+
232+
}
233+
}
234+
}
235+
}
207236
});
208237
deq_tasks.push(handle);
209238
}
@@ -545,7 +574,7 @@ fn benchmark_pin(c: &mut Criterion) {
545574
// const SHARDS: [usize; 5] = [1, 2, 4, 8, 16];
546575
// const TASKS: [usize; 5] = [1, 2, 4, 8, 16];
547576
const SHARDS: [usize; 1] = [1];
548-
const TASKS: [usize; 1] = [100000];
577+
const TASKS: [usize; 1] = [100];
549578

550579
// const MSG_COUNT: usize = 250000;
551580
// // let msg = BigData { buf: Box::new([0; 1 * 1024]) };

src/mlf_shardedringbuf.rs

Lines changed: 77 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,18 @@ pub struct MLFShardedRingBuf<T> {
3535
/// Total capacity of the buffer
3636
capacity: AtomicUsize,
3737
/// Multiple InnerRingBuffer structure based on num of shards
38-
/// CachePadded to prevent false sharing
39-
// inner_rb: Box<[CachePadded<InnerRingBuffer<T>>]>,
4038
inner_rb: Box<[InnerRingBuffer<T>]>,
4139

42-
// pub(crate) job_space_shard_notifs: Box<[Notify]>,
43-
// pub(crate) job_post_shard_notifs: Box<[Notify]>,
40+
/// Used as a monotonically increasing counter to determine which
41+
/// shard an enqueuer task should place an item in
42+
/// Cache Padded to prevent false sharing if spawning an enqueuer
43+
/// task happens across multiple threads
44+
shard_enq: CachePadded<AtomicUsize>,
45+
/// Used as a monotonically increasing counter to determine which
46+
/// shard a dequeuer task should retrieve an item from
47+
/// Cache Padded to prevent false sharing if spawning a dequeuer
48+
/// task happens across multiple threads
49+
shard_deq: CachePadded<AtomicUsize>,
4450
}
4551

4652
// An inner ring buffer to contain the items, enqueue and dequeue index for ShardedRingBuf struct
@@ -49,10 +55,13 @@ struct InnerRingBuffer<T> {
4955
/// Box of Slots containing the content of the buffer
5056
/// Cache Padded to avoid false sharing
5157
items: Box<[CachePadded<Slot<T>>]>,
52-
/// Where to enqueue at in the Box
58+
/// Where to enqueue at in the items Box
59+
/// Cache Padded to avoid false sharing
5360
enqueue_index: CachePadded<AtomicUsize>,
54-
/// Where to dequeue at in the Box
55-
// dequeue_index: AtomicUsize,
61+
/// Where to dequeue at in the items Box
62+
/// Cache Padded to avoid false sharing (though since this
63+
/// is the last item, cache padding doesn't really matter
64+
/// too much)
5665
dequeue_index: CachePadded<AtomicUsize>,
5766
}
5867

@@ -63,7 +72,9 @@ struct Slot<T> {
6372
item: UnsafeCell<MaybeUninit<Option<T>>>,
6473
/// 0: empty, 1: full, 2: in progress (deq), 3: in progress (enq)
6574
state: AtomicU8,
75+
/// used to notify an enqueuer task that's waiting on this slot
6676
enq_notify: Notify,
77+
/// used to notify a dequeuer task that's waiting on this slot
6778
deq_notify: Notify
6879
}
6980

@@ -94,7 +105,6 @@ impl<T> InnerRingBuffer<T> {
94105
vec.into_boxed_slice()
95106
},
96107
enqueue_index: CachePadded::new(AtomicUsize::new(0)),
97-
// dequeue_index: AtomicUsize::new(0),
98108
dequeue_index: CachePadded::new(AtomicUsize::new(0)),
99109
}
100110
}
@@ -122,12 +132,8 @@ impl<T> MLFShardedRingBuf<T> {
122132
let mut vec = Vec::with_capacity(shards);
123133
for _ in 0..shards {
124134
if remainder == 0 {
125-
// vec.push(CachePadded::new(InnerRingBuffer::new(capacity_per_shard)));
126135
vec.push(InnerRingBuffer::new(capacity_per_shard));
127136
} else {
128-
// vec.push(CachePadded::new(InnerRingBuffer::new(
129-
// capacity_per_shard + 1,
130-
// )));
131137
vec.push(InnerRingBuffer::new(
132138
capacity_per_shard + 1,
133139
));
@@ -136,22 +142,8 @@ impl<T> MLFShardedRingBuf<T> {
136142
}
137143
vec.into_boxed_slice()
138144
},
139-
// job_post_shard_notifs: {
140-
// let mut vec = Vec::with_capacity(shards);
141-
// for _ in 0..shards {
142-
// vec.push(Notify::new());
143-
// }
144-
// vec.into_boxed_slice()
145-
// },
146-
147-
// job_space_shard_notifs: {
148-
// let mut vec = Vec::with_capacity(shards);
149-
150-
// for _ in 0..shards {
151-
// vec.push(Notify::new());
152-
// }
153-
// vec.into_boxed_slice()
154-
// },
145+
shard_enq: CachePadded::new(AtomicUsize::new(0)),
146+
shard_deq: CachePadded::new(AtomicUsize::new(0))
155147
}
156148
}
157149

@@ -190,6 +182,7 @@ impl<T> MLFShardedRingBuf<T> {
190182
self.capacity.load(Ordering::Relaxed)
191183
}
192184

185+
/// Helper function to take a slot at a specific shard in the ring buffer
193186
#[inline]
194187
async fn take_slot(&self, acquire: Acquire, shard_ind: usize) -> (usize, usize) {
195188
let inner = &self.inner_rb[shard_ind];
@@ -280,16 +273,37 @@ impl<T> MLFShardedRingBuf<T> {
280273
self.release_slot(current.0, current.1, Acquire::Enqueue);
281274
}
282275

283-
/// Adds an item of type T to the RingBuffer, *blocking* the thread until there is space to add the item.
284-
///
285-
/// Time Complexity: O(s_t) where s_t is the time it takes to acquire a shard
276+
/// Adds an item of type T to the ring buffer at a provided shard. If the user
277+
/// provides a shard index greater than the existing number of shards in the
278+
/// buffer, it will perform wrap around (% number of existing shards).
286279
///
280+
/// Time Complexity: O(s_t) where s_t is the time it takes to acquire a slot in a shard
281+
/// (this is usually pretty fast)
282+
///
287283
/// Space complexity: O(1)
288-
pub(crate) async fn enqueue(&self, item: T, shard_ind: usize) {
284+
pub async fn enqueue_in_shard(&self, item: T, shard_ind: usize) {
289285
let shard_ind = shard_ind % self.get_num_of_shards();
290286
self.enqueue_item(Some(item), shard_ind).await;
291287
}
292288

289+
/// Adds an item of type T to the ring buffer. It uses the ring buffer's shard_enq
290+
/// field and mods it with the number of existing shards for the buffer to determine
291+
/// which shard this enqueue operation will occur at. As a result, if you have multiple
292+
/// shards and one enqueuer task repeatedly using enqueue(), it will sweep across the
293+
/// shards and place an item in each.
294+
///
295+
/// If you intend to have an enqueuer task map to a specific shard, use enqueue_in_shard()
296+
/// for more control.
297+
///
298+
/// Time Complexity: O(s_t) where s_t is the time it takes to acquire a slot in the shard.
299+
/// (this is usually pretty fast)
300+
///
301+
/// Space complexity: O(1)
302+
pub async fn enqueue(&self, item: T) {
303+
let shard_ind = self.shard_enq.fetch_add(1, Ordering::Relaxed) % self.get_num_of_shards();
304+
self.enqueue_item(Some(item), shard_ind).await;
305+
}
306+
293307
/// Grab the inner ring buffer shard, dequeue the item, update the dequeue index
294308
#[inline(always)]
295309
fn dequeue_in_slot(&self, shard_ind: usize, slot_ind: usize) -> Option<T> {
@@ -303,26 +317,49 @@ impl<T> MLFShardedRingBuf<T> {
303317
unsafe { (*item_cell).assume_init_read() }
304318
}
305319

306-
/// Retrieves an item of type T from the RingBuffer if an item exists in the buffer.
307-
/// If the ring buffer is set with a poisoned flag or received a poison pill,
320+
/// Retrieves an item of type T from the ring buffer. If the user
321+
/// provides a shard index greater than the existing number of shards in the
322+
/// buffer, it will perform wrap around (% number of existing shards). On a poison pill,
308323
/// this method will return None.
309324
///
310-
/// Time Complexity: O(s_t) where s_t is the time it takes to acquire a shard
325+
/// Time Complexity: O(s_t) where s_t is the time it takes to acquire a slot in the shard
311326
///
312327
/// Space Complexity: O(1)
313328
#[inline(always)]
314-
pub(crate) async fn dequeue(&self, shard_ind: usize) -> Option<T> {
329+
pub async fn dequeue_in_shard(&self, shard_ind: usize) -> Option<T> {
315330
let shard_ind = shard_ind % self.get_num_of_shards();
316331
let current = self.take_slot(Acquire::Dequeue, shard_ind).await;
317332
let item = self.dequeue_in_slot(current.0, current.1);
318333
self.release_slot(current.0, current.1, Acquire::Dequeue);
319334
item
320335
}
321336

322-
/// Sets the poison flag of the ring buffer to true. This will prevent enqueuers
323-
/// from enqueuing anymore jobs if this method is called while enqueues are occuring.
324-
/// However you can use this if you want graceful exit of dequeuers tasks completing
325-
/// all available jobs enqueued first before exiting.
337+
/// Retrieves an item of type T from the ring buffer. It uses the ring buffer's shard_deq
338+
/// field and mods it with the number of existing shards for the buffer to determine
339+
/// which shard this dequeue operation will occur at. As a result, if you have multiple
340+
/// shards and one dequeuer task repeatedly using dequeue(), it will sweep across the
341+
/// shards and retrieve an item from each.
342+
///
343+
/// On a poison pill, this method will return None.
344+
///
345+
/// If you intend to have a dequeuer task map to a specific shard, use dequeue_in_shard()
346+
/// for more control.
347+
///
348+
/// Time Complexity: O(s_t) where s_t is the time it takes to acquire a slot in the shard
349+
///
350+
/// Space Complexity: O(1)
351+
#[inline(always)]
352+
pub async fn dequeue(&self) -> Option<T> {
353+
let shard_ind = self.shard_deq.fetch_add(1, Ordering::Relaxed) % self.get_num_of_shards();
354+
let current = self.take_slot(Acquire::Dequeue, shard_ind).await;
355+
let item = self.dequeue_in_slot(current.0, current.1);
356+
self.release_slot(current.0, current.1, Acquire::Dequeue);
357+
item
358+
}
359+
360+
/// Enqueues a poison pill at a specific shard. If you intend to make your dequeuer
361+
/// task unbounded with its dequeue operations, this should be the method you use
362+
/// to break out of the infinite loop.
326363
///
327364
/// Time Complexity: O(1)
328365
///

src/shardedringbuf.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,21 @@ impl<T> InnerRingBuffer<T> {
8888
}
8989
}
9090

91-
/// Helper function to see if a given index inside this buffer does
92-
/// indeed contain a valid item. Used in Drop Trait.
93-
#[inline(always)]
94-
fn is_item_in_shard(&self, item_ind: usize) -> bool {
95-
let enqueue_ind = self.enqueue_index.load(Ordering::Relaxed) % self.items.len();
96-
let dequeue_ind = self.dequeue_index.load(Ordering::Relaxed) % self.items.len();
97-
98-
if enqueue_ind > dequeue_ind {
99-
item_ind < enqueue_ind && item_ind >= dequeue_ind
100-
} else if enqueue_ind < dequeue_ind {
101-
item_ind >= dequeue_ind || item_ind < enqueue_ind
102-
} else {
103-
false
104-
}
105-
}
91+
// /// Helper function to see if a given index inside this buffer does
92+
// /// indeed contain a valid item. Used in Drop Trait.
93+
// #[inline(always)]
94+
// fn is_item_in_shard(&self, item_ind: usize) -> bool {
95+
// let enqueue_ind = self.enqueue_index.load(Ordering::Relaxed) % self.items.len();
96+
// let dequeue_ind = self.dequeue_index.load(Ordering::Relaxed) % self.items.len();
97+
98+
// if enqueue_ind > dequeue_ind {
99+
// item_ind < enqueue_ind && item_ind >= dequeue_ind
100+
// } else if enqueue_ind < dequeue_ind {
101+
// item_ind >= dequeue_ind || item_ind < enqueue_ind
102+
// } else {
103+
// false
104+
// }
105+
// }
106106
}
107107

108108
impl<T> ShardedRingBuf<T> {

src/task_local_spawn.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ where
180180
let enq_fut = async move {
181181
let mut counter = 0;
182182
for item in items {
183-
buffer.enqueue(item, shard_ind).await;
183+
buffer.enqueue_in_shard(item, shard_ind).await;
184184
counter += 1;
185185
}
186186

@@ -515,7 +515,7 @@ where
515515
let deq_fut = async move {
516516
let mut counter = 0;
517517
loop {
518-
let deq_item = buffer.dequeue(shard_ind).await;
518+
let deq_item = buffer.dequeue_in_shard(shard_ind).await;
519519
match deq_item {
520520
Some(item) => {
521521
f(item);

0 commit comments

Comments
 (0)