Skip to content

Commit 2713648

Browse files
committed
recompute checksum on frame overwrite
1 parent dcebba4 commit 2713648

File tree

1 file changed

+29
-6
lines changed

1 file changed

+29
-6
lines changed

libsql-wal/src/segment/current.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use fst::MapBuilder;
1414
use parking_lot::{Mutex, RwLock};
1515
use roaring::RoaringBitmap;
1616
use tokio_stream::Stream;
17+
use zerocopy::little_endian::U32;
1718
use zerocopy::{AsBytes, FromZeroes};
1819

1920
use crate::io::buf::{IoBufMut, ZeroCopyBoxIoBuf, ZeroCopyBuf};
@@ -25,7 +26,7 @@ use crate::transaction::{Transaction, TxGuard};
2526
use crate::LIBSQL_MAGIC;
2627

2728
use super::list::SegmentList;
28-
use super::{Frame, FrameHeader, SegmentHeader};
29+
use super::{CheckedFrame, Frame, FrameHeader, SegmentHeader};
2930

3031
use crate::error::Result;
3132

@@ -284,8 +285,8 @@ impl<F> CurrentSegment<F> {
284285

285286
if let Some(size_after) = size_after {
286287
if tx.not_empty() {
287-
if let Some(_offset) = tx.recompute_checksum {
288-
todo!("recompute checksum");
288+
if let Some(offset) = tx.recompute_checksum {
289+
self.recompute_checksum(offset, tx.next_offset - 1)?;
289290
}
290291
let last_frame_no = tx.next_frame_no - 1;
291292
let mut header = { *self.header.lock() };
@@ -499,6 +500,31 @@ impl<F> CurrentSegment<F> {
499500

500501
(stream, replicated_until, db_size)
501502
}
503+
504+
fn recompute_checksum(&self, start_offset: u32, until_offset: u32) -> Result<()>
505+
where F: FileExt
506+
{
507+
let mut current_checksum = if start_offset == 0 {
508+
self.header.lock().salt.get()
509+
} else {
510+
// we get the checksum from the frame just before the the start offset
511+
let frame_offset = checked_frame_offset(start_offset - 1);
512+
let mut out = U32::new(0);
513+
self.file.read_exact_at(out.as_bytes_mut(), frame_offset)?;
514+
out.get()
515+
};
516+
517+
let mut checked_frame: Box<CheckedFrame> = CheckedFrame::new_box_zeroed();
518+
for offset in start_offset..=until_offset {
519+
let frame_offset = checked_frame_offset(offset);
520+
self.file.read_exact_at(checked_frame.as_bytes_mut(), frame_offset)?;
521+
current_checksum = checked_frame.frame.checksum(current_checksum);
522+
self.file.write_all_at(&current_checksum.to_le_bytes(), frame_offset)?;
523+
524+
}
525+
526+
Ok(())
527+
}
502528
}
503529

504530
fn frame_list_to_option(frames: Vec<Box<Frame>>) -> Vec<Option<Box<Frame>>> {
@@ -956,12 +982,9 @@ mod test {
956982
}
957983
}
958984

959-
dbg!();
960985
{
961986
let env = TestEnv::new_io_and_tmp(SyncFailBufferIo::default(), tmp.clone());
962-
dbg!();
963987
let conn = env.open_conn("test");
964-
dbg!();
965988
conn.query_row("select count(*) from test", (), |row| {
966989
dbg!(row);
967990
Ok(())

0 commit comments

Comments
 (0)