Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ pub(crate) const VERSION_MAJOR: u16 = 2;
pub(crate) const VERSION_PATCH: u16 = 0;
pub(crate) const VERSION: u32 = (VERSION_MAJOR as u32) << 16 | VERSION_PATCH as u32;

pub(crate) const fn normalized_capacity(capacity: usize) -> usize {
if capacity == 0 {
0
} else {
capacity.next_power_of_two()
}
}

/// `AtomicUsize` with 64-byte alignment for better performance.
#[derive(Default)]
#[repr(C, align(64))]
Expand Down
17 changes: 15 additions & 2 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! DPDK-style bounded MPMC ring queue

use crate::{error::Error, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION};
use crate::{
error::Error, normalized_capacity, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION,
};
use core::{marker::PhantomData, ptr::NonNull, sync::atomic::Ordering};
use std::{
fs::File,
Expand Down Expand Up @@ -313,7 +315,7 @@ unsafe impl<T> Sync for Consumer<T> {}
/// page-size requirements.
pub const fn minimum_file_size<T: Sized>(capacity: usize) -> usize {
let buffer_offset = SharedQueueHeader::buffer_offset::<T>();
buffer_offset + capacity * core::mem::size_of::<T>()
buffer_offset + normalized_capacity(capacity) * core::mem::size_of::<T>()
}

struct SharedQueue<T> {
Expand Down Expand Up @@ -965,6 +967,17 @@ mod tests {
assert_eq!(values, vec![100, 101]);
}

#[test]
fn test_minimum_file_size_rounds_up_capacity() {
let file = create_temp_shmem_file().unwrap();
let producer = unsafe { Producer::<u64>::create(&file, minimum_file_size::<u64>(3)) }
.expect("create failed");
let consumer = unsafe { Consumer::<u64>::join(&file) }.expect("join failed");

assert_eq!(producer.queue.capacity(), 4);
assert_eq!(consumer.queue.capacity(), 4);
}

#[test]
fn test_multiple_producers_consumers() {
let (file, producer, consumer) = create_test_queue::<Item>(BUFFER_SIZE);
Expand Down
15 changes: 13 additions & 2 deletions src/spsc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{error::Error, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION};
use crate::{
error::Error, normalized_capacity, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION,
};
use core::ptr::NonNull;
use std::{
fs::File,
Expand All @@ -15,7 +17,7 @@ const MAGIC: u64 = 0x7368_6171_7370_7363; // b"shaqspsc"
/// page-size requirements.
pub const fn minimum_file_size<T: Sized>(capacity: usize) -> usize {
let buffer_offset = SharedQueueHeader::buffer_offset::<T>();
buffer_offset + capacity * core::mem::size_of::<T>()
buffer_offset + normalized_capacity(capacity) * core::mem::size_of::<T>()
}

/// Producer side of the SPSC shared queue.
Expand Down Expand Up @@ -612,4 +614,13 @@ mod tests {
assert_eq!(*val, 7);
consumer.finalize();
}

#[test]
fn test_minimum_file_size_rounds_up_capacity() {
let file = create_temp_shmem_file().unwrap();
let producer = unsafe { Producer::<u64>::create(&file, minimum_file_size::<u64>(3)) }
.expect("create failed");

assert_eq!(producer.capacity(), 4);
}
}