Skip to content

Commit 06e177f

Browse files
committed
streaming compaction
1 parent 912aaf3 commit 06e177f

File tree

13 files changed

+860
-701
lines changed

13 files changed

+860
-701
lines changed

libsql-wal/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ rand = "0.8.5"
4848
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
4949
petgraph = "0.6.5"
5050
anyhow = { version = "1.0.86", optional = true }
51+
futures = "0.3.30"
52+
memmap = "0.7.0"
53+
pin-project-lite = "0.2.14"
5154

5255
[dev-dependencies]
5356
criterion = "0.5.1"

libsql-wal/src/replication/storage.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tokio_stream::Stream;
88
use zerocopy::FromZeroes;
99

1010
use crate::io::buf::ZeroCopyBoxIoBuf;
11+
use crate::segment::compacted::CompactedFrame;
1112
use crate::segment::Frame;
1213
use crate::storage::backend::FindSegmentReq;
1314
use crate::storage::Storage;
@@ -65,10 +66,19 @@ where
6566
},
6667
};
6768

68-
let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await;
69+
// TODO: The copy here is inneficient. This is OK for now, until we rewrite
70+
// this code to read from the main db file instead of storage.
71+
let (compacted_frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(CompactedFrame::new_box_zeroed()), offset as u32).await;
6972
ret?;
70-
let frame = frame.into_inner();
71-
debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0");
73+
let compacted_frame = compacted_frame.into_inner();
74+
let mut frame = Frame::new_box_zeroed();
75+
frame.data_mut().copy_from_slice(&compacted_frame.data);
76+
77+
let header = frame.header_mut();
78+
header.frame_no = compacted_frame.header().frame_no;
79+
header.size_after = 0.into();
80+
header.page_no = compacted_frame.header().page_no;
81+
7282
if frame.header().frame_no() >= until {
7383
yield frame;
7484
}

libsql-wal/src/segment/compacted.rs

Lines changed: 118 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,131 @@
11
use std::io;
2-
use std::mem::size_of;
2+
use std::mem::{offset_of, size_of};
33

4+
use chrono::{DateTime, Utc};
5+
use uuid::Uuid;
46
use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64};
57
use zerocopy::{AsBytes, FromBytes, FromZeroes};
68

79
use crate::io::buf::{IoBufMut, ZeroCopyBuf};
810
use crate::io::FileExt;
9-
use crate::segment::FrameHeader;
1011
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
1112

12-
use super::{Frame, Result};
13+
use super::Result;
1314

