Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ rand_distr = "0.5"
fs4 = "0.13.1"
# fs4 depends on rustix, but pulling this dependency explicitly into the tree
# to use direct functions on FreeBSD
rustix = { version = "1", features = [ "fs" ]}
rustix = { version = "1", features = ["fs"] }
crossbeam-channel = "0.5.15"

# Binary dependencies
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ mod segment;
mod segment_creator;
pub mod test_utils;

#[cfg(test)]
mod test_segment_recovery;

#[derive(Debug)]
pub struct WalOptions {
/// The segment capacity. Defaults to 32MiB.
Expand Down Expand Up @@ -229,6 +232,7 @@ impl Wal {

let mut creator = SegmentCreatorV2::new(
&path,
open_segment.as_ref(),
unused_segments,
options.segment_capacity,
options.segment_queue_len,
Expand Down
96 changes: 88 additions & 8 deletions src/segment_creator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{OpenSegment, Segment};
use log::warn;
use std::cmp::max;
#[cfg(not(target_os = "windows"))]
use std::fs::File;
use std::io::Error;
Expand Down Expand Up @@ -60,7 +61,8 @@ impl SegmentCreatorV2 {
/// The segment creator must be started before new segments will be created.
pub fn new<P>(
dir: P,
mut existing: Vec<OpenSegment>,
open_segment: Option<&OpenSegment>,
mut unused_segments: Vec<OpenSegment>,
segment_capacity: usize,
segment_queue_len: usize,
) -> SegmentCreatorV2
Expand All @@ -69,15 +71,28 @@ impl SegmentCreatorV2 {
{
let dir = dir.as_ref().to_path_buf();

existing.sort_by_key(|segment| segment.id);
unused_segments.sort_by_key(|segment| segment.id);

let current_id = existing.last().map_or(1, |s| s.id + 1);
// The next segment id is one past the highest-numbered unused segment; if there are
// no unused segments, one past the open segment; if neither exists, start at 1
let current_id = unused_segments
.last()
.or(open_segment)
.map_or(1, |s| s.id + 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a debug assertion to ensure any open_segment has a lower ID than all unused_segments.


// Any open segment must have lower ID than all unused segments
debug_assert!(
unused_segments
.iter()
.all(|s| open_segment.as_ref().is_none_or(|o| s.id > o.id)),
"open_segment must have lower ID than all unused_segments",
);

let mut result = Self {
dir,
pending_segments: existing,
pending_segments: unused_segments,
segment_capacity,
segment_queue_len: segment_queue_len + 1, // Always create at least one segment
segment_queue_len: max(segment_queue_len, 1), // Always create at least one segment
current_id,
thread: None,
};
Expand Down Expand Up @@ -165,13 +180,19 @@ mod tests {
init_logger();
let dir = Builder::new().prefix("segment").tempdir().unwrap();

let segments = vec![OpenSegment {
let segment = Some(OpenSegment {
id: 3,
segment: Segment::create(dir.path().join("open-3"), 1024).unwrap(),
});

let unused_segments = vec![OpenSegment {
id: 4,
segment: Segment::create(dir.path().join("open-4"), 1024).unwrap(),
}];

let mut creator = SegmentCreatorV2::new(dir.path(), segments, 1024, 1);
for i in 3..10 {
let mut creator =
SegmentCreatorV2::new(dir.path(), segment.as_ref(), unused_segments, 1024, 1);
for i in 4..10 {
assert_eq!(i, creator.next().unwrap().id);
}

Expand All @@ -191,6 +212,65 @@ mod tests {
eprintln!("{:?}", entry);
}

assert_eq!(entries.len(), 10 - 3 + 1); // open-3 and open-4 existed, open-5 to open-9 created + open-10 created ahead
}

/// With an open segment (id 3) and no pre-allocated unused segments, the
/// creator must hand out ids continuing right after the open segment.
#[test]
fn test_segment_creator_v2_no_unused() {
    init_logger();
    let dir = Builder::new().prefix("segment").tempdir().unwrap();

    // The only pre-existing file is the open segment itself.
    let open_path = dir.path().join("open-3");
    let segment = Some(OpenSegment {
        id: 3,
        segment: Segment::create(open_path, 1024).unwrap(),
    });

    let mut creator = SegmentCreatorV2::new(dir.path(), segment.as_ref(), Vec::new(), 1024, 1);

    // Ids 4..=9 are expected, in order.
    for expected_id in 4..10 {
        let another_segment = creator.next().unwrap();
        eprintln!("another_segment = {:#?}", another_segment);
        assert_eq!(expected_id, another_segment.id);
    }

    // Dropping the creator waits for its background thread to finish, so the
    // directory contents are stable when listed below.
    drop(creator);

    // Collect and sort the file names present in the directory.
    let mut file_names = std::fs::read_dir(dir.path())
        .unwrap()
        .map(|entry| entry.map(|e| e.file_name()))
        .collect::<Result<Vec<_>, std::io::Error>>()
        .unwrap();
    file_names.sort();

    for name in &file_names {
        eprintln!("{:?}", name);
    }

    // open-3 existed, open-4 to open-9 created + open-10 created ahead
    assert_eq!(file_names.len(), 10 - 3 + 1);
}

/// Constructing a creator whose open segment does not have a strictly lower
/// id than every unused segment must trip the invariant check in
/// `SegmentCreatorV2::new`.
///
/// That check is a `debug_assert!`, which is compiled out when debug
/// assertions are disabled (e.g. `cargo test --release`). The test is
/// therefore only built with debug assertions on; otherwise `#[should_panic]`
/// would fail because nothing panics.
#[cfg(debug_assertions)]
#[test]
#[should_panic(expected = "open_segment must have lower ID than all unused_segments")]
fn test_segment_creator_v2_invalid_open() {
    init_logger();
    let dir = Builder::new().prefix("segment").tempdir().unwrap();

    // Open segment deliberately carries the same id (4) as the unused segment below.
    // NOTE(review): its backing file is named "open-3" although the id is 4 —
    // presumably to avoid colliding with "open-4"; confirm this is intended.
    let segment = Some(OpenSegment {
        id: 4,
        segment: Segment::create(dir.path().join("open-3"), 1024).unwrap(),
    });

    let unused_segments = vec![OpenSegment {
        id: 4,
        segment: Segment::create(dir.path().join("open-4"), 1024).unwrap(),
    }];

    // Expected to panic, open segment cannot have same or higher ID than unused segments
    SegmentCreatorV2::new(dir.path(), segment.as_ref(), unused_segments, 1024, 1);
}
}
80 changes: 80 additions & 0 deletions src/test_segment_recovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::test_utils::EntryGenerator;
use crate::{Wal, WalOptions};
use std::num::NonZeroUsize;
use tempfile::Builder;

/// Recovery test: after the trailing empty segment file is deleted from the
/// WAL directory, re-opening the WAL must still succeed and report every
/// previously appended entry, including on a further re-open.
#[test]
fn test_handling_missing_empty_segment() {
    let entry_count: usize = 5;

    let dir = Builder::new().prefix("wal").tempdir().unwrap();
    let options = WalOptions {
        segment_capacity: 1024,
        segment_queue_len: 0,
        retain_closed: NonZeroUsize::new(1).unwrap(),
    };

    // Phase 1: create the WAL, append the entries, flush and close it.
    {
        let mut wal = Wal::with_options(dir.path(), &options).unwrap();
        let generated: Vec<_> = EntryGenerator::new().take(entry_count).collect();

        for item in &generated {
            wal.append(item).unwrap();
        }

        wal.flush_open_segment().unwrap();
    }

    eprintln!("------------ first WAL created ------------");

    // Phase 2: a plain re-open sees all entries.
    {
        let wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.num_entries(), entry_count as u64);
    }

    eprintln!("------------ WAL opened and closed ------------");

    // Print the directory contents for debugging.
    let file_names = std::fs::read_dir(dir.path())
        .unwrap()
        .map(|entry| entry.map(|e| e.file_name()))
        .collect::<Result<Vec<_>, std::io::Error>>()
        .unwrap();

    for name in &file_names {
        eprintln!("{:?}", name);
    }

    // Phase 3: remove the last segment file (which is empty). "open-3" must
    // not exist, so "open-2" is the last one.
    let unexpected_segment = dir.path().join("open-3");
    assert!(!unexpected_segment.exists(), "open-3 should not exist");
    std::fs::remove_file(dir.path().join("open-2")).unwrap();

    eprintln!("------------ removed last segment ------------");

    // Phase 4: re-opening without that file still yields every entry.
    {
        let wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.num_entries(), entry_count as u64);

        // sleep, to account for background process creating more segments
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    eprintln!("------------ WAL opened and closed again ------------");

    // Phase 5: and once more, after the background work had time to run.
    let wal = Wal::open(dir.path()).unwrap();
    assert_eq!(wal.num_entries(), entry_count as u64);
}