Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# Changelog

## Unreleased
- added `ByteArena` for staged file writes with `Buffer::finish()` to return `Bytes`
- `ByteArena::write` now accepts a zerocopy type instead of an alignment constant
- `ByteArena` reuses previous pages so allocations align only to the element type
- `Buffer::finish` converts the writable mapping to read-only instead of remapping
- documented all fields in `ByteArena` and `Buffer`
- documented `ByteArena` usage under advanced usage with a proper heading
- added `ByteArena::persist` to rename the temporary file
- removed the old `ByteBuffer` type in favor of `ByteArena`
- added tests covering `ByteArena` writes, typed buffers and persistence
- added test verifying alignment padding between differently aligned writes
- split Kani verification into `verify.sh` and streamline `preflight.sh`
- clarify that `verify.sh` runs on a dedicated system and document avoiding async code
- install `rustfmt` and the Kani verifier automatically via `cargo install`
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ description = "A small library abstracting over bytes owning types in an extensi
bytes = { version = "1.10.1", optional = true }
ownedbytes = { version = "0.9.0", optional = true }
memmap2 = { version = "0.9.5", optional = true }
tempfile = "3.20"
page_size = "0.6"
zerocopy = { version = "0.8.26", optional = true, features = ["derive"] }
pyo3 = { version = "0.25.1", optional = true }
winnow = { version = "0.7.12", optional = true }
Expand Down
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ fn read_header(file: &std::fs::File) -> std::io::Result<anybytes::view::View<Hea
To map only a portion of a file use the unsafe helper
`Bytes::map_file_region(file, offset, len)`.

### Byte Arena

Use `ByteArena` to incrementally build immutable bytes on disk:

```rust
use anybytes::arena::ByteArena;

let mut arena = ByteArena::new().unwrap();
let mut buffer = arena.write::<u8>(4).unwrap();
buffer.copy_from_slice(b"test");
let bytes = buffer.finish().unwrap();
assert_eq!(bytes.as_ref(), b"test".as_ref());
let all = arena.finish().unwrap();
assert_eq!(all.as_ref(), b"test".as_ref());
```

Call `arena.persist(path)` to keep the temporary file instead of mapping it.

The arena only aligns allocations to the element type and may share pages
between adjacent buffers to minimize wasted space.

## Features

By default the crate enables the `mmap` and `zerocopy` features.
Expand Down
188 changes: 188 additions & 0 deletions src/arena.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* Copyright (c) Jan-Paul Bultmann
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

//! Temporary byte arena backed by a file.
//!
//! The arena allows staged writing through [`ByteArena::write`]. Each
//! call returns a mutable [`Buffer`] bound to the arena so only one
//! writer can exist at a time. Finalizing the buffer via
//! [`Buffer::finish`] converts the writable mapping into a read-only
//! one and returns the written range as [`Bytes`].

use std::io::{self, Seek, SeekFrom};
use std::marker::PhantomData;

use memmap2;
use page_size;
use tempfile::NamedTempFile;

use crate::Bytes;

#[cfg(feature = "zerocopy")]
use zerocopy::{FromBytes, Immutable};

/// Round `val` up to the nearest multiple of `align`.
///
/// `align` must be a non-zero power of two, which is always true for values
/// obtained from `core::mem::align_of`. A `val` that is already a multiple of
/// `align` is returned unchanged.
fn align_up(val: usize, align: usize) -> usize {
    debug_assert!(
        align.is_power_of_two(),
        "alignment must be a non-zero power of two"
    );
    // Add the mask (`align - 1`) rather than `align` itself: `val + align`
    // can overflow even though the rounded result is representable, e.g. when
    // `val` is already the largest aligned value.
    let mask = align - 1;
    (val + mask) & !mask
}

/// Arena that stages writes in a temporary file.
///
/// Space is handed out through [`ByteArena::write`], and the finished
/// contents can be obtained with [`ByteArena::finish`] or kept on disk with
/// [`ByteArena::persist`].
#[derive(Debug)]
pub struct ByteArena {
    /// Temporary file backing the arena.
    file: NamedTempFile,
    /// Length in bytes of the data committed by finished buffers. The next
    /// allocation starts at this offset, rounded up to the element alignment.
    len: usize,
}

impl ByteArena {
    /// Create a new empty arena backed by a fresh temporary file.
    ///
    /// # Errors
    ///
    /// Returns the error reported while creating the temporary file.
    pub fn new() -> io::Result<Self> {
        let file = NamedTempFile::new()?;
        Ok(Self { file, len: 0 })
    }

