Skip to content

Commit 47ed09a

Browse files
committed
compute current segment running checksum
1 parent d6b9e49 commit 47ed09a

File tree

7 files changed

+177
-33
lines changed

7 files changed

+177
-33
lines changed

libsql-wal/src/registry.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use libsql_sys::ffi::Sqlite3DbHeader;
88
use parking_lot::{Condvar, Mutex};
99
use tokio::sync::{mpsc, Notify, Semaphore};
1010
use tokio::task::JoinSet;
11+
use rand::Rng;
1112
use zerocopy::{AsBytes, FromZeroes};
1213

1314
use crate::checkpointer::CheckpointMessage;
@@ -32,7 +33,7 @@ enum Slot<IO: Io> {
3233

3334
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
3435
pub struct WalRegistry<IO: Io, S> {
35-
fs: IO,
36+
io: IO,
3637
path: PathBuf,
3738
shutdown: AtomicBool,
3839
opened: DashMap<NamespaceName, Slot<IO>>,
@@ -59,7 +60,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
5960
) -> Result<Self> {
6061
io.create_dir_all(&path)?;
6162
let registry = Self {
62-
fs: io,
63+
io,
6364
path,
6465
opened: Default::default(),
6566
shutdown: Default::default(),
@@ -105,14 +106,15 @@ where
105106
.join(shared.namespace().as_str())
106107
.join(format!("{}:{start_frame_no:020}.seg", shared.namespace()));
107108

108-
let segment_file = self.fs.open(true, true, true, &path)?;
109-
109+
let segment_file = self.io.open(true, true, true, &path)?;
110+
let salt = self.io.with_rng(|rng| rng.gen());
110111
let new = CurrentSegment::create(
111112
segment_file,
112113
path,
113114
start_frame_no,
114115
current.db_size(),
115116
current.tail().clone(),
117+
salt,
116118
)?;
117119
// sealing must the last fallible operation, because we don't want to end up in a situation
118120
// where the current log is sealed and it wasn't swapped.
@@ -226,7 +228,7 @@ where
226228
db_path: &Path,
227229
) -> Result<Arc<SharedWal<IO>>> {
228230
let path = self.path.join(namespace.as_str());
229-
self.fs.create_dir_all(&path)?;
231+
self.io.create_dir_all(&path)?;
230232
// TODO: handle that with abstract io
231233
let dir = walkdir::WalkDir::new(&path).sort_by_file_name().into_iter();
232234

@@ -246,7 +248,7 @@ where
246248
continue;
247249
}
248250

249-
let file = self.fs.open(false, true, true, entry.path())?;
251+
let file = self.io.open(false, true, true, entry.path())?;
250252

251253
if let Some(sealed) =
252254
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())?
@@ -265,7 +267,7 @@ where
265267
}
266268
}
267269

268-
let db_file = self.fs.open(false, true, true, db_path)?;
270+
let db_file = self.io.open(false, true, true, db_path)?;
269271

270272
let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed();
271273
db_file.read_exact_at(header.as_bytes_mut(), 0)?;
@@ -283,14 +285,16 @@ where
283285

284286
let current_path = path.join(format!("{namespace}:{next_frame_no:020}.seg"));
285287

286-
let segment_file = self.fs.open(true, true, true, &current_path)?;
288+
let segment_file = self.io.open(true, true, true, &current_path)?;
289+
let salt = self.io.with_rng(|rng| rng.gen());
287290

288291
let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create(
289292
segment_file,
290293
current_path,
291294
next_frame_no,
292295
db_size,
293296
tail.into(),
297+
salt,
294298
)?));
295299

296300
let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1);

libsql-wal/src/replication/injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl<'a, IO: Io> Injector<'a, IO> {
5454
self.max_tx_frame_no = 0;
5555
}
5656
let buffer = current
57-
.insert_frames(buffer, commit_data, &mut self.tx)
57+
.insert_frames_inject(buffer, commit_data, &mut self.tx)
5858
.await?;
5959
self.buffer = buffer;
6060

