Skip to content

Commit c811bf9

Browse files
committed
Break meta into separate cache lines
1 parent e4d46ec commit c811bf9

File tree

4 files changed

+187
-135
lines changed

4 files changed

+187
-135
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ core_affinity = "0.8"
2727
[dependencies]
2828
cfg-if = "1.0"
2929
wait_on_address = { version = "0.1", optional = true }
30+
yep-cache-line-size = "0.9"
3031

3132
[[bench]]
3233
name = "spsc-throughput"

src/queue.rs

Lines changed: 63 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use std::sync::atomic::Ordering;
22

3-
use crate::queue_meta::YCQueueU64Meta;
3+
use crate::queue_meta::{YCQueueCursor, cursor_advance, cursor_index};
44

55
use crate::utils::get_bit;
66
use crate::{YCQueueError, YCQueueSharedMeta, utils};
@@ -28,6 +28,8 @@ pub struct YCQueue<'a> {
2828
slots: Vec<Option<&'a mut [u8]>>,
2929
slot_count: u16,
3030
slot_size: u16,
31+
// slot count is a power of two, so we can store its exponent for fast calculations
32+
slot_count_exp: u16,
3133
}
3234

3335
impl<'a> YCQueue<'a> {
@@ -70,11 +72,17 @@ impl<'a> YCQueue<'a> {
7072
return Err(YCQueueError::InvalidArgs);
7173
}
7274

75+
if !slot_count.is_power_of_two() {
76+
return Err(YCQueueError::InvalidArgs);
77+
}
78+
let slot_count_exp = slot_count.trailing_zeros() as u16;
79+
7380
Ok(YCQueue {
7481
shared_metadata,
7582
slots,
7683
slot_count: slot_count as u16,
7784
slot_size: slot_size as u16,
85+
slot_count_exp,
7886
})
7987
}
8088

@@ -252,9 +260,16 @@ impl<'a> YCQueue<'a> {
252260
Ok(())
253261
}
254262

