Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions crates/apollo_storage/src/mmap_file/mmap_file_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,33 +150,33 @@ fn grow_file() {
let (mut writer, _) =
open_file::<NoVersionValueWrapper<Vec<u8>>>(config.clone(), file_path.clone(), offset)
.unwrap();
// file_size = 4 (growth_step), offset = 0
// file_size = 4 (1 * growth_step) after pre-growth, offset = 0.
let mut file_size = usize::try_from(file.metadata().unwrap().len()).unwrap();
assert_eq!(file_size, config.growth_step);
assert_eq!(offset, 0);

offset += writer.append(&data).len;
// file_size = 8 (2 * growth_step), offset = 3 (serialization_size)
// file_size = 4 (1 * growth_step), offset = 3. No additional growth needed.
file_size = file.metadata().unwrap().len().try_into().unwrap();
assert_eq!(file_size, 2 * config.growth_step);
assert_eq!(file_size, config.growth_step);
assert_eq!(offset, serialization_size);

offset += writer.append(&data).len;
// file_size = 12 (3 * growth_step), offset = 6 (2 * serialization_size)
// file_size = 8 (2 * growth_step), offset = 6.
file_size = file.metadata().unwrap().len().try_into().unwrap();
assert_eq!(file_size, 3 * config.growth_step);
assert_eq!(file_size, 2 * config.growth_step);
assert_eq!(offset, 2 * serialization_size);

offset += writer.append(&data).len;
// file_size = 12 (3 * growth_step), offset = 9 (3 * serialization_size)
// file_size = 12 (3 * growth_step), offset = 9.
file_size = file.metadata().unwrap().len().try_into().unwrap();
assert_eq!(file_size, 3 * config.growth_step);
assert_eq!(offset, 3 * serialization_size);

offset += writer.append(&data).len;
// file_size = 16 (4 * growth_step), offset = 12 (4 * serialization_size)
// file_size = 12 (3 * growth_step), offset = 12.
file_size = file.metadata().unwrap().len().try_into().unwrap();
assert_eq!(file_size, 4 * config.growth_step);
assert_eq!(file_size, 3 * config.growth_step);
assert_eq!(offset, 4 * serialization_size);
}

Expand All @@ -187,9 +187,9 @@ fn grow_file() {
.truncate(false)
.open(file_path.clone())
.unwrap();
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 4 * config.growth_step);
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 3 * config.growth_step);
let _ = open_file::<NoVersionValueWrapper<Vec<u8>>>(config.clone(), file_path, offset).unwrap();
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 4 * config.growth_step);
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 3 * config.growth_step);

dir.close().unwrap();
}
Expand Down
120 changes: 73 additions & 47 deletions crates/apollo_storage/src/mmap_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ impl LocationInFile {
}

