Skip to content
Merged
45 changes: 25 additions & 20 deletions src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,17 @@ impl Segment {
}
}

let segment = Segment {
let mut segment = Segment {
mmap,
path: path.as_ref().to_path_buf(),
index,
crc,
flush_offset: 0,
};

// Bump flush offset next to last entry, we don't need to flush any existing data
segment.flush_offset = segment.size();

debug!("{segment:?}: opened");
Ok(segment)
}
Expand Down Expand Up @@ -343,11 +347,8 @@ impl Segment {
);

if padding > 0 {
let zeros: [u8; 8] = [0; 8];
copy_memory(
&zeros[..padding],
&mut self.as_mut_slice()[offset + HEADER_LEN + entry.len()..],
);
let start = offset + HEADER_LEN + entry.len();
self.as_mut_slice()[start..start + padding].fill(0)
}
crc = crc32c::crc32c_append(
!crc.reverse_bits(),
Expand Down Expand Up @@ -385,6 +386,8 @@ impl Segment {
}
trace!("{self:?}: truncating from position {from}");

let zero_end = self.size();

// Remove the index entries.
let deleted = self.index.drain(from..).count();
trace!("{self:?}: truncated {deleted} entries");
Expand All @@ -397,10 +400,12 @@ impl Segment {
self.crc = self._read_entry_crc(self.index.len() - 1);
}

// And overwrite the existing data so that we will not read the data back after a crash.
let size = self.size();
let zeroes: [u8; 16] = [0; 16];
copy_memory(&zeroes, &mut self.as_mut_slice()[size..]);
// Zero all deleted entries so that we will not read the data back after a crash
let zero_start = self.size();
self.as_mut_slice()[zero_start..zero_end].fill(0);

// Bump flush offset to write new changes on next flush
self.flush_offset = zero_end;
}

/// Flushes recently written entries to durable storage.
Expand All @@ -411,26 +416,26 @@ impl Segment {

match start.cmp(&end) {
Ordering::Equal => {
// nothing to flush
trace!("{self:?}: nothing to flush");
Ok(())
} // nothing to flush
}
Ordering::Less => {
// flush new elements added since last flush
trace!("{self:?}: flushing byte range [{start}, {end})");
let mut view = unsafe { self.mmap.clone() };
self.flush_offset = end;
view.restrict(start, end - start)?;
view.flush()
view.flush()?;
}
Ordering::Greater => {
// most likely truncated in between flushes
// register new flush offset & flush the whole segment
trace!("{self:?}: flushing after truncation");
let view = unsafe { self.mmap.clone() };
self.flush_offset = end;
view.flush()
error!("{self:?}: invalid flush range, flushing everything");
self.mmap.flush()?;
}
}

self.flush_offset = end;
Ok(())
}

/// Flushes recently written entries to durable storage.
Expand Down Expand Up @@ -467,13 +472,13 @@ impl Segment {
self.flush_offset = end;

let log_msg = if log_enabled!(log::Level::Trace) {
format!("{:?}: async flushing after truncation", &self)
format!("{self:?}: invalid flush range, flushing everything")
} else {
String::new()
};

thread::spawn(move || {
trace!("{log_msg}");
error!("{log_msg}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe warn? error should mean something unexpected happened, but as I understood, this path is possible

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of wait, is that expected that with new changes this path is an actual error?

Copy link
Member Author

@timvisee timvisee Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new changes it should be impossible to hit this branch. That's why I promoted it to an error.

Still want to demote it to a warning?

view.flush()
})
}
Expand Down