diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b219c9..5d82b88 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,16 @@
 # Changelog

 ## Unreleased
+- added `ByteArena` for staged file writes, with `Buffer::finish()` returning `Bytes`
+- `ByteArena::write` now accepts a zerocopy type instead of an alignment constant
+- `ByteArena` reuses previous pages so allocations align only to the element type
+- `Buffer::finish` converts the writable mapping to read-only instead of remapping
+- documented all fields in `ByteArena` and `Buffer`
+- documented `ByteArena` usage under advanced usage with a proper heading
+- added `ByteArena::persist` to rename the temporary file
+- removed the old `ByteBuffer` type in favor of `ByteArena`
+- added tests covering `ByteArena` writes, typed buffers and persistence
+- added a test verifying alignment padding between differently aligned writes
 - split Kani verification into `verify.sh` and streamline `preflight.sh`
 - clarify that `verify.sh` runs on a dedicated system and document avoiding async code
 - install `rustfmt` and the Kani verifier automatically via `cargo install`
diff --git a/Cargo.toml b/Cargo.toml
index 6287150..b150812 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,8 @@ description = "A small library abstracting over bytes owning types in an extensi
 bytes = { version = "1.10.1", optional = true }
 ownedbytes = { version = "0.9.0", optional = true }
 memmap2 = { version = "0.9.5", optional = true }
+tempfile = "3.20"
+page_size = "0.6"
 zerocopy = { version = "0.8.26", optional = true, features = ["derive"] }
 pyo3 = { version = "0.25.1", optional = true }
 winnow = { version = "0.7.12", optional = true }
diff --git a/README.md b/README.md
index 2680911..0e62794 100644
--- a/README.md
+++ b/README.md
@@ -101,6 +101,27 @@ fn read_header(file: &std::fs::File) -> std::io::Result
 }
 ```

+### ByteArena
+
+Stage writes in a temporary file and turn each finished buffer into immutable `Bytes`:
+
+```rust
+use anybytes::ByteArena;
+
+let mut arena = ByteArena::new().unwrap();
+let mut buffer = arena.write::<u8>(4).unwrap();
+buffer.copy_from_slice(b"test");
+let bytes = buffer.finish().unwrap();
+assert_eq!(bytes.as_ref(), b"test".as_ref());
+let all = arena.finish().unwrap();
+assert_eq!(all.as_ref(), b"test".as_ref());
+```
+
+Call `arena.persist(path)` to keep the temporary file instead of mapping it.
+
+The arena only aligns allocations to the element type and may share pages
+between adjacent buffers to minimize wasted space.
+
 ## Features

 By default the crate enables the `mmap` and `zerocopy` features.
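The README example above stages raw `u8` bytes. For illustration only (this note and the sketch below are not part of the patch), a typed write through the same API could look like the following; it assumes the crate is named `anybytes` and is built with the default `mmap` and `zerocopy` features.

```rust
// Illustrative sketch, not part of the patch; assumes the `anybytes` crate
// with its default `mmap` + `zerocopy` features.
use anybytes::ByteArena;

fn main() -> std::io::Result<()> {
    let mut arena = ByteArena::new()?;

    // `write` takes the element type, so the buffer is aligned for `u64`
    // and hands back `&mut [u64]` with no manual casting.
    let mut ids = arena.write::<u64>(3)?;
    ids.as_mut_slice().copy_from_slice(&[1, 2, 3]);
    let ids_bytes = ids.finish()?;
    assert_eq!(ids_bytes.as_ref().len(), 3 * core::mem::size_of::<u64>());

    // The arena as a whole can still be finalized into one immutable mapping.
    let all = arena.finish()?;
    assert_eq!(all.as_ref().len(), ids_bytes.as_ref().len());
    Ok(())
}
```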
diff --git a/src/arena.rs b/src/arena.rs
new file mode 100644
index 0000000..fa53519
--- /dev/null
+++ b/src/arena.rs
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * Copyright (c) Jan-Paul Bultmann
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+//! Temporary byte arena backed by a file.
+//!
+//! The arena allows staged writing through [`ByteArena::write`]. Each
+//! call returns a mutable [`Buffer`] bound to the arena so only one
+//! writer can exist at a time. Finalizing the buffer via
+//! [`Buffer::finish`] makes the written range immutable and
+//! returns [`Bytes`].
+
+use std::io::{self, Seek, SeekFrom};
+use std::marker::PhantomData;
+
+use memmap2;
+use page_size;
+use tempfile::NamedTempFile;
+
+use crate::Bytes;
+
+#[cfg(feature = "zerocopy")]
+use zerocopy::{FromBytes, Immutable};
+
+/// Alignment helper.
+fn align_up(val: usize, align: usize) -> usize {
+    (val + align - 1) & !(align - 1)
+}
+
+/// Arena managing a temporary file.
+#[derive(Debug)]
+pub struct ByteArena {
+    /// Temporary file backing the arena.
+    file: NamedTempFile,
+    /// Current length of initialized data in bytes.
+    len: usize,
+}
+
+impl ByteArena {
+    /// Create a new empty arena.
+    pub fn new() -> io::Result<Self> {
+        let file = NamedTempFile::new()?;
+        Ok(Self { file, len: 0 })
+    }
+
+    /// Start a new write of `elems` elements of type `T`.
+    pub fn write<'a, T>(&'a mut self, elems: usize) -> io::Result<Buffer<'a, T>>
+    where
+        T: FromBytes + Immutable,
+    {
+        let page = page_size::get();
+        let align = core::mem::align_of::<T>();
+        let len_bytes = core::mem::size_of::<T>() * elems;
+        let start = align_up(self.len, align);
+        let end = start + len_bytes;
+        self.file.as_file_mut().set_len(end as u64)?;
+        // Ensure subsequent mappings see the extended size.
+        self.file.as_file_mut().seek(SeekFrom::Start(end as u64))?;
+
+        // The map must start on a page boundary; round `start` down while
+        // keeping track of how far into the mapping the buffer begins.
+        let aligned_offset = start & !(page - 1);
+        let offset = start - aligned_offset;
+        let map_len = end - aligned_offset;
+
+        let mmap = unsafe {
+            memmap2::MmapOptions::new()
+                .offset(aligned_offset as u64)
+                .len(map_len)
+                .map_mut(self.file.as_file())?
+        };
+        Ok(Buffer {
+            arena: self,
+            mmap,
+            start,
+            offset,
+            elems,
+            _marker: PhantomData,
+        })
+    }
+
+    fn update_len(&mut self, end: usize) {
+        self.len = end;
+    }
+
+    /// Finalize the arena and return immutable bytes for the entire file.
+    pub fn finish(self) -> io::Result<Bytes> {
+        let file = self.file.into_file();
+        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
+        Ok(Bytes::from_source(mmap))
+    }
+
+    /// Persist the temporary arena file to `path` and return the underlying [`std::fs::File`].
+    pub fn persist<P: AsRef<std::path::Path>>(self, path: P) -> io::Result<std::fs::File> {
+        self.file.persist(path).map_err(Into::into)
+    }
+}
+
+/// Mutable buffer for writing into a [`ByteArena`].
+#[derive(Debug)]
+pub struct Buffer<'a, T> {
+    /// Arena that owns the underlying file.
+    arena: &'a mut ByteArena,
+    /// Writable mapping for the current allocation.
+    mmap: memmap2::MmapMut,
+    /// Start position of this buffer within the arena file in bytes.
+    start: usize,
+    /// Offset from the beginning of `mmap` to the start of the buffer.
+    offset: usize,
+    /// Number of elements in the buffer.
+    elems: usize,
+    /// Marker to tie the buffer to element type `T`.
+    _marker: PhantomData<T>,
+}
+
+impl<'a, T> Buffer<'a, T>
+where
+    T: FromBytes + Immutable,
+{
+    /// Access the backing slice.
+    pub fn as_mut_slice(&mut self) -> &mut [T] {
+        unsafe {
+            let ptr = self.mmap.as_mut_ptr().add(self.offset) as *mut T;
+            core::slice::from_raw_parts_mut(ptr, self.elems)
+        }
+    }
+
+    /// Finalize the buffer and return immutable [`Bytes`].
+    pub fn finish(self) -> io::Result<Bytes> {
+        self.mmap.flush()?;
+        let len_bytes = self.elems * core::mem::size_of::<T>();
+        let offset = self.offset;
+        let arena = self.arena;
+        // Convert the writable mapping into a read-only view instead of
+        // unmapping and remapping the region.
+        let map = self.mmap.make_read_only()?;
+        arena.update_len(self.start + len_bytes);
+        Ok(Bytes::from_source(map).slice(offset..offset + len_bytes))
+    }
+}
+
+impl<'a, T> core::ops::Deref for Buffer<'a, T>
+where
+    T: FromBytes + Immutable,
+{
+    type Target = [T];
+
+    fn deref(&self) -> &Self::Target {
+        unsafe {
+            let ptr = self.mmap.as_ptr().add(self.offset) as *const T;
+            core::slice::from_raw_parts(ptr, self.elems)
+        }
+    }
+}
+
+impl<'a, T> core::ops::DerefMut for Buffer<'a, T>
+where
+    T: FromBytes + Immutable,
+{
+    fn deref_mut(&mut self) -> &mut [T] {
+        unsafe {
+            let ptr = self.mmap.as_mut_ptr().add(self.offset) as *mut T;
+            core::slice::from_raw_parts_mut(ptr, self.elems)
+        }
+    }
+}
+
+impl<'a, T> AsRef<[T]> for Buffer<'a, T>
+where
+    T: FromBytes + Immutable,
+{
+    fn as_ref(&self) -> &[T] {
+        self
+    }
+}
+
+impl<'a, T> AsMut<[T]> for Buffer<'a, T>
+where
+    T: FromBytes + Immutable,
+{
+    fn as_mut(&mut self) -> &mut [T] {
+        self
+    }
+}
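To make the offset bookkeeping in `ByteArena::write` concrete, here is a self-contained sketch of the same arithmetic (not part of the patch). The 4096-byte page size is an assumption for illustration; the implementation above queries it through `page_size::get()`.

```rust
// Standalone sketch of the arithmetic used by `ByteArena::write` above.
fn align_up(val: usize, align: usize) -> usize {
    (val + align - 1) & !(align - 1)
}

fn main() {
    let page = 4096usize; // assumed page size; the real code uses page_size::get()
    let len = 5usize; // bytes already written into the arena
    let align = core::mem::align_of::<u32>(); // the next write is a `u32` buffer

    // The new buffer starts at the next element-aligned position, not the next page.
    let start = align_up(len, align);
    assert_eq!(start, 8);

    // The mapping itself must begin on a page boundary, so `start` is rounded
    // down and the difference is carried as an offset into the mapping.
    let aligned_offset = start & !(page - 1);
    let offset = start - aligned_offset;
    assert_eq!(aligned_offset, 0);
    assert_eq!(offset, 8);
}
```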
diff --git a/src/buffer.rs b/src/buffer.rs
deleted file mode 100644
index 17ac996..0000000
--- a/src/buffer.rs
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright (c) Meta Platforms, Inc. and affiliates.
- * Copyright (c) Jan-Paul Bultmann
- *
- * This source code is licensed under the MIT license found in the
- * LICENSE file in the root directory of this source tree.
-*/
-
-//! Owned byte buffer with fixed alignment.
-
-use core::alloc::Layout;
-use core::ops::{Deref, DerefMut};
-use core::ptr::{self, NonNull};
-
-/// A raw byte buffer with a fixed alignment.
-///
-/// `ByteBuffer` owns its allocation and guarantees that the backing
-/// memory is aligned to `ALIGN` bytes.
-#[derive(Debug)]
-pub struct ByteBuffer<const ALIGN: usize> {
-    ptr: NonNull<u8>,
-    len: usize,
-    cap: usize,
-}
-
-unsafe impl<const ALIGN: usize> Send for ByteBuffer<ALIGN> {}
-unsafe impl<const ALIGN: usize> Sync for ByteBuffer<ALIGN> {}
-
-impl<const ALIGN: usize> ByteBuffer<ALIGN> {
-    const _ASSERT_POWER_OF_TWO: () = assert!(ALIGN.is_power_of_two(), "ALIGN must be power-of-two");
-    /// Create an empty buffer.
-    pub const fn new() -> Self {
-        Self {
-            ptr: NonNull::dangling(),
-            len: 0,
-            cap: 0,
-        }
-    }
-
-    /// Create a buffer with the given capacity.
-    pub fn with_capacity(cap: usize) -> Self {
-        if cap == 0 {
-            return Self::new();
-        }
-        unsafe {
-            let layout = Layout::from_size_align_unchecked(cap, ALIGN);
-            let ptr = std::alloc::alloc(layout);
-            let ptr = NonNull::new(ptr).expect("alloc failed");
-            Self { ptr, len: 0, cap }
-        }
-    }
-
-    /// Current length of the buffer.
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    /// Capacity of the buffer.
-    pub fn capacity(&self) -> usize {
-        self.cap
-    }
-
-    /// Ensure that the buffer can hold at least `total` bytes.
-    ///
-    /// Does nothing if the current capacity is already sufficient.
-    pub fn reserve_total(&mut self, total: usize) {
-        if total <= self.cap {
-            return;
-        }
-        unsafe {
-            let old = Layout::from_size_align_unchecked(self.cap.max(1), ALIGN);
-            let new_layout = Layout::from_size_align_unchecked(total, ALIGN);
-            let new_ptr = if self.cap == 0 {
-                std::alloc::alloc(new_layout)
-            } else {
-                std::alloc::realloc(self.ptr.as_ptr(), old, total)
-            };
-            let new_ptr = NonNull::new(new_ptr).expect("realloc failed");
-            self.ptr = new_ptr;
-        }
-        self.cap = total;
-    }
-
-    #[inline]
-    fn reserve_more(&mut self, additional: usize) {
-        let needed = self.len.checked_add(additional).expect("overflow");
-        if needed <= self.cap {
-            return;
-        }
-        let new_cap = core::cmp::max(self.cap * 2, needed);
-        unsafe {
-            let old = Layout::from_size_align_unchecked(self.cap.max(1), ALIGN);
-            let new_layout = Layout::from_size_align_unchecked(new_cap, ALIGN);
-            let new_ptr = if self.cap == 0 {
-                std::alloc::alloc(new_layout)
-            } else {
-                std::alloc::realloc(self.ptr.as_ptr(), old, new_cap)
-            };
-            let new_ptr = NonNull::new(new_ptr).expect("realloc failed");
-            self.ptr = new_ptr;
-        }
-        self.cap = new_cap;
-    }
-
-    /// Push data to the end of the buffer.
-    #[cfg(not(feature = "zerocopy"))]
-    pub fn push(&mut self, byte: u8) {
-        self.reserve_more(1);
-        unsafe {
-            ptr::write(self.ptr.as_ptr().add(self.len), byte);
-        }
-        self.len += 1;
-    }
-
-    /// Push data to the end of the buffer.
-    #[cfg(feature = "zerocopy")]
-    pub fn push<T>(&mut self, value: T)
-    where
-        T: zerocopy::IntoBytes + zerocopy::Immutable,
-    {
-        let bytes = zerocopy::IntoBytes::as_bytes(&value);
-        self.reserve_more(bytes.len());
-        unsafe {
-            ptr::copy_nonoverlapping(bytes.as_ptr(), self.ptr.as_ptr().add(self.len), bytes.len());
-        }
-        self.len += bytes.len();
-    }
-
-    /// Returns a raw pointer to the buffer's memory.
-    pub fn as_ptr(&self) -> *const u8 {
-        self.ptr.as_ptr()
-    }
-}
-
-impl<const ALIGN: usize> Drop for ByteBuffer<ALIGN> {
-    fn drop(&mut self) {
-        if self.cap != 0 {
-            unsafe {
-                let layout = Layout::from_size_align_unchecked(self.cap, ALIGN);
-                std::alloc::dealloc(self.ptr.as_ptr(), layout);
-            }
-        }
-    }
-}
-
-impl<const ALIGN: usize> Deref for ByteBuffer<ALIGN> {
-    type Target = [u8];
-    fn deref(&self) -> &[u8] {
-        unsafe { core::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
-    }
-}
-
-impl<const ALIGN: usize> DerefMut for ByteBuffer<ALIGN> {
-    fn deref_mut(&mut self) -> &mut [u8] {
-        unsafe { core::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
-    }
-}
-
-impl<const ALIGN: usize> AsRef<[u8]> for ByteBuffer<ALIGN> {
-    fn as_ref(&self) -> &[u8] {
-        self
-    }
-}
-
-impl<const ALIGN: usize> AsMut<[u8]> for ByteBuffer<ALIGN> {
-    fn as_mut(&mut self) -> &mut [u8] {
-        self
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index 2acd048..e0858fe 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,7 +9,8 @@
 #![doc = include_str!("../README.md")]
 #![warn(missing_docs)]

-pub mod buffer;
+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
+pub mod arena;
 /// Core byte container types and traits.
 pub mod bytes;
 mod sources;
@@ -29,7 +30,8 @@ pub mod winnow;
 #[cfg(test)]
 mod tests;

-pub use crate::buffer::ByteBuffer;
+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
+pub use crate::arena::{Buffer, ByteArena};
 pub use crate::bytes::ByteOwner;
 pub use crate::bytes::ByteSource;
 pub use crate::bytes::Bytes;
diff --git a/src/sources.rs b/src/sources.rs
index eb1c04d..68a15f2 100644
--- a/src/sources.rs
+++ b/src/sources.rs
@@ -42,7 +42,7 @@ use zerocopy::Immutable;
 use zerocopy::IntoBytes;

 #[allow(unused_imports)]
-use crate::{buffer::ByteBuffer, bytes::ByteOwner, ByteSource};
+use crate::{bytes::ByteOwner, ByteSource};

 #[cfg(feature = "zerocopy")]
 unsafe impl<T> ByteSource for &'static [T]
@@ -133,18 +133,6 @@ unsafe impl ByteSource for Vec<u8> {
     }
 }

-unsafe impl<const ALIGN: usize> ByteSource for ByteBuffer<ALIGN> {
-    type Owner = Self;
-
-    fn as_bytes(&self) -> &[u8] {
-        self.as_ref()
-    }
-
-    fn get_owner(self) -> Self::Owner {
-        self
-    }
-}
-
 unsafe impl ByteSource for String {
     type Owner = Self;
diff --git a/src/tests.rs b/src/tests.rs
index 2495784..3d77959 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -303,39 +303,116 @@ fn test_cow_zerocopy_borrowed_source() {
     );
 }

+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
 #[test]
-fn test_bytebuffer_push_and_bytes() {
-    use crate::ByteBuffer;
+fn test_arena_single_write() {
+    use crate::arena::ByteArena;

-    let mut buf: ByteBuffer<8> = ByteBuffer::with_capacity(2);
-    buf.push(1u8);
-    buf.push(2u8);
-    buf.push(3u8);
-    assert_eq!(buf.as_ref(), &[1, 2, 3]);
+    let mut arena = ByteArena::new().expect("arena");
+    let mut buffer = arena.write::<u8>(4).expect("write");
+    buffer.as_mut_slice().copy_from_slice(b"test");
+    let bytes = buffer.finish().expect("finish buffer");
+    assert_eq!(bytes.as_ref(), b"test");

-    let bytes: Bytes = buf.into();
-    assert_eq!(bytes.as_ref(), &[1, 2, 3]);
+    let all = arena.finish().expect("finish arena");
+    assert_eq!(all.as_ref(), b"test");
 }

+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
 #[test]
-fn test_bytebuffer_alignment() {
-    use crate::ByteBuffer;
+fn test_arena_multiple_writes() {
+    use crate::arena::ByteArena;
+
+    let mut arena = ByteArena::new().expect("arena");
+
+    let mut a = arena.write::<u8>(5).expect("write");
+    a.as_mut_slice().copy_from_slice(b"first");
+    let bytes_a = a.finish().expect("finish");
+    assert_eq!(bytes_a.as_ref(), b"first");

-    let mut buf: ByteBuffer<64> = ByteBuffer::with_capacity(1);
-    buf.push(1u8);
-    assert_eq!((buf.as_ptr() as usize) % 64, 0);
+    let mut b = arena.write::<u8>(6).expect("write");
+    b.as_mut_slice().copy_from_slice(b"second");
+    let bytes_b = b.finish().expect("finish");
+    assert_eq!(bytes_b.as_ref(), b"second");
+
+    let all = arena.finish().expect("finish arena");
+    assert_eq!(all.as_ref(), b"firstsecond");
 }

+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
 #[test]
-fn test_bytebuffer_reserve_total() {
-    use crate::ByteBuffer;
-
-    let mut buf: ByteBuffer<8> = ByteBuffer::new();
-    buf.reserve_total(10);
-    assert!(buf.capacity() >= 10);
-    for _ in 0..10 {
-        buf.push(1u8);
+fn test_arena_typed() {
+    use crate::arena::ByteArena;
+
+    #[derive(zerocopy::FromBytes, zerocopy::Immutable, Clone, Copy)]
+    #[repr(C)]
+    struct Pair {
+        a: u16,
+        b: u32,
     }
-    assert_eq!(buf.len(), 10);
-    assert!(buf.capacity() >= 10);
+
+    let mut arena = ByteArena::new().expect("arena");
+    let mut buffer = arena.write::<Pair>(2).expect("write");
+    buffer.as_mut_slice()[0] = Pair { a: 1, b: 2 };
+    buffer.as_mut_slice()[1] = Pair { a: 3, b: 4 };
+    let bytes = buffer.finish().expect("finish");
+
+    let expected = unsafe {
+        core::slice::from_raw_parts(
+            [Pair { a: 1, b: 2 }, Pair { a: 3, b: 4 }].as_ptr() as *const u8,
+            2 * core::mem::size_of::<Pair>(),
+        )
+    };
+    assert_eq!(bytes.as_ref(), expected);
+}
+
+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
+#[test]
+fn test_arena_persist() {
+    use crate::arena::ByteArena;
+    use std::fs;
+
+    let dir = tempfile::tempdir().expect("dir");
+    let path = dir.path().join("persist.bin");
+
+    let mut arena = ByteArena::new().expect("arena");
+    let mut buffer = arena.write::<u8>(7).expect("write");
+    buffer.as_mut_slice().copy_from_slice(b"persist");
+    buffer.finish().expect("finish buffer");
+
+    let _file = arena.persist(&path).expect("persist file");
+    let data = fs::read(&path).expect("read");
+    assert_eq!(data.as_slice(), b"persist");
+}
+
+#[cfg(all(feature = "mmap", feature = "zerocopy"))]
+#[test]
+fn test_arena_alignment_padding() {
+    use crate::arena::ByteArena;
+
+    let mut arena = ByteArena::new().expect("arena");
+
+    let mut a = arena.write::<u8>(1).expect("write");
+    a.as_mut_slice()[0] = 1;
+    let bytes_a = a.finish().expect("finish a");
+    assert_eq!(bytes_a.as_ref(), &[1]);
+
+    let mut b = arena.write::<u32>(1).expect("write");
+    b.as_mut_slice()[0] = 0x01020304;
+    let bytes_b = b.finish().expect("finish b");
+    assert_eq!(bytes_b.as_ref(), &0x01020304u32.to_ne_bytes());
+
+    let mut c = arena.write::<u16>(1).expect("write");
+    c.as_mut_slice()[0] = 0x0506;
+    let bytes_c = c.finish().expect("finish c");
+    assert_eq!(bytes_c.as_ref(), &0x0506u16.to_ne_bytes());
+
+    let all = arena.finish().expect("finish arena");
+
+    let mut expected = Vec::new();
+    expected.extend_from_slice(&[1]);
+    expected.extend_from_slice(&[0; 3]);
+    expected.extend_from_slice(&0x01020304u32.to_ne_bytes());
+    expected.extend_from_slice(&0x0506u16.to_ne_bytes());
+    assert_eq!(all.as_ref(), expected.as_slice());
 }
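As a closing illustration (not part of the patch), here is one way a caller could combine `persist` with a later read-only mapping. The crate name `anybytes`, the `staged.bin` path, and the assumption that `Bytes::from_source` is public API are illustrative; otherwise only calls introduced in this diff plus `std` and `memmap2` are used.

```rust
// Illustrative sketch: stage data, keep the file on disk, map it back later.
// Assumes the `anybytes` crate with its `mmap` + `zerocopy` features and that
// `Bytes::from_source` is exported; the file path is made up.
use anybytes::{ByteArena, Bytes};

fn main() -> std::io::Result<()> {
    let mut arena = ByteArena::new()?;
    let mut buffer = arena.write::<u8>(5)?;
    buffer.as_mut_slice().copy_from_slice(b"index");
    buffer.finish()?;

    // `persist` renames the temporary file instead of mapping it.
    let file = arena.persist("staged.bin")?;

    // The persisted file can later be mapped read-only and wrapped into
    // `Bytes`, the same way the arena does internally.
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    let bytes = Bytes::from_source(mmap);
    assert_eq!(bytes.as_ref(), b"index");
    Ok(())
}
```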