Skip to content

Commit 95c4310

Browse files
generall and timvisee authored
instead of having a constantly running thread, segment creator v2 spawns… (#98)
* instead of having a constantly running thread, segment creator v2 spawns threads on demand * fmt * linter fixes * windows clippy * Use sort_by_key --------- Co-authored-by: Tim Visée <tim@visee.me>
1 parent 2b3f605 commit 95c4310

File tree

2 files changed

+203
-129
lines changed

2 files changed

+203
-129
lines changed

src/lib.rs

Lines changed: 7 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crossbeam_channel::{Receiver, Sender};
1+
use crate::segment_creator::SegmentCreatorV2;
22
use fs4::fs_std::FileExt;
3-
use log::{debug, info, trace, warn};
3+
use log::{debug, info, trace};
44
pub use segment::{Entry, Segment};
55
use std::cmp::Ordering;
66
use std::collections::HashMap;
@@ -17,6 +17,7 @@ use std::thread;
1717

1818
mod mmap_view_sync;
1919
mod segment;
20+
mod segment_creator;
2021
pub mod test_utils;
2122

2223
#[derive(Debug)]
@@ -75,7 +76,7 @@ pub struct Wal {
7576
/// The segment currently being appended to.
7677
open_segment: OpenSegment,
7778
closed_segments: Vec<ClosedSegment>,
78-
creator: SegmentCreator,
79+
creator: SegmentCreatorV2,
7980

8081
/// The number of closed segments to retain.
8182
retain_closed: NonZeroUsize,
@@ -226,7 +227,7 @@ impl Wal {
226227
}
227228
}
228229

229-
let mut creator = SegmentCreator::new(
230+
let mut creator = SegmentCreatorV2::new(
230231
&path,
231232
unused_segments,
232233
options.segment_capacity,
@@ -604,122 +605,15 @@ fn open_dir_entry(entry: fs::DirEntry) -> Result<Option<WalSegment>> {
604605
}
605606
}
606607

607-
struct SegmentCreator {
608-
/// Receive channel for new segments.
609-
rx: Option<Receiver<OpenSegment>>,
610-
/// The segment creator thread.
611-
///
612-
/// Used for retrieving error upon failure.
613-
thread: Option<thread::JoinHandle<Result<()>>>,
614-
}
615-
616-
impl SegmentCreator {
617-
/// Creates a new segment creator.
618-
///
619-
/// The segment creator must be started before new segments will be created.
620-
pub fn new<P>(
621-
dir: P,
622-
existing: Vec<OpenSegment>,
623-
segment_capacity: usize,
624-
segment_queue_len: usize,
625-
) -> SegmentCreator
626-
where
627-
P: AsRef<Path>,
628-
{
629-
let (tx, rx) = crossbeam_channel::bounded(segment_queue_len);
630-
631-
let dir = dir.as_ref().to_path_buf();
632-
let thread = thread::Builder::new()
633-
.name("wal-segment-creator".to_string())
634-
.spawn(move || create_loop(tx, dir, segment_capacity, existing))
635-
.unwrap();
636-
SegmentCreator {
637-
rx: Some(rx),
638-
thread: Some(thread),
639-
}
640-
}
641-
642-
/// Retrieves the next segment.
643-
pub fn next(&mut self) -> Result<OpenSegment> {
644-
self.rx.as_mut().unwrap().recv().map_err(|_| {
645-
match self.thread.take().map(|join_handle| join_handle.join()) {
646-
Some(Ok(Err(error))) => error,
647-
None => Error::other("segment creator thread already failed"),
648-
Some(Ok(Ok(()))) => unreachable!(
649-
"segment creator thread finished without an error,
650-
but the segment creator is still live"
651-
),
652-
Some(Err(_)) => unreachable!("segment creator thread panicked"),
653-
}
654-
})
655-
}
656-
}
657-
658-
impl Drop for SegmentCreator {
659-
fn drop(&mut self) {
660-
drop(self.rx.take());
661-
if let Some(join_handle) = self.thread.take()
662-
&& let Err(error) = join_handle.join()
663-
{
664-
warn!("Error while shutting down segment creator: {error:?}");
665-
}
666-
}
667-
}
668-
669-
fn create_loop(
670-
tx: Sender<OpenSegment>,
671-
mut path: PathBuf,
672-
capacity: usize,
673-
mut existing_segments: Vec<OpenSegment>,
674-
) -> Result<()> {
675-
// Ensure the existing segments are in ID order.
676-
existing_segments.sort_by(|a, b| a.id.cmp(&b.id));
677-
678-
let mut cont = true;
679-
let mut id = 0;
680-
681-
for segment in existing_segments {
682-
id = segment.id;
683-
if tx.send(segment).is_err() {
684-
cont = false;
685-
break;
686-
}
687-
}
688-
689-
// Directory being a file only applies to Linux
690-
#[cfg(not(target_os = "windows"))]
691-
let dir = File::open(&path)?;
692-
693-
while cont {
694-
id += 1;
695-
path.push(format!("open-{id}"));
696-
let segment = OpenSegment {
697-
id,
698-
segment: Segment::create(&path, capacity)?,
699-
};
700-
path.pop();
701-
// Sync the directory, guaranteeing that the segment file is durably
702-
// stored on the filesystem.
703-
#[cfg(not(target_os = "windows"))]
704-
dir.sync_all()?;
705-
cont = tx.send(segment).is_ok();
706-
}
707-
708-
info!("segment creator shutting down");
709-
Ok(())
710-
}
711-
712608
#[cfg(test)]
713609
mod test {
610+
use crate::test_utils::EntryGenerator;
714611
use log::trace;
715612
use quickcheck::TestResult;
716613
use std::{io::Write, num::NonZeroUsize};
717614
use tempfile::Builder;
718615

719-
use crate::segment::Segment;
720-
use crate::test_utils::EntryGenerator;
721-
722-
use super::{OpenSegment, SegmentCreator, Wal, WalOptions};
616+
use super::{Wal, WalOptions};
723617

724618
fn init_logger() {
725619
let _ = env_logger::builder().is_test(true).try_init();
@@ -1330,22 +1224,6 @@ mod test {
13301224
assert!(Wal::open(dir.path()).is_ok());
13311225
}
13321226

1333-
#[test]
1334-
fn test_segment_creator() {
1335-
init_logger();
1336-
let dir = Builder::new().prefix("segment").tempdir().unwrap();
1337-
1338-
let segments = vec![OpenSegment {
1339-
id: 3,
1340-
segment: Segment::create(dir.path().join("open-3"), 1024).unwrap(),
1341-
}];
1342-
1343-
let mut creator = SegmentCreator::new(dir.path(), segments, 1024, 1);
1344-
for i in 3..10 {
1345-
assert_eq!(i, creator.next().unwrap().id);
1346-
}
1347-
}
1348-
13491227
#[test]
13501228
fn test_record_id_preserving() {
13511229
init_logger();

src/segment_creator.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
use crate::{OpenSegment, Segment};
2+
use log::warn;
3+
#[cfg(not(target_os = "windows"))]
4+
use std::fs::File;
5+
use std::io::Error;
6+
use std::path::{Path, PathBuf};
7+
use std::thread;
8+
9+
pub struct SegmentCreatorV2 {
10+
/// Where to create segments
11+
dir: PathBuf,
12+
/// Segments, which are already created, but not yet handed out.
13+
pending_segments: Vec<OpenSegment>,
14+
15+
/// Capacity of segments to create.
16+
segment_capacity: usize,
17+
18+
/// How many segments to create ahead of time.
19+
segment_queue_len: usize,
20+
21+
/// Current id to create
22+
current_id: u64,
23+
24+
/// The segment creator thread.
25+
/// Used to create segments asynchronously.
26+
thread: Option<thread::JoinHandle<std::io::Result<Vec<OpenSegment>>>>,
27+
}
28+
29+
fn create_segments(
30+
path: PathBuf,
31+
start_id: u64,
32+
count: usize,
33+
segment_capacity: usize,
34+
) -> std::io::Result<Vec<OpenSegment>> {
35+
// Directory being a file only applies to Linux
36+
#[cfg(not(target_os = "windows"))]
37+
let dir = File::open(&path)?;
38+
39+
let mut segments = Vec::with_capacity(count);
40+
41+
for i in 0..count {
42+
let segment_id = start_id + i as u64;
43+
let segment_path = path.join(format!("open-{segment_id}"));
44+
let segment = OpenSegment {
45+
id: segment_id,
46+
segment: Segment::create(segment_path, segment_capacity)?,
47+
};
48+
segments.push(segment);
49+
}
50+
51+
#[cfg(not(target_os = "windows"))]
52+
dir.sync_all()?;
53+
54+
Ok(segments)
55+
}
56+
57+
impl SegmentCreatorV2 {
58+
/// Creates a new segment creator.
59+
///
60+
/// The segment creator must be started before new segments will be created.
61+
pub fn new<P>(
62+
dir: P,
63+
mut existing: Vec<OpenSegment>,
64+
segment_capacity: usize,
65+
segment_queue_len: usize,
66+
) -> SegmentCreatorV2
67+
where
68+
P: AsRef<Path>,
69+
{
70+
let dir = dir.as_ref().to_path_buf();
71+
72+
existing.sort_by_key(|segment| segment.id);
73+
74+
let current_id = existing.last().map_or(1, |s| s.id + 1);
75+
76+
let mut result = Self {
77+
dir,
78+
pending_segments: existing,
79+
segment_capacity,
80+
segment_queue_len: segment_queue_len + 1, // Always create at least one segment
81+
current_id,
82+
thread: None,
83+
};
84+
85+
result.schedule_creation();
86+
87+
result
88+
}
89+
90+
fn schedule_creation(&mut self) {
91+
if self.thread.is_none() && self.pending_segments.len() < self.segment_queue_len {
92+
let count = self
93+
.segment_queue_len
94+
.saturating_sub(self.pending_segments.len());
95+
let dir = self.dir.clone();
96+
let start_id = self.current_id;
97+
let segment_capacity = self.segment_capacity;
98+
99+
self.thread = Some(
100+
thread::Builder::new()
101+
.name("wal-segment-creator".to_string())
102+
.spawn(move || create_segments(dir, start_id, count, segment_capacity))
103+
.unwrap(),
104+
);
105+
}
106+
}
107+
108+
fn refill_pending(&mut self) -> std::io::Result<()> {
109+
if let Some(thread) = self.thread.take() {
110+
if self.pending_segments.is_empty() || thread.is_finished() {
111+
let new_segments = thread
112+
.join()
113+
.map_err(|_| Error::other("segment creation thread panicked"))??;
114+
self.current_id += new_segments.len() as u64;
115+
self.pending_segments.extend(new_segments);
116+
} else {
117+
// Put the thread back if it's not finished, and we don't need segments yet.
118+
self.thread = Some(thread);
119+
}
120+
}
121+
122+
Ok(())
123+
}
124+
125+
pub fn next(&mut self) -> std::io::Result<OpenSegment> {
126+
self.refill_pending()?;
127+
128+
if self.pending_segments.is_empty() {
129+
return Err(Error::other("no segments available"));
130+
}
131+
132+
let segment = self.pending_segments.remove(0);
133+
self.schedule_creation();
134+
Ok(segment)
135+
}
136+
}
137+
138+
impl Drop for SegmentCreatorV2 {
139+
fn drop(&mut self) {
140+
if let Some(thread) = self.thread.take() {
141+
match thread.join() {
142+
Ok(Ok(_)) => {}
143+
Ok(Err(err)) => {
144+
warn!("Error while shutting down segment creator: {err:?}");
145+
}
146+
Err(err) => {
147+
warn!("Segment creator thread panicked: {err:?}");
148+
}
149+
}
150+
}
151+
}
152+
}
153+
154+
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::Builder;

    fn init_logger() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Segments are handed out in ID order, starting with the existing one,
    /// and the creator stays one or two segments ahead of the last one handed out.
    #[test]
    fn test_segment_creator_v2() {
        init_logger();
        let dir = Builder::new().prefix("segment").tempdir().unwrap();

        let segments = vec![OpenSegment {
            id: 3,
            segment: Segment::create(dir.path().join("open-3"), 1024).unwrap(),
        }];

        let mut creator = SegmentCreatorV2::new(dir.path(), segments, 1024, 1);
        for i in 3..10 {
            assert_eq!(i, creator.next().unwrap().id);
        }

        // Dropping the creator waits for the in-flight worker thread to finish.
        drop(creator);

        // List directory contents
        let mut entries: Vec<String> = std::fs::read_dir(dir.path())
            .unwrap()
            .map(|res| res.map(|e| e.file_name().to_string_lossy().into_owned()))
            .collect::<Result<_, std::io::Error>>()
            .unwrap();

        entries.sort();

        for entry in &entries {
            eprintln!("{:?}", entry);
        }

        // open-3 existed, open-4 to open-9 were handed out, and at least
        // open-10 has been created ahead of time.
        for id in 3..=10 {
            let name = format!("open-{id}");
            assert!(entries.contains(&name), "missing segment file {name}");
        }

        // How far ahead the creator got depends on thread timing: with a queue
        // length of 1 it keeps at most two segments ready, so open-11 may or
        // may not exist, but nothing beyond it.
        assert!(
            (8..=9).contains(&entries.len()),
            "unexpected number of segment files: {entries:?}"
        );
    }
}

0 commit comments

Comments
 (0)