/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * Copyright (c) Jan-Paul Bultmann
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 */

//! Temporary byte arena backed by a file.
//!
//! The arena allows staged writing through [`ByteArena::write`]. Each
//! call returns a mutable [`Buffer`] bound to the arena so only one
//! writer can exist at a time. Finalizing the buffer via
//! [`Buffer::finish`] remaps the written range as immutable and
//! returns [`Bytes`].
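//!
//! A minimal end-to-end sketch (assuming the crate exports
//! [`ByteArena`]; the element type `u64` is illustrative):
//!
//! ```ignore
//! let mut arena = ByteArena::new()?;
//! let mut buf = arena.write::<u64>(3)?;
//! buf.as_mut_slice().copy_from_slice(&[1, 2, 3]);
//! let bytes = buf.finish()?; // immutable view of the 24 bytes written
//! ```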
use std::io::{self, Seek, SeekFrom};
use std::marker::PhantomData;

use tempfile::NamedTempFile;
// The `FromBytes + Immutable` bounds below are used unconditionally, so
// the import must not be gated behind a `zerocopy` feature flag.
use zerocopy::{FromBytes, Immutable};

use crate::Bytes;

/// Round `val` up to the next multiple of `align`; `align` must be a
/// power of two (e.g. `align_up(5, 4) == 8`).
fn align_up(val: usize, align: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (val + align - 1) & !(align - 1)
}

/// Arena managing a temporary file.
#[derive(Debug)]
pub struct ByteArena {
    /// Temporary file backing the arena.
    file: NamedTempFile,
    /// Current length of initialized data in bytes.
    len: usize,
}

impl ByteArena {
    /// Create a new empty arena.
    pub fn new() -> io::Result<Self> {
        let file = NamedTempFile::new()?;
        Ok(Self { file, len: 0 })
    }

    /// Start a new write of `elems` elements of type `T`.
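    ///
    /// The write start is aligned up to `align_of::<T>()`. A brief
    /// sketch with an illustrative element type:
    ///
    /// ```ignore
    /// let mut buf = arena.write::<u32>(4)?;
    /// buf.as_mut_slice().fill(7);
    /// let bytes = buf.finish()?;
    /// ```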
    pub fn write<'a, T>(&'a mut self, elems: usize) -> io::Result<Buffer<'a, T>>
    where
        T: FromBytes + Immutable,
    {
        let page = page_size::get();
        let align = core::mem::align_of::<T>();
        let len_bytes = core::mem::size_of::<T>() * elems;
        let start = align_up(self.len, align);
        let end = start + len_bytes;
        self.file.as_file_mut().set_len(end as u64)?;
        // Leave the file cursor at the new end of the file.
        self.file.as_file_mut().seek(SeekFrom::Start(end as u64))?;

        // The mapping must start on a page boundary; round `start` down
        // while tracking how far into the mapping the buffer begins.
        let aligned_offset = start & !(page - 1);
        let offset = start - aligned_offset;
        let map_len = end - aligned_offset;
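        // Worked example: with a 4096-byte page and `start == 5000`,
        // `aligned_offset == 4096` and `offset == 904`, so the buffer
        // begins 904 bytes into the mapping.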

        // SAFETY: `map_mut` requires that the mapped range not be
        // truncated or aliased while the mapping is live; the exclusive
        // `&mut self` borrow keeps the arena from resizing the file
        // until the buffer is finished.
        let mmap = unsafe {
            memmap2::MmapOptions::new()
                .offset(aligned_offset as u64)
                .len(map_len)
                .map_mut(self.file.as_file())?
        };
        Ok(Buffer {
            arena: self,
            mmap,
            start,
            offset,
            elems,
            _marker: PhantomData,
        })
    }

    /// Record that initialized data now extends to `end` bytes.
    fn update_len(&mut self, end: usize) {
        self.len = end;
    }

    /// Finalize the arena and return immutable bytes for the entire file.
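    ///
    /// Unlike [`Buffer::finish`], which returns only the just-written
    /// range, this maps the whole file, including any alignment padding
    /// between allocations.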
    pub fn finish(self) -> io::Result<Bytes> {
        let file = self.file.into_file();
        // SAFETY: the arena is consumed here, so nothing else resizes
        // or writes the file through this handle while the read-only
        // mapping is alive.
        let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
        Ok(Bytes::from_source(mmap))
    }

    /// Persist the temporary arena file to `path` and return the underlying [`std::fs::File`].
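    ///
    /// A brief sketch with an illustrative destination path:
    ///
    /// ```ignore
    /// let file = arena.persist("/tmp/arena.bin")?;
    /// ```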
    pub fn persist<P: AsRef<std::path::Path>>(self, path: P) -> io::Result<std::fs::File> {
        self.file.persist(path).map_err(Into::into)
    }
}

/// Mutable buffer for writing into a [`ByteArena`].
#[derive(Debug)]
pub struct Buffer<'a, T> {
    /// Arena that owns the underlying file.
    arena: &'a mut ByteArena,
    /// Writable mapping for the current allocation.
    mmap: memmap2::MmapMut,
    /// Start position of this buffer within the arena file in bytes.
    start: usize,
    /// Offset from the beginning of `mmap` to the start of the buffer.
    offset: usize,
    /// Number of elements in the buffer.
    elems: usize,
    /// Marker to tie the buffer to element type `T`.
    _marker: PhantomData<T>,
}

impl<'a, T> Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    /// Access the backing slice mutably.
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        // SAFETY: the mapping holds at least `offset + elems *
        // size_of::<T>()` bytes, `offset` preserves the alignment of
        // `T`, and `T: FromBytes` makes every bit pattern valid.
        unsafe {
            let ptr = self.mmap.as_mut_ptr().add(self.offset) as *mut T;
            core::slice::from_raw_parts_mut(ptr, self.elems)
        }
    }

    /// Finalize the buffer and return immutable [`Bytes`].
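    ///
    /// The returned [`Bytes`] covers exactly the `elems * size_of::<T>()`
    /// bytes of this buffer; a subsequent [`ByteArena::write`] starts
    /// after this range.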
    pub fn finish(self) -> io::Result<Bytes> {
        self.mmap.flush()?;
        let len_bytes = self.elems * core::mem::size_of::<T>();
        let offset = self.offset;
        let arena = self.arena;
        // Convert the writable mapping into a read-only view instead of
        // unmapping and remapping the region.
        let map = self.mmap.make_read_only()?;
        arena.update_len(self.start + len_bytes);
        Ok(Bytes::from_source(map).slice(offset..offset + len_bytes))
    }
}

impl<'a, T> core::ops::Deref for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // SAFETY: same size, alignment, and validity invariants as
        // `as_mut_slice`, but for shared access.
        unsafe {
            let ptr = self.mmap.as_ptr().add(self.offset) as *const T;
            core::slice::from_raw_parts(ptr, self.elems)
        }
    }
}

impl<'a, T> core::ops::DerefMut for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    fn deref_mut(&mut self) -> &mut [T] {
        self.as_mut_slice()
    }
}

impl<'a, T> AsRef<[T]> for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    fn as_ref(&self) -> &[T] {
        self
    }
}

impl<'a, T> AsMut<[T]> for Buffer<'a, T>
where
    T: FromBytes + Immutable,
{
    fn as_mut(&mut self) -> &mut [T] {
        self
    }
}