    /// Start a new write of `elems` elements of type `T`.
    ///
    /// The allocation begins at the current arena length rounded up to the
    /// alignment of `T`. The backing file is resized to hold it and the
    /// affected range is mapped writable. The returned [`Buffer`] borrows the
    /// arena mutably, so only one buffer can be live at a time.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] when the requested size does
    /// not fit in `usize` or when `T` requires an alignment larger than the
    /// page size. Otherwise returns any error raised while resizing, seeking
    /// or mapping the backing file.
    pub fn write<'a, T>(&'a mut self, elems: usize) -> io::Result<Buffer<'a, T>>
    where
        T: FromBytes + Immutable,
    {
        // Error for allocations whose byte range cannot be represented.
        fn too_large() -> io::Error {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "requested buffer size overflows usize",
            )
        }

        let page = page_size::get();
        let align = core::mem::align_of::<T>();
        // The mapping is only placed at page granularity, so a stricter
        // alignment could not be guaranteed for the slice handed out by the
        // buffer.
        if align > page {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "element alignment exceeds the page size",
            ));
        }

        // Checked arithmetic: a wrapped size would produce a mapping that is
        // shorter than the slice later built from `elems`.
        let len_bytes = core::mem::size_of::<T>()
            .checked_mul(elems)
            .ok_or_else(too_large)?;
        let start = align_up(self.len, align);
        let end = start.checked_add(len_bytes).ok_or_else(too_large)?;

        // `usize` to `u64` is a widening conversion on supported targets.
        self.file.as_file_mut().set_len(end as u64)?;
        // Keep the file cursor at the new end of the file.
        self.file.as_file_mut().seek(SeekFrom::Start(end as u64))?;

        // Map must start on a page boundary; round `start` down while
        // keeping track of how far into the mapping the buffer begins.
        let aligned_offset = start & !(page - 1);
        let offset = start - aligned_offset;
        let map_len = end - aligned_offset;

        // SAFETY: the file was just resized to cover `aligned_offset..end`.
        // Soundness relies on nothing else truncating or modifying the
        // temporary file while the mapping is alive; the `&mut self` borrow
        // held by the returned buffer keeps the arena itself from doing so.
        let mmap = unsafe {
            memmap2::MmapOptions::new()
                .offset(aligned_offset as u64)
                .len(map_len)
                .map_mut(self.file.as_file())?
        };
        Ok(Buffer {
            arena: self,
            mmap,
            start,
            offset,
            elems,
            _marker: PhantomData,
        })
    }

    /// Record `end` as the new length of committed data.
    fn update_len(&mut self, end: usize) {
        self.len = end;
    }

    /// Finalize the arena and return immutable bytes for the entire file.
    ///
    /// NOTE(review): the mapping covers the whole file, not just the
    /// committed length. If the last buffer was dropped without calling
    /// [`Buffer::finish`], the space reserved for it appears to be included.
    ///
    /// # Errors
    ///
    /// Returns any error raised while mapping the file.
    pub fn finish(self) -> io::Result<Bytes> {
        let file = self.file.into_file();
        // SAFETY: the arena is consumed, so it can no longer resize the file;
        // relies on nothing else modifying the file while it is mapped.
        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
        Ok(Bytes::from_source(mmap))
    }

    /// Persist the temporary arena file to `path` and return the underlying
    /// [`std::fs::File`].
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported when the file cannot be persisted.
    pub fn persist<P: AsRef<std::path::Path>>(self, path: P) -> io::Result<std::fs::File> {
        self.file.persist(path).map_err(Into::into)
    }
}

/// Mutable buffer for writing into a [`ByteArena`].
///
/// A buffer holds the arena's mutable borrow together with a writable mapping
/// of the allocated file range. Call [`Buffer::finish`] to commit the data.
#[derive(Debug)]
pub struct Buffer<'a, T> {
    /// Arena that owns the underlying file; its length is updated on finish.
    arena: &'a mut ByteArena,
    /// Writable mapping for the current allocation.
    mmap: memmap2::MmapMut,
    /// Start position of this buffer within the arena file in bytes.
    start: usize,
    /// Offset in bytes from the beginning of `mmap` to the first element.
    offset: usize,
    /// Number of elements of type `T` in the buffer.
    elems: usize,
    /// Marker to tie the buffer to element type `T`.
    _marker: PhantomData<T>,
}

impl<'a, T> Buffer<'a, T>
where
T: FromBytes + Immutable,
{
/// Access the backing slice.
pub fn as_mut_slice(&mut self) -> &mut [T] {
unsafe {
let ptr = self.mmap.as_mut_ptr().add(self.offset) as *mut T;
core::slice::from_raw_parts_mut(ptr, self.elems)
}
}

/// Finalize the buffer and return immutable [`Bytes`].
pub fn finish(self) -> io::Result<Bytes> {
self.mmap.flush()?;
let len_bytes = self.elems * core::mem::size_of::<T>();
let offset = self.offset;
let arena = self.arena;
// Convert the writable mapping into a read-only view instead of
// unmapping and remapping the region.
let map = self.mmap.make_read_only()?;
arena.update_len(self.start + len_bytes);
Ok(Bytes::from_source(map).slice(offset..offset + len_bytes))
}
}

impl<'a, T> core::ops::Deref for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    type Target = [T];

    /// View the buffer's elements as a shared slice.
    fn deref(&self) -> &Self::Target {
        let count = self.elems;
        let base = self.mmap.as_ptr();
        // SAFETY: the buffer is expected to have been created with a mapping
        // that spans `offset + elems * size_of::<T>()` bytes and with `offset`
        // placing the first element at an address aligned for `T` (assumes
        // the mapping base is page-aligned). `T: FromBytes` makes every byte
        // pattern a valid `T`.
        unsafe { core::slice::from_raw_parts(base.add(self.offset).cast::<T>(), count) }
    }
}

impl<'a, T> core::ops::DerefMut for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    /// View the buffer's elements as a mutable slice.
    fn deref_mut(&mut self) -> &mut [T] {
        // Same pointer arithmetic as the inherent accessor, so reuse it
        // instead of repeating the unsafe block.
        self.as_mut_slice()
    }
}

impl<'a, T> AsRef<[T]> for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    /// Borrow the elements through the buffer's `Deref` implementation.
    fn as_ref(&self) -> &[T] {
        &**self
    }
}

impl<'a, T> AsMut<[T]> for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    /// Mutably borrow the elements through the buffer's `DerefMut`
    /// implementation.
    fn as_mut(&mut self) -> &mut [T] {
        &mut **self
    }
}
Loading