Skip to content

Commit 49499ea

Browse files
authored
feat: magic in headers; proper atomic sync (#53)
1 parent 96ccab2 commit 49499ea

File tree

3 files changed

+46
-29
lines changed

3 files changed

+46
-29
lines changed

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::fmt::Display;
22

33
#[derive(Debug)]
44
pub enum Error {
5+
InvalidMagic,
56
InvalidVersion { expected: u32, actual: u32 },
67
InvalidBufferSize,
78
Io(std::io::Error),
@@ -13,6 +14,7 @@ impl std::error::Error for Error {}
1314
impl Display for Error {
1415
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1516
match self {
17+
Self::InvalidMagic => write!(f, "invalid magic"),
1618
Self::InvalidVersion { expected, actual } => write!(
1719
f,
1820
"invalid version; expected={}.{}; found={}.{}",

src/mpmc.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
//! DPDK-style bounded MPMC ring queue
22
33
use crate::{error::Error, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION};
4-
use core::{
5-
marker::PhantomData,
6-
ptr::NonNull,
7-
sync::atomic::{AtomicU32, Ordering},
4+
use core::{marker::PhantomData, ptr::NonNull, sync::atomic::Ordering};
5+
use std::{
6+
fs::File,
7+
sync::{atomic::AtomicU64, Arc},
88
};
9-
use std::{fs::File, sync::Arc};
9+
10+
const MAGIC: u64 = 0x7368_6171_6d70_6d63; // b"shaqmpmc"
1011

1112
pub struct Producer<T> {
1213
queue: SharedQueue<T>,
@@ -443,7 +444,7 @@ impl<T> SharedQueue<T> {
443444
header: NonNull<SharedQueueHeader>,
444445
) -> Result<Self, Error> {
445446
let header_ref = unsafe { header.as_ref() };
446-
let buffer_mask = header_ref.buffer_mask;
447+
let buffer_mask = header_ref.buffer_mask as usize;
447448
let buffer_size_in_items = buffer_mask.wrapping_add(1);
448449
if !buffer_size_in_items.is_power_of_two()
449450
|| buffer_size_in_items == 0
@@ -484,6 +485,11 @@ impl<T> SharedQueue<T> {
484485

485486
#[repr(C)]
486487
struct SharedQueueHeader {
488+
// Cold metadata cacheline.
489+
magic: AtomicU64,
490+
version: u32,
491+
buffer_mask: u32,
492+
487493
/// Producer reservation cursor.
488494
///
489495
/// Producers atomically advance this with CAS to claim slots, but claimed
@@ -506,8 +512,6 @@ struct SharedQueueHeader {
506512
/// Consumers advance this in-order after dropping/reading claimed slots.
507513
/// Producers use it to determine how much free space is available.
508514
consumer_release: CacheAlignedAtomicSize,
509-
buffer_mask: usize,
510-
version: AtomicU32,
511515
}
512516

513517
impl SharedQueueHeader {
@@ -567,8 +571,9 @@ impl SharedQueueHeader {
567571
header.producer_publication.store(0, Ordering::Release);
568572
header.consumer_reservation.store(0, Ordering::Release);
569573
header.consumer_release.store(0, Ordering::Release);
570-
header.buffer_mask = buffer_size_in_items - 1;
571-
header.version.store(VERSION, Ordering::SeqCst);
574+
header.buffer_mask = (buffer_size_in_items - 1) as u32;
575+
header.version = VERSION;
576+
header.magic.store(MAGIC, Ordering::Release);
572577
}
573578

574579
fn join<T: Sized>(file: &File) -> Result<(Arc<MappedRegion>, NonNull<Self>), Error> {
@@ -581,14 +586,16 @@ impl SharedQueueHeader {
581586
// memory is aligned to the page size, which is sufficient for the
582587
// alignment of `SharedQueueHeader`.
583588
let header = unsafe { header.as_ref() };
584-
let actual_version = header.version.load(Ordering::SeqCst);
585-
if actual_version != VERSION {
589+
if header.magic.load(Ordering::Acquire) != MAGIC {
590+
return Err(Error::InvalidMagic);
591+
}
592+
if header.version != VERSION {
586593
return Err(Error::InvalidVersion {
587594
expected: VERSION,
588-
actual: actual_version,
595+
actual: header.version,
589596
});
590597
}
591-
let buffer_size_in_items = header.buffer_mask.wrapping_add(1);
598+
let buffer_size_in_items = (header.buffer_mask as usize).wrapping_add(1);
592599
if buffer_size_in_items != Self::calculate_buffer_size_in_items::<T>(file_size)? {
593600
return Err(Error::InvalidBufferSize);
594601
}

src/spsc.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use core::ptr::NonNull;
33
use std::{
44
fs::File,
55
sync::{
6-
atomic::{AtomicU32, Ordering},
6+
atomic::{AtomicU64, Ordering},
77
Arc,
88
},
99
};
1010

11+
const MAGIC: u64 = 0x7368_6171_7370_7363; // b"shaqspsc"
12+
1113
/// Calculates the minimum file size required for a queue with given capacity.
1214
/// Note that file size MAY need to be increased beyond this to account for
1315
/// page-size requirements.
@@ -114,11 +116,9 @@ impl<T: Sized> Producer<T> {
114116
/// All reserved positions must be fully initialized before calling `commit`.
115117
/// Pointers should be dropped before calling `commit`.
116118
pub unsafe fn reserve(&mut self) -> Option<NonNull<T>> {
117-
// If write is >= read + buffer_size, the queue is written one iteration
119+
// If write is > read + buffer_mask, the queue is written one iteration
118120
// ahead of the consumer, and we cannot reserve more space.
119-
if self.queue.cached_write.wrapping_sub(self.queue.cached_read)
120-
>= self.queue.header().buffer_size
121-
{
121+
if self.queue.cached_write.wrapping_sub(self.queue.cached_read) > self.queue.buffer_mask {
122122
return None;
123123
}
124124

@@ -294,10 +294,9 @@ impl<T: Sized> SharedQueue<T> {
294294
header: NonNull<SharedQueueHeader>,
295295
) -> Result<Self, Error> {
296296
// SAFETY: `header` is non-null and aligned properly.
297-
let size = unsafe { header.as_ref().buffer_size };
297+
let size = unsafe { (header.as_ref().buffer_mask as usize).wrapping_add(1) };
298298

299299
if !size.is_power_of_two()
300-
|| size == 0
301300
|| SharedQueueHeader::calculate_buffer_size_in_items::<T>(region.file_size())? != size
302301
{
303302
return Err(Error::InvalidBufferSize);
@@ -374,10 +373,14 @@ impl<T: Sized> SharedQueue<T> {
374373
/// Header in shared memory for the queue.
375374
#[repr(C)]
376375
struct SharedQueueHeader {
376+
// Cold cache line.
377+
magic: AtomicU64,
378+
version: u32,
379+
buffer_mask: u32,
380+
381+
// Hot cache lines.
377382
write: CacheAlignedAtomicSize,
378383
read: CacheAlignedAtomicSize,
379-
buffer_size: usize,
380-
version: AtomicU32,
381384
}
382385

383386
impl SharedQueueHeader {
@@ -436,8 +439,9 @@ impl SharedQueueHeader {
436439
let header = unsafe { header.as_mut() };
437440
header.write.store(0, Ordering::Release);
438441
header.read.store(0, Ordering::Release);
439-
header.buffer_size = buffer_size_in_items;
440-
header.version.store(VERSION, Ordering::SeqCst);
442+
header.buffer_mask = (buffer_size_in_items - 1) as u32;
443+
header.version = VERSION;
444+
header.magic.store(MAGIC, Ordering::Release);
441445
}
442446

443447
fn join<T: Sized>(file: &File) -> Result<(Arc<MappedRegion>, NonNull<Self>), Error> {
@@ -450,14 +454,18 @@ impl SharedQueueHeader {
450454
// memory is aligned to the page size, which is sufficient for the
451455
// alignment of `SharedQueueHeader`.
452456
let header = unsafe { header.as_ref() };
453-
let actual_version = header.version.load(Ordering::SeqCst);
454-
if actual_version != VERSION {
457+
if header.magic.load(Ordering::Acquire) != MAGIC {
458+
return Err(Error::InvalidMagic);
459+
}
460+
if header.version != VERSION {
455461
return Err(Error::InvalidVersion {
456462
expected: VERSION,
457-
actual: actual_version,
463+
actual: header.version,
458464
});
459465
}
460-
if header.buffer_size != Self::calculate_buffer_size_in_items::<T>(file_size)? {
466+
if (header.buffer_mask as usize).wrapping_add(1)
467+
!= Self::calculate_buffer_size_in_items::<T>(file_size)?
468+
{
461469
return Err(Error::InvalidBufferSize);
462470
}
463471
}

0 commit comments

Comments
 (0)