Skip to content
Merged
160 changes: 124 additions & 36 deletions src/segment.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use log::{debug, error, log_enabled, trace};
use std::cmp::Ordering;
use std::cmp::{Ordering, min};
use std::fmt;
use std::fs::{self, OpenOptions};
use std::io::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -282,13 +282,17 @@ impl Segment {
}
}

let segment = Segment {
let mut segment = Segment {
mmap,
path: path.as_ref().to_path_buf(),
index,
crc,
flush_offset: 0,
};

// Bump flush offset next to last entry, we don't need to flush any existing data
segment.flush_offset = segment.size();

debug!("{segment:?}: opened");
Ok(segment)
}
Expand Down Expand Up @@ -343,11 +347,8 @@ impl Segment {
);

if padding > 0 {
let zeros: [u8; 8] = [0; 8];
copy_memory(
&zeros[..padding],
&mut self.as_mut_slice()[offset + HEADER_LEN + entry.len()..],
);
let start = offset + HEADER_LEN + entry.len();
self.as_mut_slice()[start..start + padding].fill(0)
}
crc = crc32c::crc32c_append(
!crc.reverse_bits(),
Expand Down Expand Up @@ -385,6 +386,8 @@ impl Segment {
}
trace!("{self:?}: truncating from position {from}");

let zero_end = self.size();

// Remove the index entries.
let deleted = self.index.drain(from..).count();
trace!("{self:?}: truncated {deleted} entries");
Expand All @@ -397,10 +400,12 @@ impl Segment {
self.crc = self._read_entry_crc(self.index.len() - 1);
}

// And overwrite the existing data so that we will not read the data back after a crash.
let size = self.size();
let zeroes: [u8; 16] = [0; 16];
copy_memory(&zeroes, &mut self.as_mut_slice()[size..]);
// Zero all deleted entries so that we will not read the data back after a crash
let zero_start = self.size();
self.as_mut_slice()[zero_start..zero_end].fill(0);

// Move flush offset back to write new changes on next flush
self.flush_offset = min(self.flush_offset, zero_start);
}

/// Flushes recently written entries to durable storage.
Expand All @@ -411,26 +416,26 @@ impl Segment {

match start.cmp(&end) {
Ordering::Equal => {
// nothing to flush
trace!("{self:?}: nothing to flush");
Ok(())
} // nothing to flush
}
Ordering::Less => {
// flush new elements added since last flush
trace!("{self:?}: flushing byte range [{start}, {end})");
let mut view = unsafe { self.mmap.clone() };
self.flush_offset = end;
view.restrict(start, end - start)?;
view.flush()
view.flush()?;
}
Ordering::Greater => {
// most likely truncated in between flushes
// register new flush offset & flush the whole segment
trace!("{self:?}: flushing after truncation");
let view = unsafe { self.mmap.clone() };
self.flush_offset = end;
view.flush()
error!("{self:?}: invalid flush range, flushing everything");
self.mmap.flush()?;
}
}

self.flush_offset = end;
Ok(())
}

