|
1 | 1 | //! DPDK-style bounded MPMC ring queue |
2 | 2 |
|
3 | | -use crate::{error::Error, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION}; |
| 3 | +use crate::{ |
| 4 | + error::Error, normalized_capacity, shmem::MappedRegion, CacheAlignedAtomicSize, VERSION, |
| 5 | +}; |
4 | 6 | use core::{marker::PhantomData, ptr::NonNull, sync::atomic::Ordering}; |
5 | 7 | use std::{ |
6 | 8 | fs::File, |
@@ -313,7 +315,7 @@ unsafe impl<T> Sync for Consumer<T> {} |
313 | 315 | /// page-size requirements. |
314 | 316 | pub const fn minimum_file_size<T: Sized>(capacity: usize) -> usize { |
315 | 317 | let buffer_offset = SharedQueueHeader::buffer_offset::<T>(); |
316 | | - buffer_offset + capacity * core::mem::size_of::<T>() |
| 318 | + buffer_offset + normalized_capacity(capacity) * core::mem::size_of::<T>() |
317 | 319 | } |
318 | 320 |
|
319 | 321 | struct SharedQueue<T> { |
@@ -965,6 +967,17 @@ mod tests { |
965 | 967 | assert_eq!(values, vec![100, 101]); |
966 | 968 | } |
967 | 969 |
|
| 970 | + #[test] |
| 971 | + fn test_minimum_file_size_rounds_up_capacity() { |
| 972 | + let file = create_temp_shmem_file().unwrap(); |
| 973 | + let producer = unsafe { Producer::<u64>::create(&file, minimum_file_size::<u64>(3)) } |
| 974 | + .expect("create failed"); |
| 975 | + let consumer = unsafe { Consumer::<u64>::join(&file) }.expect("join failed"); |
| 976 | + |
| 977 | + assert_eq!(producer.queue.capacity(), 4); |
| 978 | + assert_eq!(consumer.queue.capacity(), 4); |
| 979 | + } |
| 980 | + |
968 | 981 | #[test] |
969 | 982 | fn test_multiple_producers_consumers() { |
970 | 983 | let (file, producer, consumer) = create_test_queue::<Item>(BUFFER_SIZE); |
|
0 commit comments