libsql-wal/src/segment/current.rs

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use std::hash::Hasher;
12
use std::io::{BufWriter, IoSlice, Write};
23
use std::num::NonZeroU64;
34
use std::ops::DerefMut;
45
use std::path::PathBuf;
6+
use std::sync::atomic::AtomicU32;
57
use std::sync::{
68
atomic::{AtomicBool, AtomicU64, Ordering},
79
Arc,
@@ -16,9 +18,11 @@ use zerocopy::{AsBytes, FromZeroes};
1618

1719
use crate::io::buf::{IoBufMut, ZeroCopyBoxIoBuf, ZeroCopyBuf};
1820
use crate::io::file::FileExt;
19-
use crate::segment::SegmentFlags;
21+
use crate::io::Inspect;
22+
use crate::segment::{checked_frame_offset, SegmentFlags};
2023
use crate::segment::{frame_offset, page_offset, sealed::SealedSegment};
2124
use crate::transaction::{Transaction, TxGuard};
25+
use crate::LIBSQL_MAGIC;
2226

2327
use super::list::SegmentList;
2428
use super::{Frame, FrameHeader, SegmentHeader};
@@ -34,6 +38,8 @@ pub struct CurrentSegment<F> {
3438
/// lock
3539
read_locks: Arc<AtomicU64>,
3640
sealed: AtomicBool,
41+
/// current runnign checksum
42+
current_checksum: AtomicU32,
3743
tail: Arc<SegmentList<SealedSegment<F>>>,
3844
}
3945

@@ -46,6 +52,7 @@ impl<F> CurrentSegment<F> {
4652
start_frame_no: NonZeroU64,
4753
db_size: u32,
4854
tail: Arc<SegmentList<SealedSegment<F>>>,
55+
salt: u32,
4956
) -> Result<Self>
5057
where
5158
F: FileExt,
@@ -60,6 +67,7 @@ impl<F> CurrentSegment<F> {
6067
flags: 0.into(),
6168
magic: LIBSQL_MAGIC.into(),
6269
version: 1.into(),
70+
salt: salt.into(),
6371
};
6472

6573
header.recompute_checksum();
@@ -74,6 +82,7 @@ impl<F> CurrentSegment<F> {
7482
read_locks: Arc::new(AtomicU64::new(0)),
7583
sealed: AtomicBool::default(),
7684
tail,
85+
current_checksum: salt.into(),
7786
})
7887
}
7988

@@ -102,10 +111,14 @@ impl<F> CurrentSegment<F> {
102111
self.header.lock().size_after.get()
103112
}
104113

