Skip to content

Commit e229eda

Browse files
authored
Merge pull request #10 from Milerius/feat/spsc-ring-buffer
perf(queue): software prefetch + contiguous batch copy for SPSC rings
2 parents 0c96717 + d0be63e commit e229eda

File tree

14 files changed

+2515
-99
lines changed

14 files changed

+2515
-99
lines changed

Cargo.lock

Lines changed: 409 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/bench/benches/spsc.rs

Lines changed: 452 additions & 70 deletions
Large diffs are not rendered by default.

crates/platform/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ default = []
1212
std = []
1313
asm = []
1414
nightly = []
15+
prefetch = []
1516
perf-counters = ["dep:perf-event2"]
1617

1718
[dependencies]

crates/platform/src/intrinsics/compiler_hints.rs

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,55 @@ pub fn prefetch<T>(ptr: *const T, rw: PrefetchRW, locality: PrefetchLocality) {
3737
#[cfg(target_arch = "x86_64")]
3838
{
3939
use core::arch::x86_64::{
40-
_MM_HINT_NTA, _MM_HINT_T0, _MM_HINT_T1, _MM_HINT_T2, _mm_prefetch,
40+
_MM_HINT_ET0, _MM_HINT_NTA, _MM_HINT_T0, _MM_HINT_T1, _MM_HINT_T2, _mm_prefetch,
4141
};
42-
let _ = rw;
4342
let hint = ptr.cast::<i8>();
4443
// SAFETY: prefetch is a hint and never faults, even on invalid addresses.
4544
// The locality must be a compile-time constant for _mm_prefetch.
4645
unsafe {
47-
match locality {
48-
PrefetchLocality::NoTemporal => _mm_prefetch(hint, _MM_HINT_NTA),
49-
PrefetchLocality::Low => _mm_prefetch(hint, _MM_HINT_T2),
50-
PrefetchLocality::Moderate => _mm_prefetch(hint, _MM_HINT_T1),
51-
PrefetchLocality::High => _mm_prefetch(hint, _MM_HINT_T0),
46+
match (rw, locality) {
47+
// Write prefetch: use ET0 (exclusive) to bring line in Modified
48+
// state, avoiding the subsequent RFO on the actual store.
49+
(PrefetchRW::Write, PrefetchLocality::High) => {
50+
_mm_prefetch(hint, _MM_HINT_ET0);
51+
}
52+
// Non-High write localities fall through to read hints — ET0
53+
// only exists as a single locality level on x86; for other
54+
// localities we use the read hint as a reasonable fallback.
55+
(_, PrefetchLocality::NoTemporal) => _mm_prefetch(hint, _MM_HINT_NTA),
56+
(_, PrefetchLocality::Low) => _mm_prefetch(hint, _MM_HINT_T2),
57+
(_, PrefetchLocality::Moderate) => _mm_prefetch(hint, _MM_HINT_T1),
58+
(_, PrefetchLocality::High) => _mm_prefetch(hint, _MM_HINT_T0),
5259
}
5360
}
5461
}
55-
#[cfg(not(target_arch = "x86_64"))]
62+
#[cfg(target_arch = "aarch64")]
63+
{
64+
let addr = ptr.cast::<u8>();
65+
// SAFETY: PRFM is a hint instruction — it never faults and has no
66+
// side effects beyond cache management. options(nostack, preserves_flags)
67+
// tells LLVM it doesn't touch the stack or condition flags.
68+
unsafe {
69+
match rw {
70+
PrefetchRW::Read => {
71+
core::arch::asm!(
72+
"prfm pldl1keep, [{ptr}]",
73+
ptr = in(reg) addr,
74+
options(nostack, preserves_flags),
75+
);
76+
}
77+
PrefetchRW::Write => {
78+
core::arch::asm!(
79+
"prfm pstl1keep, [{ptr}]",
80+
ptr = in(reg) addr,
81+
options(nostack, preserves_flags),
82+
);
83+
}
84+
}
85+
}
86+
let _ = locality; // Locality encoded in instruction mnemonic (L1KEEP)
87+
}
88+
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
5689
{
5790
let _ = (ptr, rw, locality);
5891
}
@@ -109,6 +142,21 @@ mod tests {
109142
prefetch_large(big.as_ptr(), PrefetchRW::Read, PrefetchLocality::Low, 2);
110143
}
111144

145+
#[test]
146+
fn prefetch_write_does_not_crash() {
147+
let mut value: u64 = 42;
148+
prefetch(&raw const value, PrefetchRW::Write, PrefetchLocality::High);
149+
// Verify write after prefetch still works
150+
value = 99;
151+
assert_eq!(value, 99);
152+
}
153+
154+
#[test]
155+
fn prefetch_read_does_not_crash_stack_array() {
156+
let arr = [0u8; 128];
157+
prefetch(arr.as_ptr(), PrefetchRW::Read, PrefetchLocality::High);
158+
}
159+
112160
#[test]
113161
fn enum_values() {
114162
assert_eq!(PrefetchRW::Read as i32, 0);

crates/queue/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@ std = ["alloc", "mantis-platform/std"]
1313
alloc = []
1414
asm = ["mantis-platform/asm"]
1515
nightly = ["mantis-platform/nightly"]
16+
prefetch = ["mantis-platform/prefetch"]
1617

1718
[dependencies]
1819
mantis-core = { workspace = true }
1920
mantis-platform = { workspace = true }
2021
mantis-types = { workspace = true }
2122

23+
[dev-dependencies]
24+
proptest = "1.11.0"
25+
2226
[lints]
2327
workspace = true

crates/queue/examples/asm_shim.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ pub fn spsc_copy_pop_u64(ring: &mut SpscRingCopy<u64, 1024>, out: &mut u64) -> b
3939
ring.pop(out)
4040
}
4141

42+
#[inline(never)]
43+
pub fn spsc_copy_push_batch_u64(ring: &mut SpscRingCopy<u64, 1024>, src: &[u64]) -> usize {
44+
ring.push_batch(src)
45+
}
46+
47+
#[inline(never)]
48+
pub fn spsc_copy_pop_batch_u64(ring: &mut SpscRingCopy<u64, 1024>, dst: &mut [u64]) -> usize {
49+
ring.pop_batch(dst)
50+
}
51+
4252
fn main() {
4353
let mut ring = SpscRing::<u64, 1024>::new();
4454
std::hint::black_box(spsc_push_u64(&mut ring, 42));
@@ -52,4 +62,9 @@ fn main() {
5262
std::hint::black_box(spsc_copy_push_u64(&mut copy_ring, &42));
5363
let mut copy_out = 0u64;
5464
std::hint::black_box(spsc_copy_pop_u64(&mut copy_ring, &mut copy_out));
65+
66+
let batch_src = [0u64; 8];
67+
std::hint::black_box(spsc_copy_push_batch_u64(&mut copy_ring, &batch_src));
68+
let mut batch_dst = [0u64; 8];
69+
std::hint::black_box(spsc_copy_pop_batch_u64(&mut copy_ring, &mut batch_dst));
5570
}

crates/queue/src/copy_ring/engine.rs

Lines changed: 182 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ where
6363
let head = self.head.load(Ordering::Relaxed);
6464
let next_head = I::wrap(head + 1, self.storage.capacity());
6565

66+
#[cfg(feature = "prefetch")]
67+
crate::raw::prefetch_slot_write(&self.storage, next_head);
68+
6669
if next_head == self.tail_cached.get() {
6770
let tail = self.tail.load(Ordering::Acquire);
6871
self.tail_cached.set(tail);
@@ -83,6 +86,9 @@ where
8386
pub(crate) fn pop(&self, out: &mut T) -> bool {
8487
let tail = self.tail.load(Ordering::Relaxed);
8588

89+
#[cfg(feature = "prefetch")]
90+
crate::raw::prefetch_slot_read(&self.storage, tail);
91+
8692
if tail == self.head_cached.get() {
8793
let head = self.head.load(Ordering::Acquire);
8894
self.head_cached.set(head);
@@ -133,13 +139,29 @@ where
133139
}
134140

135141
let n = src.len().min(free);
136-
let mut idx = head;
137-
for item in &src[..n] {
138-
crate::copy_ring::raw::write_slot_copy::<T, S, CP>(&self.storage, idx, item);
139-
idx = I::wrap(idx + 1, cap);
142+
143+
// Two-chunk contiguous copy bypassing per-element CopyPolicy dispatch.
144+
// `memcpy` auto-vectorizes for bulk transfers; per-element SIMD
145+
// dispatch adds call overhead that dominates for large batches.
146+
//
147+
// `first_chunk`: slots from head to end of backing array (no wrap).
148+
// `second_chunk`: remaining slots written from index 0 (wrap).
149+
let first_chunk = n.min(cap - head);
150+
let second_chunk = n - first_chunk;
151+
152+
// First chunk: slots head..head+first_chunk (no wrap).
153+
// `head < cap` (ring invariant) and `first_chunk <= cap - head`,
154+
// so `head + first_chunk <= cap`. Producer owns this range.
155+
crate::copy_ring::raw::write_batch_copy::<T, S>(&self.storage, head, &src[..first_chunk]);
156+
157+
if second_chunk > 0 {
158+
// Second chunk: wraps to slots 0..second_chunk.
159+
// `second_chunk <= n - first_chunk <= free < cap`,
160+
// so `second_chunk <= cap`. Producer owns this range.
161+
crate::copy_ring::raw::write_batch_copy::<T, S>(&self.storage, 0, &src[first_chunk..n]);
140162
}
141163

142-
self.head.store(idx, Ordering::Release);
164+
self.head.store(I::wrap(head + n, cap), Ordering::Release);
143165
n
144166
}
145167

@@ -173,13 +195,37 @@ where
173195
}
174196

175197
let n = dst.len().min(avail);
176-
let mut idx = tail;
177-
for out in &mut dst[..n] {
178-
crate::copy_ring::raw::read_slot_copy::<T, S, CP>(&self.storage, idx, out);
179-
idx = I::wrap(idx + 1, cap);
198+
199+
// Two-chunk contiguous copy symmetric to push_batch.
200+
// `memcpy` auto-vectorizes for bulk transfers; per-element dispatch
201+
// adds call overhead that dominates for large batches.
202+
//
203+
// `first_chunk`: slots from tail to end of backing array (no wrap).
204+
// `second_chunk`: remaining slots read from index 0 (wrap).
205+
let first_chunk = n.min(cap - tail);
206+
let second_chunk = n - first_chunk;
207+
208+
// First chunk: slots tail..tail+first_chunk (no wrap).
209+
// `tail < cap` (ring invariant) and `first_chunk <= cap - tail`,
210+
// so `tail + first_chunk <= cap`. Consumer owns this range.
211+
crate::copy_ring::raw::read_batch_copy::<T, S>(
212+
&self.storage,
213+
tail,
214+
&mut dst[..first_chunk],
215+
);
216+
217+
if second_chunk > 0 {
218+
// Second chunk: wraps to slots 0..second_chunk.
219+
// `second_chunk <= n - first_chunk <= avail < cap`,
220+
// so `second_chunk <= cap`. Consumer owns this range.
221+
crate::copy_ring::raw::read_batch_copy::<T, S>(
222+
&self.storage,
223+
0,
224+
&mut dst[first_chunk..n],
225+
);
180226
}
181227

182-
self.tail.store(idx, Ordering::Release);
228+
self.tail.store(I::wrap(tail + n, cap), Ordering::Release);
183229
n
184230
}
185231

@@ -212,6 +258,7 @@ where
212258
}
213259

214260
#[cfg(test)]
261+
#[expect(clippy::cast_sign_loss, reason = "test-only usize→u64 conversions")]
215262
mod tests {
216263
extern crate std;
217264
use std::vec;
@@ -361,6 +408,46 @@ mod tests {
361408
assert_eq!(out, vec![100, 101, 102, 103, 104]);
362409
}
363410

411+
#[test]
412+
fn pop_batch_wraparound_contiguous() {
413+
let engine = new_engine(); // capacity=8, usable=7
414+
// Advance tail to near end
415+
let fill: Vec<u64> = (0..6).collect();
416+
engine.push_batch(&fill);
417+
let mut drain = vec![0u64; 6];
418+
engine.pop_batch(&mut drain);
419+
420+
// Push 5 (wraps around), then batch-pop all 5
421+
let wrap_src: Vec<u64> = (300..305).collect();
422+
engine.push_batch(&wrap_src);
423+
424+
let mut out = vec![0u64; 5];
425+
let popped = engine.pop_batch(&mut out);
426+
assert_eq!(popped, 5);
427+
assert_eq!(out, vec![300, 301, 302, 303, 304]);
428+
}
429+
430+
#[test]
431+
fn push_batch_wraparound_contiguous() {
432+
// Advance head to near end of buffer, then batch-push across wrap
433+
let engine = new_engine(); // capacity=8, usable=7
434+
// Fill 6, drain 6 — head and tail now at index 6
435+
let fill: Vec<u64> = (0..6).collect();
436+
engine.push_batch(&fill);
437+
let mut drain = vec![0u64; 6];
438+
engine.pop_batch(&mut drain);
439+
440+
// Now push 5 elements starting at index 6: wraps at index 8 -> 0
441+
let wrap_src: Vec<u64> = (200..205).collect();
442+
let pushed = engine.push_batch(&wrap_src);
443+
assert_eq!(pushed, 5);
444+
445+
let mut out = vec![0u64; 5];
446+
let popped = engine.pop_batch(&mut out);
447+
assert_eq!(popped, 5);
448+
assert_eq!(out, vec![200, 201, 202, 203, 204]);
449+
}
450+
364451
#[test]
365452
fn batch_fifo_ordering() {
366453
let engine = new_engine();
@@ -391,4 +478,89 @@ mod tests {
391478
engine.pop(&mut out);
392479
assert_eq!(engine.len(), 0);
393480
}
481+
482+
#[test]
483+
fn push_pop_batch_differential() {
484+
// Verify contiguous batch produces same results as sequential push/pop
485+
// for various batch sizes and fill levels.
486+
for fill_first in 0..7 {
487+
for batch_size in 1..=7 {
488+
let engine = new_engine(); // capacity=8, usable=7
489+
490+
// Advance head/tail by fill_first positions
491+
for i in 0..fill_first {
492+
assert!(engine.push(&(i as u64)));
493+
}
494+
let mut drain = vec![0u64; fill_first];
495+
engine.pop_batch(&mut drain);
496+
497+
// Batch push
498+
let src: Vec<u64> = (100_u64..100 + batch_size as u64).collect();
499+
let pushed = engine.push_batch(&src);
500+
501+
// Batch pop
502+
let mut dst = vec![0u64; pushed];
503+
let popped = engine.pop_batch(&mut dst);
504+
505+
assert_eq!(popped, pushed, "fill={fill_first} batch={batch_size}");
506+
assert_eq!(
507+
dst,
508+
src[..pushed].to_vec(),
509+
"FIFO violated: fill={fill_first} batch={batch_size}"
510+
);
511+
}
512+
}
513+
}
514+
}
515+
516+
// proptest uses `getcwd` for failure persistence, which Miri's isolation blocks.
517+
#[cfg(all(test, not(miri)))]
518+
mod proptest_tests {
519+
use super::*;
520+
use crate::storage::InlineStorage;
521+
use mantis_core::{ImmediatePush, NoInstr, Pow2Masked};
522+
use proptest::prelude::*;
523+
524+
extern crate std;
525+
use std::vec;
526+
use std::vec::Vec;
527+
528+
type TestEngine = CopyRingEngine<
529+
u64,
530+
InlineStorage<u64, 8>,
531+
Pow2Masked,
532+
ImmediatePush,
533+
NoInstr,
534+
mantis_platform::DefaultCopyPolicy,
535+
>;
536+
537+
fn new_engine() -> TestEngine {
538+
CopyRingEngine::new(InlineStorage::new(), NoInstr)
539+
}
540+
541+
proptest! {
542+
#[test]
543+
fn batch_fifo_preserved(
544+
fill_level in 0usize..7,
545+
batch_size in 1usize..8,
546+
) {
547+
let engine = new_engine();
548+
549+
// Advance to fill_level
550+
for i in 0..fill_level {
551+
engine.push(&(i as u64));
552+
}
553+
let mut drain = vec![0u64; fill_level];
554+
engine.pop_batch(&mut drain);
555+
556+
let src: Vec<u64> = (0..batch_size as u64).collect();
557+
let pushed = engine.push_batch(&src);
558+
559+
let mut dst = vec![0u64; pushed];
560+
let popped = engine.pop_batch(&mut dst);
561+
562+
prop_assert_eq!(popped, pushed);
563+
prop_assert_eq!(dst, src[..pushed].to_vec());
564+
}
565+
}
394566
}

0 commit comments

Comments
 (0)