Skip to content

Commit ba4057f

Browse files
apollo_storage: add mmap file helper methods
1 parent 065a6a8 commit ba4057f

File tree

1 file changed

+61
-22
lines changed
  • crates/apollo_storage/src/mmap_file

1 file changed

+61
-22
lines changed

crates/apollo_storage/src/mmap_file/mod.rs

Lines changed: 61 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -134,6 +134,10 @@ impl LocationInFile {
134134
}
135135

136136
/// Represents a memory mapped append only file.
137+
/// This struct is not thread-safe on its own. All methods take `&mut self` and assume
138+
/// exclusive access. This struct should only be accessed through [`FileHandler`], which
139+
/// wraps it in `Arc<Mutex<>>` to provide synchronized access. `MMapFile` does no locking
140+
/// of its own, so sharing it across threads requires that external synchronization.
137141
#[derive(Debug)]
138142
struct MMapFile<V: ValueSerde> {
139143
config: MmapFileConfig,
@@ -146,12 +150,23 @@ struct MMapFile<V: ValueSerde> {
146150
}
147151

148152
impl<V: ValueSerde> MMapFile<V> {
149-
/// Grows the file by the growth step.
150-
fn grow(&mut self) {
153+
/// Grows the file to accommodate at least the target size.
154+
/// Grows in multiples of `growth_step` for efficiency; no-op if the file is already large enough.
155+
fn grow_to_target(&mut self, target_size: usize) {
156+
if self.size >= target_size {
157+
return;
158+
}
151159
self.flush();
152-
let new_size = self.size + self.config.growth_step;
160+
// Calculate how many growth steps needed, rounding up.
161+
let growth_needed = target_size - self.size;
162+
let growth_steps = growth_needed.div_ceil(self.config.growth_step);
163+
let growth_size = growth_steps * self.config.growth_step;
164+
let new_size = self.size + growth_size;
153165
let new_size_u64 = u64::try_from(new_size).expect("usize should fit in u64");
154-
debug!("Growing file to size: {}", new_size);
166+
debug!(
167+
"Growing file to size: {} (target: {}, growth: {})",
168+
new_size, target_size, growth_size
169+
);
155170
self.file.set_len(new_size_u64).expect("Failed to set the file size");
156171
self.size = new_size;
157172
}
@@ -162,6 +177,40 @@ impl<V: ValueSerde> MMapFile<V> {
162177
self.mmap.flush().expect("Failed to flush the mmap");
163178
self.should_flush = false;
164179
}
180+
181+
/// Writes serialized data to the mmap at the current offset.
182+
/// Returns the location where data was written and updates the offset.
183+
fn write_at_current_offset(&mut self, serialized: &[u8]) -> LocationInFile {
184+
let len = serialized.len();
185+
let offset = self.offset;
186+
187+
trace!("Inserting object at offset: {}", offset);
188+
189+
// Ensure we have enough space before writing.
190+
let final_offset = offset + len;
191+
self.grow_to_target(final_offset);
192+
193+
// Copy data to mmap
194+
let mmap_slice = &mut self.mmap[offset..final_offset];
195+
assert!(
196+
mmap_slice.len() == len,
197+
"Mmap slice length mismatch: expected={}, actual={}",
198+
len,
199+
mmap_slice.len()
200+
);
201+
mmap_slice.copy_from_slice(serialized);
202+
203+
// Start async flush of the written range (non-blocking, improves durability)
204+
self.mmap
205+
.flush_async_range(offset, len)
206+
.expect("Failed to asynchronously flush the mmap after inserting");
207+
208+
// Update offset
209+
self.offset += len;
210+
self.should_flush = true;
211+
212+
LocationInFile { offset, len }
213+
}
165214
}
166215

167216
/// Open a memory mapped file, create it if it doesn't exist.
@@ -214,11 +263,13 @@ impl<V: ValueSerde> FileHandler<V, RW> {
214263
fn grow_file_if_needed(&mut self, offset: usize) {
215264
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
216265
if mmap_file.size < offset + mmap_file.config.max_object_size {
266+
let target_size = offset + mmap_file.config.max_object_size;
217267
debug!(
218-
"Attempting to grow file. File size: {}, offset: {}, max_object_size: {}",
219-
mmap_file.size, offset, mmap_file.config.max_object_size
268+
"Attempting to grow file. File size: {}, offset: {}, max_object_size: {}, target: \
269+
{}",
270+
mmap_file.size, offset, mmap_file.config.max_object_size, target_size
220271
);
221-
mmap_file.grow();
272+
mmap_file.grow_to_target(target_size);
222273
}
223274
}
224275
}
@@ -227,22 +278,10 @@ impl<V: ValueSerde + Debug> Writer<V> for FileHandler<V, RW> {
227278
fn append(&mut self, val: &V::Value) -> LocationInFile {
228279
trace!("Inserting object: {:?}", val);
229280
let serialized = V::serialize(val).expect("Should be able to serialize");
230-
let len = serialized.len();
231-
let offset;
232-
{
281+
let location = {
233282
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
234-
offset = mmap_file.offset;
235-
trace!("Inserting object at offset: {}", offset);
236-
let mmap_slice = &mut mmap_file.mmap[offset..];
237-
mmap_slice[..len].copy_from_slice(&serialized);
238-
mmap_file
239-
.mmap
240-
.flush_async_range(offset, len)
241-
.expect("Failed to asynchronously flush the mmap after inserting");
242-
mmap_file.offset += len;
243-
mmap_file.should_flush = true;
244-
}
245-
let location = LocationInFile { offset, len };
283+
mmap_file.write_at_current_offset(&serialized)
284+
};
246285
self.grow_file_if_needed(location.next_offset());
247286
location
248287
}

0 commit comments

Comments
 (0)