/// Flushes recently written entries to durable storage.
Expand Down Expand Up @@ -467,13 +472,13 @@ impl Segment {
self.flush_offset = end;

let log_msg = if log_enabled!(log::Level::Trace) {
format!("{:?}: async flushing after truncation", &self)
format!("{self:?}: invalid flush range, flushing everything")
} else {
String::new()
};

thread::spawn(move || {
trace!("{log_msg}");
error!("{log_msg}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe warn? An error should mean something unexpected happened, but as I understand it, this path is possible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, is it expected that, with the new changes, hitting this path is an actual error?

Copy link
Member Author

@timvisee timvisee Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new changes it should be impossible to hit this branch. That's why I promoted it to an error.

Still want to demote it to a warning?

view.flush()
})
}
Expand Down Expand Up @@ -693,6 +698,21 @@ fn padding(len: usize) -> usize {
4usize.wrapping_sub(len) & 7
}

/// Computes the full on-disk footprint, in bytes, of a single entry whose payload is `len` bytes.
///
/// The result is the payload length plus the per-entry overhead reported by
/// [`entry_overhead`] (header, padding and CRC).
///
/// | component                    | type |
/// | ---------------------------- | ---- |
/// | length                       | u64  |
/// | data                         |      |
/// | padding                      |      |
/// | CRC(length + data + padding) | u32  |
#[cfg(test)]
pub fn entry_size_disk(len: usize) -> usize {
    let overhead = entry_overhead(len);
    overhead + len
}

/// Returns the overhead of storing an entry of length `len`.
pub fn entry_overhead(len: usize) -> usize {
padding(len) + HEADER_LEN + CRC_LEN
Expand All @@ -705,12 +725,15 @@ pub fn segment_overhead() -> usize {

#[cfg(test)]
mod test {
use std::io::ErrorKind;
use tempfile::Builder;
use std::{io::ErrorKind, path::Path};
use tempfile::{Builder, TempDir};

use super::{Segment, padding};

use crate::test_utils::EntryGenerator;
use crate::{
segment::{HEADER_LEN, entry_size_disk},
test_utils::EntryGenerator,
};

#[test]
fn test_pad_len() {
Expand All @@ -733,11 +756,17 @@ mod test {
assert_eq!(5, padding(15));
}

fn create_segment(len: usize) -> Segment {
fn create_segment(len: usize) -> (Segment, TempDir) {
let dir = Builder::new().prefix("segment").tempdir().unwrap();
let mut path = dir.path().to_path_buf();
path.push("sync-segment");
Segment::create(path, len).unwrap()
(Segment::create(path, len).unwrap(), dir)
}

/// Reopens the segment file named `sync-segment` located inside `dir`.
///
/// Panics if `Segment::open` returns an error.
fn load_segment(dir: impl AsRef<Path>) -> Segment {
    let segment_path = dir.as_ref().join("sync-segment");
    Segment::open(segment_path).unwrap()
}

fn init_logger() {
Expand All @@ -764,15 +793,74 @@ mod test {
#[test]
fn test_append() {
init_logger();
check_append(&mut create_segment(8));
check_append(&mut create_segment(9));
check_append(&mut create_segment(32));
check_append(&mut create_segment(100));
check_append(&mut create_segment(1023));
check_append(&mut create_segment(1024));
check_append(&mut create_segment(1025));
check_append(&mut create_segment(4096));
check_append(&mut create_segment(8 * 1024 * 1024));
check_append(&mut create_segment(8).0);
check_append(&mut create_segment(9).0);
check_append(&mut create_segment(32).0);
check_append(&mut create_segment(100).0);
check_append(&mut create_segment(1023).0);
check_append(&mut create_segment(1024).0);
check_append(&mut create_segment(1025).0);
check_append(&mut create_segment(4096).0);
check_append(&mut create_segment(8 * 1024 * 1024).0);
}

// Checks how `flush_offset` evolves across append, truncate, flush and reopen:
// it only advances on flush, it is pulled back by a truncation that cuts below
// it, and a reopened segment starts with it at the end of the existing entries.
#[test]
fn test_truncate() {
init_logger();
let (mut segment, dir) = create_segment(4096);

segment.append(&b"0".as_slice()).unwrap();
segment.append(&b"1".as_slice()).unwrap();
segment.append(&b"2".as_slice()).unwrap();
segment.append(&b"3".as_slice()).unwrap();

// Truncating at an index equal to the current length removes nothing (no-op)
assert_eq!(segment.len(), 4);
segment.truncate(4);
assert_eq!(segment.len(), 4);

// Until we flush, the flush offset remains zero; flushing moves it past all four entries
assert_eq!(segment.flush_offset, 0);
segment.flush().unwrap();
assert_eq!(segment.flush_offset, HEADER_LEN + entry_size_disk(1) * 4);

// Truncate to keep one entry; the flush offset is pulled back to just after that entry
segment.truncate(1);
assert_eq!(segment.len(), 1);
assert_eq!(segment.flush_offset, HEADER_LEN + entry_size_disk(1));

// Append two new items (three entries in total); the flush offset stays right after
// the first entry until we flush
segment.append(&b"12345".as_slice()).unwrap();
segment.append(&b"67890".as_slice()).unwrap();
assert_eq!(segment.len(), 3);
assert_eq!(segment.flush_offset, HEADER_LEN + entry_size_disk(1));

// Flush and reload
// This was broken before <https://github.com/qdrant/wal/pull/99>, as it wouldn't fully
// flush the last two appended operations
segment.flush().unwrap();
assert_eq!(
segment.flush_offset,
HEADER_LEN + entry_size_disk(1) + entry_size_disk(5) * 2,
);
// A freshly opened segment reports a flush offset just past its last existing entry
let mut segment = load_segment(&dir);
assert_eq!(segment.len(), 3);
assert_eq!(
segment.flush_offset,
HEADER_LEN + entry_size_disk(1) + entry_size_disk(5) * 2,
);

// Truncate everything (clear) and assert the flush offset moves back to `HEADER_LEN`
segment.truncate(0);
assert_eq!(segment.len(), 0);
assert_eq!(segment.flush_offset, HEADER_LEN);

// Flush and reload; the emptied segment must stay empty after reopening
segment.flush().unwrap();
assert_eq!(segment.flush_offset, HEADER_LEN);
let segment = load_segment(&dir);
assert_eq!(segment.len(), 0);
assert_eq!(segment.flush_offset, HEADER_LEN);
}

#[test]
Expand All @@ -785,7 +873,7 @@ mod test {
#[test]
fn test_entries() {
init_logger();
let mut segment = create_segment(4096);
let (mut segment, _dir) = create_segment(4096);
let entries: &[&[u8]] = &[
b"",
b"0",
Expand Down