Skip to content

Commit c4d0bfc

Browse files
apollo_storage: add mmap file helper methods
1 parent 065a6a8 commit c4d0bfc

File tree

2 files changed

+83
-57
lines changed

2 files changed

+83
-57
lines changed

crates/apollo_storage/src/mmap_file/mmap_file_test.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,33 +150,33 @@ fn grow_file() {
150150
let (mut writer, _) =
151151
open_file::<NoVersionValueWrapper<Vec<u8>>>(config.clone(), file_path.clone(), offset)
152152
.unwrap();
153-
// file_size = 4 (growth_step), offset = 0
153+
// file_size = 4 (1 * growth_step) after pre-growth, offset = 0.
154154
let mut file_size = usize::try_from(file.metadata().unwrap().len()).unwrap();
155155
assert_eq!(file_size, config.growth_step);
156156
assert_eq!(offset, 0);
157157

158158
offset += writer.append(&data).len;
159-
// file_size = 8 (2 * growth_step), offset = 3 (serialization_size)
159+
// file_size = 4 (1 * growth_step), offset = 3. No additional growth needed.
160160
file_size = file.metadata().unwrap().len().try_into().unwrap();
161-
assert_eq!(file_size, 2 * config.growth_step);
161+
assert_eq!(file_size, config.growth_step);
162162
assert_eq!(offset, serialization_size);
163163

164164
offset += writer.append(&data).len;
165-
// file_size = 12 (3 * growth_step), offset = 6 (2 * serialization_size)
165+
// file_size = 8 (2 * growth_step), offset = 6.
166166
file_size = file.metadata().unwrap().len().try_into().unwrap();
167-
assert_eq!(file_size, 3 * config.growth_step);
167+
assert_eq!(file_size, 2 * config.growth_step);
168168
assert_eq!(offset, 2 * serialization_size);
169169

170170
offset += writer.append(&data).len;
171-
// file_size = 12 (3 * growth_step), offset = 9 (3 * serialization_size)
171+
// file_size = 12 (3 * growth_step), offset = 9.
172172
file_size = file.metadata().unwrap().len().try_into().unwrap();
173173
assert_eq!(file_size, 3 * config.growth_step);
174174
assert_eq!(offset, 3 * serialization_size);
175175

176176
offset += writer.append(&data).len;
177-
// file_size = 16 (4 * growth_step), offset = 12 (4 * serialization_size)
177+
// file_size = 12 (3 * growth_step), offset = 12.
178178
file_size = file.metadata().unwrap().len().try_into().unwrap();
179-
assert_eq!(file_size, 4 * config.growth_step);
179+
assert_eq!(file_size, 3 * config.growth_step);
180180
assert_eq!(offset, 4 * serialization_size);
181181
}
182182

@@ -187,9 +187,9 @@ fn grow_file() {
187187
.truncate(false)
188188
.open(file_path.clone())
189189
.unwrap();
190-
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 4 * config.growth_step);
190+
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 3 * config.growth_step);
191191
let _ = open_file::<NoVersionValueWrapper<Vec<u8>>>(config.clone(), file_path, offset).unwrap();
192-
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 4 * config.growth_step);
192+
assert_eq!(usize::try_from(file.metadata().unwrap().len()).unwrap(), 3 * config.growth_step);
193193

194194
dir.close().unwrap();
195195
}

crates/apollo_storage/src/mmap_file/mod.rs

Lines changed: 73 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ impl LocationInFile {
134134
}
135135