/// Represents a memory mapped append only file.
/// This struct is not thread-safe on its own. All methods take `&mut self` and assume
/// exclusive access. This struct should only be accessed through [`FileHandler`], which
/// wraps it in `Arc<Mutex<>>` to provide synchronized access. Direct usage of `MMapFile`
/// without external synchronization will lead to data races.
#[derive(Debug)]
struct MMapFile<V: ValueSerde> {
config: MmapFileConfig,
Expand All @@ -146,12 +150,45 @@ struct MMapFile<V: ValueSerde> {
}

impl<V: ValueSerde> MMapFile<V> {
/// Grows the file by the growth step.
fn grow(&mut self) {
/// Creates a new MMapFile and pre-allocates initial space to avoid growing on first write.
fn new(
config: MmapFileConfig,
file: File,
mmap: MmapMut,
offset: usize,
) -> MmapFileResult<Self> {
let size = file.metadata()?.len();
let mut mmap_file = Self {
config,
file,
mmap,
size: size.try_into().expect("size should fit in usize"),
offset,
should_flush: false,
_value_type: PhantomData {},
};
// Pre-allocate space to avoid growing on first write
mmap_file.grow_to_target(mmap_file.config.max_object_size);
Ok(mmap_file)
}

/// Grows the file to accommodate at least the target size.
/// Grows in multiples of growth_step for efficiency.
fn grow_to_target(&mut self, target_size: usize) {
if self.size >= target_size {
return;
}
self.flush();
let new_size = self.size + self.config.growth_step;
// Calculate how many growth steps needed, rounding up.
let growth_needed = target_size - self.size;
let growth_steps = growth_needed.div_ceil(self.config.growth_step);
let growth_size = growth_steps * self.config.growth_step;
let new_size = self.size + growth_size;
let new_size_u64 = u64::try_from(new_size).expect("usize should fit in u64");
debug!("Growing file to size: {}", new_size);
debug!(
"Growing file to size: {} (target: {}, growth: {})",
new_size, target_size, growth_size
);
self.file.set_len(new_size_u64).expect("Failed to set the file size");
self.size = new_size;
}
Expand All @@ -162,6 +199,34 @@ impl<V: ValueSerde> MMapFile<V> {
self.mmap.flush().expect("Failed to flush the mmap");
self.should_flush = false;
}

/// Writes serialized data to the mmap at the current offset.
/// Returns the location where data was written and updates the offset.
fn write_at_current_offset(&mut self, serialized: &[u8]) -> LocationInFile {
let len = serialized.len();
let offset = self.offset;

trace!("Inserting object at offset: {}", offset);

// Ensure we have enough space before writing.
let final_offset = offset + len;
self.grow_to_target(final_offset);

// Copy data to mmap
let mmap_slice = &mut self.mmap[offset..final_offset];
mmap_slice.copy_from_slice(serialized);

// Start async flush of the written range (non-blocking, improves durability)
self.mmap
.flush_async_range(offset, len)
.expect("Failed to asynchronously flush the mmap after inserting");

// Update offset
self.offset += len;
self.should_flush = true;

LocationInFile { offset, len }
}
}

/// Open a memory mapped file, create it if it doesn't exist.
Expand All @@ -172,26 +237,16 @@ pub(crate) fn open_file<V: ValueSerde>(
offset: usize,
) -> MmapFileResult<(FileHandler<V, RW>, FileHandler<V, RO>)> {
let file = OpenOptions::new().read(true).write(true).create(true).truncate(false).open(path)?;
let size = file.metadata()?.len();
let mmap = unsafe { MmapOptions::new().len(config.max_size).map_mut(&file)? };
let mmap_ptr = mmap.as_ptr();
let mmap_file = MMapFile {
config,
file,
mmap,
size: size.try_into().expect("size should fit in usize"),
offset,
should_flush: false,
_value_type: PhantomData {},
};
let mmap_file = MMapFile::new(config, file, mmap, offset)?;
let shared_mmap_file = Arc::new(Mutex::new(mmap_file));

let mut write_file_handler: FileHandler<V, RW> = FileHandler {
let write_file_handler: FileHandler<V, RW> = FileHandler {
memory_ptr: mmap_ptr,
mmap_file: shared_mmap_file.clone(),
_mode: PhantomData,
};
write_file_handler.grow_file_if_needed(0);

let read_file_handler: FileHandler<V, RO> =
FileHandler { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file, _mode: PhantomData };
Expand All @@ -210,41 +265,12 @@ pub(crate) struct FileHandler<V: ValueSerde, Mode: TransactionKind> {
unsafe impl<V: ValueSerde, Mode: TransactionKind> Send for FileHandler<V, Mode> {}
unsafe impl<V: ValueSerde, Mode: TransactionKind> Sync for FileHandler<V, Mode> {}

impl<V: ValueSerde> FileHandler<V, RW> {
fn grow_file_if_needed(&mut self, offset: usize) {
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
if mmap_file.size < offset + mmap_file.config.max_object_size {
debug!(
"Attempting to grow file. File size: {}, offset: {}, max_object_size: {}",
mmap_file.size, offset, mmap_file.config.max_object_size
);
mmap_file.grow();
}
}
}

impl<V: ValueSerde + Debug> Writer<V> for FileHandler<V, RW> {
fn append(&mut self, val: &V::Value) -> LocationInFile {
trace!("Inserting object: {:?}", val);
let serialized = V::serialize(val).expect("Should be able to serialize");
let len = serialized.len();
let offset;
{
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
offset = mmap_file.offset;
trace!("Inserting object at offset: {}", offset);
let mmap_slice = &mut mmap_file.mmap[offset..];
mmap_slice[..len].copy_from_slice(&serialized);
mmap_file
.mmap
.flush_async_range(offset, len)
.expect("Failed to asynchronously flush the mmap after inserting");
mmap_file.offset += len;
mmap_file.should_flush = true;
}
let location = LocationInFile { offset, len };
self.grow_file_if_needed(location.next_offset());
location
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
mmap_file.write_at_current_offset(&serialized)
}

fn flush(&self) {
Expand Down
Loading