255-
/// Snapshot the packed metadata that tracks indices and in-flight count.
256-
fn get_u64_meta(&self) -> YCQueueU64Meta {
257-
YCQueueU64Meta::from_u64(self.shared_metadata.u64_meta.load(Ordering::Acquire))
263+
fn cursor_index(&self, cursor: YCQueueCursor) -> u16 {
264+
cursor_index(cursor, self.slot_count_exp)
265+
}
266+
267+
fn producer_cursor(&self) -> YCQueueCursor {
268+
self.shared_metadata.producer_cursor.load(Ordering::Acquire)
269+
}
270+
271+
fn consumer_cursor(&self) -> YCQueueCursor {
272+
self.shared_metadata.consumer_cursor.load(Ordering::Acquire)
258273
}
259274

260275
/// Returns the number of slots that have been produced (or are being produced into) but not yet consumed.
@@ -278,7 +293,9 @@ impl<'a> YCQueue<'a> {
278293
/// ```
279294
#[inline]
280295
pub fn in_flight_count(&self) -> u16 {
281-
self.get_u64_meta().in_flight
296+
(self.producer_cursor() - self.consumer_cursor())
297+
.try_into()
298+
.expect("in_flight_count overflow")
282299
}
283300

284301
/// Returns the circular index that will be reserved by the next producer call.
@@ -287,7 +304,7 @@ impl<'a> YCQueue<'a> {
287304
/// The slot index measured modulo the queue capacity.
288305
#[inline]
289306
pub fn produce_idx(&self) -> u16 {
290-
self.get_u64_meta().produce_idx
307+
self.cursor_index(self.producer_cursor())
291308
}
292309

293310
/// Returns the circular index that will be reserved by the next consumer call.
@@ -296,7 +313,7 @@ impl<'a> YCQueue<'a> {
296313
/// The slot index measured modulo the queue capacity.
297314
#[inline]
298315
pub fn consume_idx(&self) -> u16 {
299-
self.get_u64_meta().consume_idx
316+
self.cursor_index(self.consumer_cursor())
300317
}
301318

302319
/// Returns the total number of slots managed by this queue.
@@ -349,17 +366,22 @@ impl<'a> YCQueue<'a> {
349366
return Err(YCQueueError::InvalidArgs);
350367
}
351368

352-
let start_index = loop {
353-
let value = self.shared_metadata.u64_meta.load(Ordering::Acquire);
354-
let mut meta = YCQueueU64Meta::from_u64(value);
369+
let old_producer = loop {
370+
let consumer = self.shared_metadata.consumer_cursor.load(Ordering::Acquire);
371+
let producer = self.shared_metadata.producer_cursor.load(Ordering::Acquire);
372+
let producer_limit = cursor_advance(consumer, self.slot_count);
355373

356-
if meta.in_flight as u32 + num_slots as u32 > self.slot_count as u32 {
374+
if (!best_effort && cursor_advance(producer, num_slots) > producer_limit)
375+
|| (best_effort && producer >= producer_limit)
376+
{
357377
return Err(YCQueueError::OutOfSpace);
358378
}
359379

360-
// make sure all the slots we want are owned by the producer
361-
let available_slots =
362-
self.check_owner(meta.produce_idx, num_slots, YCQueueOwner::Producer);
380+
let available_slots = self.check_owner(
381+
self.cursor_index(producer),
382+
num_slots,
383+
YCQueueOwner::Producer,
384+
);
363385

364386
if (!best_effort && available_slots != num_slots)
365387
|| (best_effort && available_slots == 0)
@@ -369,32 +391,24 @@ impl<'a> YCQueue<'a> {
369391

370392
debug_assert!(available_slots > 0);
371393
debug_assert!(available_slots <= num_slots);
372-
373394
num_slots = available_slots;
374-
let produce_idx = meta.produce_idx;
375-
meta.in_flight += num_slots;
376-
meta.produce_idx += num_slots;
377-
// wrap around if needed
378-
if meta.produce_idx >= self.slot_count {
379-
// TODO: tag as cold path
380-
meta.produce_idx -= self.slot_count;
381-
}
382395

383-
let new_value = meta.to_u64();
384-
match self.shared_metadata.u64_meta.compare_exchange(
385-
value,
386-
new_value,
396+
let next_producer = cursor_advance(producer, available_slots);
397+
match self.shared_metadata.producer_cursor.compare_exchange(
398+
producer,
399+
next_producer,
387400
Ordering::AcqRel,
388401
Ordering::Acquire,
389402
) {
390-
Ok(_) => break produce_idx,
403+
Ok(_) => break producer,
391404
Err(_) => continue,
392405
}
393406
};
394407

395408
let mut slots = Vec::with_capacity(num_slots as usize);
396-
let mut index = start_index;
409+
let mut cursor = old_producer;
397410
for _ in 0..num_slots {
411+
let index = self.cursor_index(cursor);
398412
debug_assert_eq!(self.get_owner(index), YCQueueOwner::Producer);
399413

400414
let slot_data = self.slots[index as usize].take();
@@ -403,12 +417,7 @@ impl<'a> YCQueue<'a> {
403417
None => panic!("We double-loaned out produce index {index:?}"),
404418
}
405419

406-
index += 1;
407-
// wrap around if needed
408-
if index >= self.slot_count {
409-
// TODO: tag as cold path
410-
index -= self.slot_count;
411-
}
420+
cursor = cursor_advance(cursor, 1);
412421
}
413422

414423
Ok(slots)
@@ -600,16 +609,21 @@ impl<'a> YCQueue<'a> {
600609
return Err(YCQueueError::InvalidArgs);
601610
}
602611

603-
let start_index = loop {
604-
let value = self.shared_metadata.u64_meta.load(Ordering::Acquire);
605-
let mut meta = YCQueueU64Meta::from_u64(value);
612+
let old_consumer = loop {
613+
let producer = self.shared_metadata.producer_cursor.load(Ordering::Acquire);
614+
let consumer = self.shared_metadata.consumer_cursor.load(Ordering::Acquire);
606615

607-
if meta.in_flight < num_slots {
616+
if (!best_effort && cursor_advance(consumer, num_slots) > producer)
617+
|| (best_effort && consumer >= producer)
618+
{
608619
return Err(YCQueueError::EmptyQueue);
609620
}
610621

611-
let available_slots =
612-
self.check_owner(meta.consume_idx, num_slots, YCQueueOwner::Consumer);
622+
let available_slots = self.check_owner(
623+
self.cursor_index(consumer),
624+
num_slots,
625+
YCQueueOwner::Consumer,
626+
);
613627
if (!best_effort && available_slots != num_slots)
614628
|| (best_effort && available_slots == 0)
615629
{
@@ -620,29 +634,23 @@ impl<'a> YCQueue<'a> {
620634
debug_assert!(available_slots <= num_slots);
621635
num_slots = available_slots;
622636

623-
let consume_idx = meta.consume_idx;
624-
let mut next_idx = consume_idx as u32 + num_slots as u32;
625-
if next_idx >= self.slot_count as u32 {
626-
next_idx -= self.slot_count as u32;
627-
}
628-
meta.consume_idx = next_idx as u16;
629-
meta.in_flight -= num_slots;
637+
let new_consumer = cursor_advance(consumer, num_slots);
630638

631-
let new_value = meta.to_u64();
632-
match self.shared_metadata.u64_meta.compare_exchange(
633-
value,
634-
new_value,
639+
match self.shared_metadata.consumer_cursor.compare_exchange(
640+
consumer,
641+
new_consumer,
635642
Ordering::AcqRel,
636643
Ordering::Acquire,
637644
) {
638-
Ok(_) => break consume_idx,
645+
Ok(_) => break consumer,
639646
Err(_) => continue,
640647
}
641648
};
642649

643650
let mut slots = Vec::with_capacity(num_slots as usize);
644-
let mut index = start_index;
651+
let mut cursor = old_consumer;
645652
for _ in 0..num_slots {
653+
let index = self.cursor_index(cursor);
646654
debug_assert_eq!(self.get_owner(index), YCQueueOwner::Consumer);
647655

648656
let slot_data = self.slots[index as usize].take();
@@ -651,10 +659,7 @@ impl<'a> YCQueue<'a> {
651659
None => panic!("We double-loaned out consume index {index:?}"),
652660
}
653661

654-
index += 1;
655-
if index >= self.slot_count {
656-
index -= self.slot_count;
657-
}
662+
cursor = cursor_advance(cursor, 1);
658663
}
659664

660665
Ok(slots)

0 commit comments

Comments
 (0)