136136
/// Represents a memory mapped append only file.
137+
/// This struct is not thread-safe on its own. All methods take `&mut self` and assume
138+
/// exclusive access. This struct should only be accessed through [`FileHandler`], which
139+
/// wraps it in `Arc<Mutex<>>` to provide synchronized access. Direct usage of `MMapFile`
140+
/// without external synchronization will lead to data races.
137141
#[derive(Debug)]
138142
struct MMapFile<V: ValueSerde> {
139143
config: MmapFileConfig,
@@ -146,12 +150,45 @@ struct MMapFile<V: ValueSerde> {
146150
}
147151

148152
impl<V: ValueSerde> MMapFile<V> {
149-
/// Grows the file by the growth step.
150-
fn grow(&mut self) {
153+
/// Creates a new MMapFile and pre-allocates initial space to avoid growing on first write.
154+
fn new(
155+
config: MmapFileConfig,
156+
file: File,
157+
mmap: MmapMut,
158+
offset: usize,
159+
) -> MmapFileResult<Self> {
160+
let size = file.metadata()?.len();
161+
let mut mmap_file = Self {
162+
config,
163+
file,
164+
mmap,
165+
size: size.try_into().expect("size should fit in usize"),
166+
offset,
167+
should_flush: false,
168+
_value_type: PhantomData {},
169+
};
170+
// Pre-allocate space to avoid growing on first write
171+
mmap_file.grow_to_target(mmap_file.config.max_object_size);
172+
Ok(mmap_file)
173+
}
174+
175+
/// Grows the file to accommodate at least the target size.
176+
/// Grows in multiples of growth_step for efficiency.
177+
fn grow_to_target(&mut self, target_size: usize) {
178+
if self.size >= target_size {
179+
return;
180+
}
151181
self.flush();
152-
let new_size = self.size + self.config.growth_step;
182+
// Calculate how many growth steps needed, rounding up.
183+
let growth_needed = target_size - self.size;
184+
let growth_steps = growth_needed.div_ceil(self.config.growth_step);
185+
let growth_size = growth_steps * self.config.growth_step;
186+
let new_size = self.size + growth_size;
153187
let new_size_u64 = u64::try_from(new_size).expect("usize should fit in u64");
154-
debug!("Growing file to size: {}", new_size);
188+
debug!(
189+
"Growing file to size: {} (target: {}, growth: {})",
190+
new_size, target_size, growth_size
191+
);
155192
self.file.set_len(new_size_u64).expect("Failed to set the file size");
156193
self.size = new_size;
157194
}
@@ -162,6 +199,34 @@ impl<V: ValueSerde> MMapFile<V> {
162199
self.mmap.flush().expect("Failed to flush the mmap");
163200
self.should_flush = false;
164201
}
202+
203+
/// Writes serialized data to the mmap at the current offset.
204+
/// Returns the location where data was written and updates the offset.
205+
fn write_at_current_offset(&mut self, serialized: &[u8]) -> LocationInFile {
206+
let len = serialized.len();
207+
let offset = self.offset;
208+
209+
trace!("Inserting object at offset: {}", offset);
210+
211+
// Ensure we have enough space before writing.
212+
let final_offset = offset + len;
213+
self.grow_to_target(final_offset);
214+
215+
// Copy data to mmap
216+
let mmap_slice = &mut self.mmap[offset..final_offset];
217+
mmap_slice.copy_from_slice(serialized);
218+
219+
// Start async flush of the written range (non-blocking, improves durability)
220+
self.mmap
221+
.flush_async_range(offset, len)
222+
.expect("Failed to asynchronously flush the mmap after inserting");
223+
224+
// Update offset
225+
self.offset += len;
226+
self.should_flush = true;
227+
228+
LocationInFile { offset, len }
229+
}
165230
}
166231

167232
/// Open a memory mapped file, create it if it doesn't exist.
@@ -172,26 +237,16 @@ pub(crate) fn open_file<V: ValueSerde>(
172237
offset: usize,
173238
) -> MmapFileResult<(FileHandler<V, RW>, FileHandler<V, RO>)> {
174239
let file = OpenOptions::new().read(true).write(true).create(true).truncate(false).open(path)?;
175-
let size = file.metadata()?.len();
176240
let mmap = unsafe { MmapOptions::new().len(config.max_size).map_mut(&file)? };
177241
let mmap_ptr = mmap.as_ptr();
178-
let mmap_file = MMapFile {
179-
config,
180-
file,
181-
mmap,
182-
size: size.try_into().expect("size should fit in usize"),
183-
offset,
184-
should_flush: false,
185-
_value_type: PhantomData {},
186-
};
242+
let mmap_file = MMapFile::new(config, file, mmap, offset)?;
187243
let shared_mmap_file = Arc::new(Mutex::new(mmap_file));
188244

189-
let mut write_file_handler: FileHandler<V, RW> = FileHandler {
245+
let write_file_handler: FileHandler<V, RW> = FileHandler {
190246
memory_ptr: mmap_ptr,
191247
mmap_file: shared_mmap_file.clone(),
192248
_mode: PhantomData,
193249
};
194-
write_file_handler.grow_file_if_needed(0);
195250

196251
let read_file_handler: FileHandler<V, RO> =
197252
FileHandler { memory_ptr: mmap_ptr, mmap_file: shared_mmap_file, _mode: PhantomData };
@@ -210,41 +265,12 @@ pub(crate) struct FileHandler<V: ValueSerde, Mode: TransactionKind> {
210265
unsafe impl<V: ValueSerde, Mode: TransactionKind> Send for FileHandler<V, Mode> {}
211266
unsafe impl<V: ValueSerde, Mode: TransactionKind> Sync for FileHandler<V, Mode> {}
212267

213-
impl<V: ValueSerde> FileHandler<V, RW> {
214-
fn grow_file_if_needed(&mut self, offset: usize) {
215-
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
216-
if mmap_file.size < offset + mmap_file.config.max_object_size {
217-
debug!(
218-
"Attempting to grow file. File size: {}, offset: {}, max_object_size: {}",
219-
mmap_file.size, offset, mmap_file.config.max_object_size
220-
);
221-
mmap_file.grow();
222-
}
223-
}
224-
}
225-
226268
impl<V: ValueSerde + Debug> Writer<V> for FileHandler<V, RW> {
227269
fn append(&mut self, val: &V::Value) -> LocationInFile {
228270
trace!("Inserting object: {:?}", val);
229271
let serialized = V::serialize(val).expect("Should be able to serialize");
230-
let len = serialized.len();
231-
let offset;
232-
{
233-
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
234-
offset = mmap_file.offset;
235-
trace!("Inserting object at offset: {}", offset);
236-
let mmap_slice = &mut mmap_file.mmap[offset..];
237-
mmap_slice[..len].copy_from_slice(&serialized);
238-
mmap_file
239-
.mmap
240-
.flush_async_range(offset, len)
241-
.expect("Failed to asynchronously flush the mmap after inserting");
242-
mmap_file.offset += len;
243-
mmap_file.should_flush = true;
244-
}
245-
let location = LocationInFile { offset, len };
246-
self.grow_file_if_needed(location.next_offset());
247-
location
272+
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
273+
mmap_file.write_at_current_offset(&serialized)
248274
}
249275

250276
fn flush(&self) {

0 commit comments

Comments
 (0)