Skip to content

Commit 0b64cde

Browse files
committed
prevent absent unused segment from overwriting existing segments
1 parent baee8ba commit 0b64cde

File tree

4 files changed

+141
-9
lines changed

4 files changed

+141
-9
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ rand_distr = "0.5"
2222
fs4 = "0.13.1"
2323
# fs4 depends on rustix, but pulling this dependency explicitly into the tree
2424
# to use direct functions on FreeBSD
25-
rustix = { version = "1", features = [ "fs" ]}
25+
rustix = { version = "1", features = ["fs"] }
2626
crossbeam-channel = "0.5.15"
2727

2828
# Binary dependencies

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ mod segment;
2020
mod segment_creator;
2121
pub mod test_utils;
2222

23+
#[cfg(test)]
24+
mod test_segment_recovery;
25+
2326
#[derive(Debug)]
2427
pub struct WalOptions {
2528
/// The segment capacity. Defaults to 32MiB.
@@ -229,6 +232,7 @@ impl Wal {
229232

230233
let mut creator = SegmentCreatorV2::new(
231234
&path,
235+
open_segment.as_ref(),
232236
unused_segments,
233237
options.segment_capacity,
234238
options.segment_queue_len,

src/segment_creator.rs

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{OpenSegment, Segment};
22
use log::warn;
3+
use std::cmp::max;
34
#[cfg(not(target_os = "windows"))]
45
use std::fs::File;
56
use std::io::Error;
@@ -60,7 +61,8 @@ impl SegmentCreatorV2 {
6061
/// The segment creator must be started before new segments will be created.
6162
pub fn new<P>(
6263
dir: P,
63-
mut existing: Vec<OpenSegment>,
64+
open_segment: Option<&OpenSegment>,
65+
mut unused_segments: Vec<OpenSegment>,
6466
segment_capacity: usize,
6567
segment_queue_len: usize,
6668
) -> SegmentCreatorV2
@@ -69,15 +71,20 @@ impl SegmentCreatorV2 {
6971
{
7072
let dir = dir.as_ref().to_path_buf();
7173

72-
existing.sort_by_key(|segment| segment.id);
74+
unused_segments.sort_by_key(|segment| segment.id);
7375

74-
let current_id = existing.last().map_or(1, |s| s.id + 1);
76+
// Find the current id by incrementing either last unused segment or open segment
77+
// If neither exists, start at 1
78+
let current_id = unused_segments
79+
.last()
80+
.or(open_segment)
81+
.map_or(1, |s| s.id + 1);
7582

7683
let mut result = Self {
7784
dir,
78-
pending_segments: existing,
85+
pending_segments: unused_segments,
7986
segment_capacity,
80-
segment_queue_len: segment_queue_len + 1, // Always create at least one segment
87+
segment_queue_len: max(segment_queue_len, 1), // Always create at least one segment
8188
current_id,
8289
thread: None,
8390
};
@@ -165,13 +172,19 @@ mod tests {
165172
init_logger();
166173
let dir = Builder::new().prefix("segment").tempdir().unwrap();
167174

168-
let segments = vec![OpenSegment {
175+
let segment = Some(OpenSegment {
169176
id: 3,
170177
segment: Segment::create(dir.path().join("open-3"), 1024).unwrap(),
178+
});
179+
180+
let unused_segments = vec![OpenSegment {
181+
id: 4,
182+
segment: Segment::create(dir.path().join("open-4"), 1024).unwrap(),
171183
}];
172184

173-
let mut creator = SegmentCreatorV2::new(dir.path(), segments, 1024, 1);
174-
for i in 3..10 {
185+
let mut creator =
186+
SegmentCreatorV2::new(dir.path(), segment.as_ref(), unused_segments, 1024, 1);
187+
for i in 4..10 {
175188
assert_eq!(i, creator.next().unwrap().id);
176189
}
177190

@@ -191,6 +204,45 @@ mod tests {
191204
eprintln!("{:?}", entry);
192205
}
193206

207+
assert_eq!(entries.len(), 10 - 3 + 1); // open-3 and open-4 existed, open-5 to open-9 created + open-10 created ahead
208+
}
209+
210+
#[test]
211+
fn test_segment_creator_v2_no_unused() {
212+
init_logger();
213+
let dir = Builder::new().prefix("segment").tempdir().unwrap();
214+
215+
let segment = Some(OpenSegment {
216+
id: 3,
217+
segment: Segment::create(dir.path().join("open-3"), 1024).unwrap(),
218+
});
219+
220+
let unused_segments = vec![];
221+
222+
let mut creator =
223+
SegmentCreatorV2::new(dir.path(), segment.as_ref(), unused_segments, 1024, 1);
224+
for i in 4..10 {
225+
let another_segment = creator.next().unwrap();
226+
eprintln!("another_segment = {:#?}", another_segment);
227+
assert_eq!(i, another_segment.id);
228+
}
229+
230+
// This awaits for the thread to finish
231+
drop(creator);
232+
233+
// List directory contents
234+
let mut entries: Vec<_> = std::fs::read_dir(dir.path())
235+
.unwrap()
236+
.map(|res| res.map(|e| e.file_name()))
237+
.collect::<Result<_, std::io::Error>>()
238+
.unwrap();
239+
240+
entries.sort();
241+
242+
for entry in &entries {
243+
eprintln!("{:?}", entry);
244+
}
245+
194246
assert_eq!(entries.len(), 10 - 3 + 1); // open-3 existed, open-4 to open-9 created + open-10 created ahead
195247
}
196248
}

src/test_segment_recovery.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use crate::test_utils::EntryGenerator;
2+
use crate::{Wal, WalOptions};
3+
use std::num::NonZeroUsize;
4+
use tempfile::Builder;
5+
6+
#[test]
7+
fn test_handling_missing_empty_segment() {
8+
let entry_count = 5;
9+
10+
let dir = Builder::new().prefix("wal").tempdir().unwrap();
11+
let options = WalOptions {
12+
segment_capacity: 1024,
13+
segment_queue_len: 0,
14+
retain_closed: NonZeroUsize::new(1).unwrap(),
15+
};
16+
17+
let mut wal = Wal::with_options(dir.path(), &options).unwrap();
18+
let entries = EntryGenerator::new().take(entry_count).collect::<Vec<_>>();
19+
20+
for entry in &entries {
21+
wal.append(entry).unwrap();
22+
}
23+
24+
wal.flush_open_segment().unwrap();
25+
26+
drop(wal);
27+
28+
eprintln!("------------ first WAL created ------------");
29+
30+
let wal = Wal::open(dir.path()).unwrap();
31+
32+
let num = wal.num_entries();
33+
34+
assert_eq!(num, entry_count as u64);
35+
36+
drop(wal);
37+
38+
eprintln!("------------ WAL opened and closed ------------");
39+
40+
// List directory contents
41+
42+
let entries: Vec<_> = std::fs::read_dir(dir.path())
43+
.unwrap()
44+
.map(|res| res.map(|e| e.file_name()))
45+
.collect::<Result<_, std::io::Error>>()
46+
.unwrap();
47+
48+
for entry in &entries {
49+
eprintln!("{:?}", entry);
50+
}
51+
52+
// Remove the last segment file (which is empty)
53+
let last_segment_file = dir.path().join("open-2");
54+
std::fs::remove_file(last_segment_file).unwrap();
55+
56+
eprintln!("------------ removed last segment ------------");
57+
58+
let wal = Wal::open(dir.path()).unwrap();
59+
60+
let num = wal.num_entries();
61+
62+
assert_eq!(num, entry_count as u64);
63+
64+
// sleep, to account for background process creating more segments
65+
std::thread::sleep(std::time::Duration::from_millis(100));
66+
67+
drop(wal);
68+
69+
eprintln!("------------ WAL opened and closed again ------------");
70+
71+
let wal = Wal::open(dir.path()).unwrap();
72+
73+
let num = wal.num_entries();
74+
75+
assert_eq!(num, entry_count as u64);
76+
}

0 commit comments

Comments
 (0)