1415
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
1516
#[repr(C)]
16-
pub struct CompactedSegmentDataHeader {
17+
pub struct CompactedSegmentHeader {
1718
pub(crate) magic: lu64,
1819
pub(crate) version: lu16,
19-
pub(crate) frame_count: lu32,
20-
pub(crate) segment_id: lu128,
20+
pub(crate) log_id: lu128,
2121
pub(crate) start_frame_no: lu64,
2222
pub(crate) end_frame_no: lu64,
2323
pub(crate) size_after: lu32,
2424
/// for now, always 4096
2525
pub(crate) page_size: lu16,
2626
pub(crate) timestamp: lu64,
2727
}
28-
impl CompactedSegmentDataHeader {
28+
29+
bitflags::bitflags! {
30+
pub struct CompactedFrameFlags: u32 {
31+
/// This flag is set for the last frame in the segment
32+
const LAST = 1 << 0;
33+
}
34+
}
35+
36+
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
37+
#[repr(C)]
38+
pub struct CompactedFrameHeader {
39+
pub flags: lu32,
40+
pub page_no: lu32,
41+
pub frame_no: lu64,
42+
/// running checksum from this frame
43+
/// this is the crc32 of the checksum of the previous frame and all the frame data, including
44+
/// all the fields before checksum in the header. THIS FIELD MUST ALWAYS BE THE last FIELD IN
45+
/// THE STRUCT
46+
pub checksum: lu32,
47+
}
48+
49+
impl CompactedFrameHeader {
50+
pub fn flags(&self) -> CompactedFrameFlags {
51+
CompactedFrameFlags::from_bits(self.flags.get()).unwrap()
52+
}
53+
54+
pub(crate) fn is_last(&self) -> bool {
55+
self.flags().contains(CompactedFrameFlags::LAST)
56+
}
57+
58+
pub(crate) fn set_last(&mut self) {
59+
let mut flags = self.flags();
60+
flags.insert(CompactedFrameFlags::LAST);
61+
self.flags = flags.bits().into();
62+
}
63+
64+
pub(crate) fn reset_flags(&mut self) {
65+
self.flags = 0.into();
66+
}
67+
68+
pub(crate) fn compute_checksum(&self, previous: u32, data: &[u8]) -> u32 {
69+
assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize);
70+
let mut h = crc32fast::Hasher::new_with_initial(previous);
71+
h.update(&self.as_bytes()[..offset_of!(Self, checksum)]);
72+
h.update(data);
73+
h.finalize()
74+
}
75+
76+
/// updates the checksum with the previous frame checksum and the frame data
77+
pub(crate) fn update_checksum(&mut self, previous: u32, data: &[u8]) -> u32 {
78+
let checksum = self.compute_checksum(previous, data);
79+
self.checksum = checksum.into();
80+
checksum
81+
}
82+
83+
pub fn checksum(&self) -> u32 {
84+
self.checksum.get()
85+
}
86+
87+
pub fn page_no(&self) -> u32 {
88+
self.page_no.get()
89+
}
90+
}
91+
92+
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
93+
#[repr(C)]
94+
pub struct CompactedFrame {
95+
pub header: CompactedFrameHeader,
96+
pub data: [u8; LIBSQL_PAGE_SIZE as usize],
97+
}
98+
99+
impl CompactedFrame {
100+
pub fn header(&self) -> &CompactedFrameHeader {
101+
&self.header
102+
}
103+
104+
pub(crate) fn header_mut(&mut self) -> &mut CompactedFrameHeader {
105+
&mut self.header
106+
}
107+
}
108+
109+
impl CompactedSegmentHeader {
110+
pub fn new(
111+
start_frame_no: u64,
112+
end_frame_no: u64,
113+
size_after: u32,
114+
timestamp: DateTime<Utc>,
115+
log_id: Uuid,
116+
) -> Self {
117+
Self {
118+
magic: LIBSQL_MAGIC.into(),
119+
version: LIBSQL_WAL_VERSION.into(),
120+
start_frame_no: start_frame_no.into(),
121+
end_frame_no: end_frame_no.into(),
122+
size_after: size_after.into(),
123+
page_size: LIBSQL_PAGE_SIZE.into(),
124+
timestamp: (timestamp.timestamp_millis() as u64).into(),
125+
log_id: log_id.as_u128().into(),
126+
}
127+
}
128+
29129
fn check(&self) -> Result<()> {
30130
if self.magic.get() != LIBSQL_MAGIC {
31131
return Err(super::Error::InvalidHeaderMagic);
@@ -47,14 +147,8 @@ impl CompactedSegmentDataHeader {
47147
}
48148
}
49149

50-
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
51-
#[repr(C)]
52-
pub struct CompactedSegmentDataFooter {
53-
pub(crate) checksum: lu32,
54-
}
55-
56150
pub struct CompactedSegment<F> {
57-
header: CompactedSegmentDataHeader,
151+
header: CompactedSegmentHeader,
58152
file: F,
59153
}
60154

@@ -69,7 +163,7 @@ impl<F> CompactedSegment<F> {
69163
}
70164
}
71165

72-
pub fn header(&self) -> &CompactedSegmentDataHeader {
166+
pub fn header(&self) -> &CompactedSegmentHeader {
73167
&self.header
74168
}
75169
}
@@ -79,23 +173,25 @@ impl<F: FileExt> CompactedSegment<F> {
79173
let buf = ZeroCopyBuf::new_uninit();
80174
let (buf, ret) = file.read_exact_at_async(buf, 0).await;
81175
ret?;
82-
let header: CompactedSegmentDataHeader = buf.into_inner();
176+
let header: CompactedSegmentHeader = buf.into_inner();
83177
header.check()?;
84178
Ok(Self { file, header })
85179
}
86180

87-
pub(crate) fn from_parts(file: F, header: CompactedSegmentDataHeader) -> Self {
181+
pub(crate) fn from_parts(file: F, header: CompactedSegmentHeader) -> Self {
88182
Self { header, file }
89183
}
90184

185+
/// read a CompactedFrame from the segment
91186
pub(crate) async fn read_frame<B: IoBufMut + Send + 'static>(
92187
&self,
93188
buf: B,
94189
offset: u32,
95190
) -> (B, io::Result<()>) {
96191
assert_eq!(buf.bytes_init(), 0);
97-
assert_eq!(buf.bytes_total(), size_of::<Frame>());
98-
let offset = size_of::<CompactedSegmentDataHeader>() + size_of::<Frame>() * offset as usize;
192+
assert_eq!(buf.bytes_total(), size_of::<CompactedFrame>());
193+
let offset =
194+
size_of::<CompactedSegmentHeader>() + size_of::<CompactedFrame>() * offset as usize;
99195
let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
100196
(buf, ret)
101197
}
@@ -107,9 +203,9 @@ impl<F: FileExt> CompactedSegment<F> {
107203
) -> (B, io::Result<()>) {
108204
assert_eq!(buf.bytes_init(), 0);
109205
assert_eq!(buf.bytes_total(), LIBSQL_PAGE_SIZE as usize);
110-
let offset = size_of::<CompactedSegmentDataHeader>()
111-
+ size_of::<Frame>() * offset as usize
112-
+ size_of::<FrameHeader>();
206+
let offset = size_of::<CompactedSegmentHeader>()
207+
+ size_of::<CompactedFrame>() * offset as usize
208+
+ size_of::<CompactedFrameHeader>();
113209
let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
114210
(buf, ret)
115211
}

libsql-wal/src/segment/mod.rs

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,7 @@ impl SegmentHeader {
155155
}
156156

157157
pub trait Segment: Send + Sync + 'static {
158-
fn compact(
159-
&self,
160-
out_file: &impl FileExt,
161-
id: uuid::Uuid,
162-
) -> impl Future<Output = Result<Vec<u8>>> + Send;
158+
fn compact(&self, out_file: &impl FileExt) -> impl Future<Output = Result<Vec<u8>>> + Send;
163159
fn start_frame_no(&self) -> u64;
164160
fn last_committed(&self) -> u64;
165161
fn index(&self) -> &fst::Map<Arc<[u8]>>;
@@ -177,55 +173,6 @@ pub trait Segment: Send + Sync + 'static {
177173
fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()>;
178174
}
179175

180-
impl<T: Segment> Segment for Arc<T> {
181-
fn compact(
182-
&self,
183-
out_file: &impl FileExt,
184-
id: uuid::Uuid,
185-
) -> impl Future<Output = Result<Vec<u8>>> + Send {
186-
self.as_ref().compact(out_file, id)
187-
}
188-
189-
fn start_frame_no(&self) -> u64 {
190-
self.as_ref().start_frame_no()
191-
}
192-
193-
fn last_committed(&self) -> u64 {
194-
self.as_ref().last_committed()
195-
}
196-
197-
fn index(&self) -> &fst::Map<Arc<[u8]>> {
198-
self.as_ref().index()
199-
}
200-
201-
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
202-
self.as_ref().read_page(page_no, max_frame_no, buf)
203-
}
204-
205-
fn is_checkpointable(&self) -> bool {
206-
self.as_ref().is_checkpointable()
207-
}
208-
209-
fn size_after(&self) -> u32 {
210-
self.as_ref().size_after()
211-
}
212-
213-
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
214-
where
215-
B: IoBufMut + Send + 'static,
216-
{
217-
self.as_ref().read_frame_offset_async(offset, buf).await
218-
}
219-
220-
fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()> {
221-
self.as_ref().destroy(io)
222-
}
223-
224-
fn timestamp(&self) -> DateTime<Utc> {
225-
self.as_ref().timestamp()
226-
}
227-
}
228-
229176
#[repr(C)]
230177
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
231178
pub struct FrameHeader {

0 commit comments

Comments
 (0)