
Commit fbb2f2a

Fix incorrect flush after truncate, other improvements (#99)
* Just flush full memory map
* Simplify zeroing
* Fix truncate zeroing: zero the full slice of removed data, not just the first 16 bytes
* Bump flush offset to flush new changes after truncation
* Promote invalid flush range to error
* On load, set flush offset; we don't need to flush existing data
* Don't set flush offset before flush succeeds
* Rust format
* Patch flush offset bump, only move offset back to first zero position: we must only move it back, and never forward, because data starting at the current flush offset may still not be flushed (sketched below)
* In test fixture, return temporary directory so it is not cleaned up eagerly
* Add truncate test, also asserting flush offset behavior
* Always format message; we don't expect to hit it anymore
1 parent 502a775 commit fbb2f2a
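
The core of the fix is the flush-offset bookkeeping: truncation may only move the offset backwards (to the first zeroed byte), and a flush may only advance it after the flush has actually succeeded. Below is a minimal, self-contained sketch of just that invariant. FlushTracker, on_truncate and on_flush are hypothetical names used for illustration, not the crate's API; the real logic lives in Segment::truncate and Segment::flush in the diff below.

use std::cmp::min;
use std::io::Result;

/// Hypothetical stand-in for the flush bookkeeping; everything before
/// `flush_offset` is assumed to be durable on disk.
struct FlushTracker {
    flush_offset: usize,
}

impl FlushTracker {
    /// On truncation, only move the offset *back* to the start of the zeroed
    /// range; moving it forward would wrongly mark unflushed bytes as durable.
    fn on_truncate(&mut self, zero_start: usize) {
        self.flush_offset = min(self.flush_offset, zero_start);
    }

    /// On flush, advance the offset only after the flush itself succeeded.
    fn on_flush(&mut self, end: usize, flush: impl FnOnce() -> Result<()>) -> Result<()> {
        flush()?; // if this fails, flush_offset stays where it was
        self.flush_offset = end;
        Ok(())
    }
}

fn main() -> Result<()> {
    let mut tracker = FlushTracker { flush_offset: 64 };
    tracker.on_truncate(32); // moved back to the first zeroed byte
    assert_eq!(tracker.flush_offset, 32);
    tracker.on_truncate(48); // never moved forward
    assert_eq!(tracker.flush_offset, 32);
    tracker.on_flush(80, || Ok(()))?; // advanced only after a successful flush
    assert_eq!(tracker.flush_offset, 80);
    Ok(())
}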

File tree: 1 file changed (+125, -40 lines)

src/segment.rs

Lines changed: 125 additions & 40 deletions
@@ -1,5 +1,5 @@
 use log::{debug, error, log_enabled, trace};
-use std::cmp::Ordering;
+use std::cmp::{Ordering, min};
 use std::fmt;
 use std::fs::{self, OpenOptions};
 use std::io::{Error, ErrorKind, Result};
@@ -282,13 +282,17 @@ impl Segment {
             }
         }
 
-        let segment = Segment {
+        let mut segment = Segment {
             mmap,
             path: path.as_ref().to_path_buf(),
             index,
             crc,
             flush_offset: 0,
         };
+
+        // Bump flush offset next to last entry, we don't need to flush any existing data
+        segment.flush_offset = segment.size();
+
         debug!("{segment:?}: opened");
         Ok(segment)
     }
@@ -343,11 +347,8 @@ impl Segment {
         );
 
         if padding > 0 {
-            let zeros: [u8; 8] = [0; 8];
-            copy_memory(
-                &zeros[..padding],
-                &mut self.as_mut_slice()[offset + HEADER_LEN + entry.len()..],
-            );
+            let start = offset + HEADER_LEN + entry.len();
+            self.as_mut_slice()[start..start + padding].fill(0)
         }
         crc = crc32c::crc32c_append(
             !crc.reverse_bits(),
@@ -385,6 +386,8 @@ impl Segment {
         }
         trace!("{self:?}: truncating from position {from}");
 
+        let zero_end = self.size();
+
         // Remove the index entries.
         let deleted = self.index.drain(from..).count();
         trace!("{self:?}: truncated {deleted} entries");
@@ -397,10 +400,12 @@ impl Segment {
             self.crc = self._read_entry_crc(self.index.len() - 1);
         }
 
-        // And overwrite the existing data so that we will not read the data back after a crash.
-        let size = self.size();
-        let zeroes: [u8; 16] = [0; 16];
-        copy_memory(&zeroes, &mut self.as_mut_slice()[size..]);
+        // Zero all deleted entries so that we will not read the data back after a crash
+        let zero_start = self.size();
+        self.as_mut_slice()[zero_start..zero_end].fill(0);
+
+        // Move flush offset back to write new changes on next flush
+        self.flush_offset = min(self.flush_offset, zero_start);
     }
 
     /// Flushes recently written entries to durable storage.
@@ -411,26 +416,26 @@ impl Segment {
 
         match start.cmp(&end) {
             Ordering::Equal => {
+                // nothing to flush
                 trace!("{self:?}: nothing to flush");
-                Ok(())
-            } // nothing to flush
+            }
             Ordering::Less => {
                 // flush new elements added since last flush
                 trace!("{self:?}: flushing byte range [{start}, {end})");
                 let mut view = unsafe { self.mmap.clone() };
-                self.flush_offset = end;
                 view.restrict(start, end - start)?;
-                view.flush()
+                view.flush()?;
             }
             Ordering::Greater => {
                 // most likely truncated in between flushes
                 // register new flush offset & flush the whole segment
-                trace!("{self:?}: flushing after truncation");
-                let view = unsafe { self.mmap.clone() };
-                self.flush_offset = end;
-                view.flush()
+                error!("{self:?}: invalid flush range, flushing everything");
+                self.mmap.flush()?;
             }
         }
+
+        self.flush_offset = end;
+        Ok(())
     }
 
     /// Flushes recently written entries to durable storage.
@@ -466,14 +471,11 @@ impl Segment {
                 let view = unsafe { self.mmap.clone() };
                 self.flush_offset = end;
 
-                let log_msg = if log_enabled!(log::Level::Trace) {
-                    format!("{:?}: async flushing after truncation", &self)
-                } else {
-                    String::new()
-                };
+                let log_msg =
+                    format!("{self:?}: invalid flush range, flushing everything asynchronously");
 
                 thread::spawn(move || {
-                    trace!("{log_msg}");
+                    error!("{log_msg}");
                     view.flush()
                 })
             }
@@ -693,6 +695,21 @@ fn padding(len: usize) -> usize {
     4usize.wrapping_sub(len) & 7
 }
 
+/// Returns total number of bytes used on disk to store an entry of length `len`.
+///
+/// Includes header, padding and CRC.
+///
+/// | component                    | type |
+/// | ---------------------------- | ---- |
+/// | length                       | u64  |
+/// | data                         |      |
+/// | padding                      |      |
+/// | CRC(length + data + padding) | u32  |
+#[cfg(test)]
+pub fn entry_size_disk(len: usize) -> usize {
+    len + entry_overhead(len)
+}
+
 /// Returns the overhead of storing an entry of length `len`.
 pub fn entry_overhead(len: usize) -> usize {
     padding(len) + HEADER_LEN + CRC_LEN
@@ -705,12 +722,15 @@ pub fn segment_overhead() -> usize {
 
 #[cfg(test)]
 mod test {
-    use std::io::ErrorKind;
-    use tempfile::Builder;
+    use std::{io::ErrorKind, path::Path};
+    use tempfile::{Builder, TempDir};
 
     use super::{Segment, padding};
 
-    use crate::test_utils::EntryGenerator;
+    use crate::{
+        segment::{HEADER_LEN, entry_size_disk},
+        test_utils::EntryGenerator,
+    };
 
     #[test]
     fn test_pad_len() {
@@ -733,11 +753,17 @@ mod test {
         assert_eq!(5, padding(15));
     }
 
-    fn create_segment(len: usize) -> Segment {
+    fn create_segment(len: usize) -> (Segment, TempDir) {
         let dir = Builder::new().prefix("segment").tempdir().unwrap();
         let mut path = dir.path().to_path_buf();
         path.push("sync-segment");
-        Segment::create(path, len).unwrap()
+        (Segment::create(path, len).unwrap(), dir)
+    }
+
+    fn load_segment(dir: impl AsRef<Path>) -> Segment {
+        let mut path = dir.as_ref().to_path_buf();
+        path.push("sync-segment");
+        Segment::open(path).unwrap()
     }
 
     fn init_logger() {
@@ -764,15 +790,74 @@ mod test {
     #[test]
     fn test_append() {
         init_logger();
-        check_append(&mut create_segment(8));
-        check_append(&mut create_segment(9));
-        check_append(&mut create_segment(32));
-        check_append(&mut create_segment(100));
-        check_append(&mut create_segment(1023));
-        check_append(&mut create_segment(1024));
-        check_append(&mut create_segment(1025));
-        check_append(&mut create_segment(4096));
-        check_append(&mut create_segment(8 * 1024 * 1024));
+        check_append(&mut create_segment(8).0);
+        check_append(&mut create_segment(9).0);
+        check_append(&mut create_segment(32).0);
+        check_append(&mut create_segment(100).0);
+        check_append(&mut create_segment(1023).0);
+        check_append(&mut create_segment(1024).0);
+        check_append(&mut create_segment(1025).0);
+        check_append(&mut create_segment(4096).0);
+        check_append(&mut create_segment(8 * 1024 * 1024).0);
+    }
+
+    #[test]
+    fn test_truncate() {
+        init_logger();
+        let (mut segment, dir) = create_segment(4096);
+
+        segment.append(&b"0".as_slice()).unwrap();
+        segment.append(&b"1".as_slice()).unwrap();
+        segment.append(&b"2".as_slice()).unwrap();
+        segment.append(&b"3".as_slice()).unwrap();
+
+        // Truncate beyond the end is a no-op
+        assert_eq!(segment.len(), 4);
+        segment.truncate(4);
+        assert_eq!(segment.len(), 4);
+
+        // Until we flush, flush offset remains zero
+        assert_eq!(segment.flush_offset, 0);
+        segment.flush().unwrap();
+        assert_eq!(segment.flush_offset, HEADER_LEN + entry_size_disk(1) * 4);
+
+        // Truncate to keep one entry
+        segment.truncate(1);
+        assert_eq!(segment.len(), 1);
+        assert_eq!(segment.flush_offset, HEADER_LEN + entry_size_disk(1));
+
+        // Add new items (index 2, 3), flush offset remains at index 1 until we flush
+        segment.append(&b"12345".as_slice()).unwrap();
+        segment.append(&b"67890".as_slice()).unwrap();
+        assert_eq!(segment.len(), 3);
+        assert_eq!(segment.flush_offset, HEADER_LEN + entry_size_disk(1));
+
+        // Flush and reload
+        // This was broken before <https://github.com/qdrant/wal/pull/99>, as it wouldn't fully
+        // flush the last two appended operations
+        segment.flush().unwrap();
+        assert_eq!(
+            segment.flush_offset,
+            HEADER_LEN + entry_size_disk(1) + entry_size_disk(5) * 2,
+        );
+        let mut segment = load_segment(&dir);
+        assert_eq!(segment.len(), 3);
+        assert_eq!(
+            segment.flush_offset,
+            HEADER_LEN + entry_size_disk(1) + entry_size_disk(5) * 2,
+        );
+
+        // Truncate all (clear) and assert flush offset is bumped
+        segment.truncate(0);
+        assert_eq!(segment.len(), 0);
+        assert_eq!(segment.flush_offset, HEADER_LEN);
+
+        // Flush and reload
+        segment.flush().unwrap();
+        assert_eq!(segment.flush_offset, HEADER_LEN);
+        let segment = load_segment(&dir);
+        assert_eq!(segment.len(), 0);
+        assert_eq!(segment.flush_offset, HEADER_LEN);
     }
 
     #[test]
@@ -785,7 +870,7 @@ mod test {
     #[test]
     fn test_entries() {
         init_logger();
-        let mut segment = create_segment(4096);
+        let (mut segment, _dir) = create_segment(4096);
         let entries: &[&[u8]] = &[
            b"",
            b"0",
