Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ fn close_segment(mut segment: OpenSegment, start_index: u64) -> Result<ClosedSeg
.path()
.with_file_name(format!("closed-{start_index}"));
segment.segment.rename(new_path)?;
segment.segment.close();
Ok(ClosedSegment {
start_index,
segment: segment.segment,
Expand Down Expand Up @@ -621,7 +622,10 @@ mod test {
use crate::test_utils::EntryGenerator;
use log::trace;
use quickcheck::TestResult;
use std::{io::Write, num::NonZeroUsize};
use std::{
io::Write,
num::{NonZeroU8, NonZeroUsize},
};
use tempfile::Builder;

use super::{Wal, WalOptions};
Expand Down Expand Up @@ -960,8 +964,10 @@ mod test {
#[test]
fn check_prefix_truncate() {
init_logger();
fn prefix_truncate(entry_count: u8, until: u8, retain_closed: NonZeroUsize) -> TestResult {
trace!("prefix truncate; entry_count: {entry_count}, until: {until}");
fn prefix_truncate(entry_count: u8, until: u8, retain_closed: NonZeroU8) -> TestResult {
trace!(
"prefix truncate; entry_count: {entry_count}, until: {until}, retain_closed: {retain_closed}",
);
if until > entry_count {
return TestResult::discard();
}
Expand All @@ -971,7 +977,7 @@ mod test {
&WalOptions {
segment_capacity: 80,
segment_queue_len: 3,
retain_closed,
retain_closed: NonZeroUsize::from(retain_closed),
},
)
.unwrap();
Expand All @@ -980,7 +986,7 @@ mod test {
.collect::<Vec<_>>();

let mut has_ever_reached_max = false;
let retain_closed = retain_closed.get();
let retain_closed = retain_closed.get() as usize;

for entry in &entries {
wal.append(entry).unwrap();
Expand All @@ -993,7 +999,12 @@ mod test {

let retained = if has_ever_reached_max {
// If it ever reaches max, it should stay there
wal.closed_segments.len() == retain_closed
if until < entry_count {
// If `until` is (much) lower we might retain more to satisfy prefix_truncate
wal.closed_segments.len() >= retain_closed
} else {
wal.closed_segments.len() == retain_closed
}
Comment on lines +1002 to +1007
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

until is randomized and might be very small. If so, we might retain more segments than configured to satisfy the prefix_truncate above.

This fixes the test by also allowing us to keep more retained segments.

} else {
wal.closed_segments.len() < retain_closed
};
Expand All @@ -1003,7 +1014,7 @@ mod test {
num_entries <= entry_count && num_entries >= entry_count - until && retained,
)
}
quickcheck::quickcheck(prefix_truncate as fn(u8, u8, NonZeroUsize) -> TestResult);
quickcheck::quickcheck(prefix_truncate as fn(u8, u8, NonZeroU8) -> TestResult);
}

#[test]
Expand Down
10 changes: 10 additions & 0 deletions src/mmap_view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ pub struct MmapViewSync {
}

impl MmapViewSync {
pub fn close(&self) {
#[cfg(target_os = "linux")]
unsafe {
let mmap = &mut *self.inner.get();
if let Err(err) = mmap.unchecked_advise(memmap2::UncheckedAdvice::DontNeed) {
log::warn!("Erorr clearing closed wal segment: {err:?}");
}
}
}

pub fn from_file(file: &File, offset: usize, capacity: usize) -> Result<MmapViewSync> {
let mmap = unsafe {
MmapOptions::new()
Expand Down
4 changes: 4 additions & 0 deletions src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ pub struct Segment {
}

impl Segment {
pub fn close(&self) {
self.mmap.close()
}

/// Creates a new segment.
///
/// The initial capacity must be at least 8 bytes.
Expand Down