114+
pub fn current_checksum(&self) -> u32 {
115+
self.current_checksum.load(Ordering::Relaxed)
116+
}
117+
105118
/// insert a bunch of frames in the Wal. The frames needn't be ordered, therefore, on commit
106119
/// the last frame no needs to be passed alongside the new size_after.
107120
#[tracing::instrument(skip_all)]
108-
pub async fn insert_frames(
121+
pub async fn insert_frames_inject(
109122
&self,
110123
frames: Vec<Box<Frame>>,
111124
// (size_after, last_frame_no)
@@ -116,26 +129,48 @@ impl<F> CurrentSegment<F> {
116129
F: FileExt,
117130
{
118131
assert!(!self.sealed.load(Ordering::SeqCst));
132+
assert_eq!(
133+
tx.savepoints.len(),
134+
1,
135+
"injecting wal should not use savepoints"
136+
);
119137
{
120138
let tx = tx.deref_mut();
121139
// let mut commit_frame_written = false;
122140
let current_savepoint = tx.savepoints.last_mut().expect("no savepoints initialized");
123141
let mut frames = frame_list_to_option(frames);
142+
// For each frame, we compute and write the frame checksum, followed by the frame
143+
// itself as an array of CheckedFrame
124144
for i in 0..frames.len() {
125145
let offset = tx.next_offset;
146+
let current_checksum = current_savepoint.current_checksum;
147+
let mut digest = crc32fast::Hasher::new_with_initial(current_checksum);
148+
digest.write(frames[i].as_ref().unwrap().as_bytes());
149+
let new_checksum = digest.finalize();
150+
let (_buf, ret) = self
151+
.file
152+
.write_all_at_async(
153+
ZeroCopyBuf::new_init(zerocopy::byteorder::little_endian::U32::new(
154+
new_checksum,
155+
)),
156+
checked_frame_offset(offset),
157+
)
158+
.await;
159+
ret?;
160+
126161
let buf = ZeroCopyBoxIoBuf::new(frames[i].take().unwrap());
127162
let (buf, ret) = self
128163
.file
129164
.write_all_at_async(buf, frame_offset(offset))
130165
.await;
131-
132166
ret?;
133167

134168
let frame = buf.into_inner();
135169

136170
current_savepoint
137171
.index
138172
.insert(frame.header().page_no(), offset);
173+
current_savepoint.current_checksum = new_checksum;
139174
tx.next_offset += 1;
140175
frames[i] = Some(frame);
141176
}
@@ -161,6 +196,8 @@ impl<F> CurrentSegment<F> {
161196
tx.merge_savepoints(&self.index);
162197
// set the header last, so that a transaction does not witness a write before
163198
// it's actually committed.
199+
self.current_checksum
200+
.store(tx.current_checksum(), Ordering::Relaxed);
164201
*self.header.lock() = header.into_inner();
165202

166203
tx.is_commited = true;
@@ -194,6 +231,11 @@ impl<F> CurrentSegment<F> {
194231
if let Some(offset) = current_savepoint.index.get(&page_no) {
195232
tracing::trace!(page_no, "recycling frame");
196233
self.file.write_all_at(page, page_offset(*offset))?;
234+
// we overwrote a frame, record that for later rewrite
235+
tx.recompute_checksum = Some(tx
236+
.recompute_checksum
237+
.map(|old| old.min(*offset))
238+
.unwrap_or(*offset));
197239
continue;
198240
}
199241

@@ -210,24 +252,41 @@ impl<F> CurrentSegment<F> {
210252
size_after: size_after.into(),
211253
frame_no: frame_no.into(),
212254
};
213-
let slices = &[IoSlice::new(header.as_bytes()), IoSlice::new(&page)];
255+
256+
let mut digest =
257+
crc32fast::Hasher::new_with_initial(current_savepoint.current_checksum);
258+
digest.write(header.as_bytes());
259+
digest.write(page);
260+
261+
let checksum = digest.finalize();
262+
let checksum_bytes = checksum.to_le_bytes();
263+
// We write a instance of a ChecksummedFrame
264+
let slices = &[
265+
IoSlice::new(&checksum_bytes),
266+
IoSlice::new(header.as_bytes()),
267+
IoSlice::new(&page),
268+
];
214269
let offset = tx.next_offset;
215270
debug_assert_eq!(
216271
self.header.lock().start_frame_no.get() + offset as u64,
217272
frame_no
218273
);
219-
self.file.write_at_vectored(slices, frame_offset(offset))?;
274+
self.file.write_at_vectored(slices, checked_frame_offset(offset))?;
220275
assert!(
221276
current_savepoint.index.insert(page_no, offset).is_none(),
222277
"existing frames should be recycled"
223278
);
279+
current_savepoint.current_checksum = checksum;
224280
tx.next_frame_no += 1;
225281
tx.next_offset += 1;
226282
}
227283
}
228284

229285
if let Some(size_after) = size_after {
230286
if tx.not_empty() {
287+
if let Some(_offset) = tx.recompute_checksum {
288+
todo!("recompute checksum");
289+
}
231290
let last_frame_no = tx.next_frame_no - 1;
232291
let mut header = { *self.header.lock() };
233292
header.last_commited_frame_no = last_frame_no.into();
@@ -240,6 +299,8 @@ impl<F> CurrentSegment<F> {
240299
// set the header last, so that a transaction does not witness a write before
241300
// it's actually committed.
242301
*self.header.lock() = header;
302+
self.current_checksum
303+
.store(tx.current_checksum(), Ordering::Relaxed);
243304

244305
tx.is_commited = true;
245306

@@ -281,7 +342,7 @@ impl<F> CurrentSegment<F> {
281342
F: FileExt,
282343
B: IoBufMut + Send + 'static,
283344
{
284-
let byte_offset = frame_offset(offset);
345+
let byte_offset = dbg!(frame_offset(dbg!(offset)));
285346
self.file.read_exact_at_async(buf, byte_offset).await
286347
}
287348

@@ -304,10 +365,21 @@ impl<F> CurrentSegment<F> {
304365
{
305366
let mut header = self.header.lock();
306367
let index_offset = header.count_committed() as u32;
307-
let index_byte_offset = frame_offset(index_offset);
368+
let index_byte_offset = checked_frame_offset(index_offset);
308369
let mut cursor = self.file.cursor(index_byte_offset);
309-
let mut writer = BufWriter::new(&mut cursor);
370+
let writer = BufWriter::new(&mut cursor);
371+
372+
let current = self.current_checksum();
373+
let mut digest = crc32fast::Hasher::new_with_initial(current);
374+
let mut writer = Inspect::new(writer, |data: &[u8]| {
375+
digest.write(data);
376+
});
310377
self.index.merge_all(&mut writer)?;
378+
let mut writer = writer.into_inner();
379+
let index_checksum = digest.finalize();
380+
let index_size = writer.get_ref().count();
381+
writer.write_all(&index_checksum.to_le_bytes())?;
382+
311383
writer.into_inner().map_err(|e| e.into_parts().0)?;
312384
// we perform a first sync to ensure that all the segment has been flushed to disk. We then
313385
// write the header and flush again. We want to guarantee that if we find a segement marked
@@ -318,10 +390,10 @@ impl<F> CurrentSegment<F> {
318390
self.file.sync_all()?;
319391

320392
header.index_offset = index_byte_offset.into();
321-
header.index_size = cursor.count().into();
322-
header.recompute_checksum();
393+
header.index_size = index_size.into();
323394
let flags = header.flags();
324395
header.set_flags(flags | SegmentFlags::SEALED);
396+
header.recompute_checksum();
325397
self.file.write_all_at(header.as_bytes(), 0)?;
326398

327399
// flush the header.

0 commit comments

Comments
 (0)