Skip to content

Commit 7b216b5

Browse files
committed
libsql: WAL sync checkpoint support
This patch adds support for checkpointing during WAL sync, which allows us to sync multiple checkpoint generations from the server. The protocol is as follows: 1. A client uses the pull endpoint to fetch frames. 2. When we reach the end of a generation, the server returns "I am a teapot" (yes, really) with a JSON containing the maximum generation number on the server. 3. If we need to pull more generations, we first checkpoint the WAL on the client, and then continue pulling frames from the newer generation.
1 parent d01092a commit 7b216b5

File tree

3 files changed

+143
-44
lines changed

3 files changed

+143
-44
lines changed

libsql/src/local/connection.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
99
use crate::TransactionBehavior;
1010

1111
use libsql_sys::ffi;
12+
use std::cell::RefCell;
1213
use std::{ffi::c_int, fmt, path::Path, sync::Arc};
1314

1415
/// A connection to a libSQL database.
@@ -451,6 +452,20 @@ impl Connection {
451452
}
452453
}
453454

455+
pub(crate) fn wal_checkpoint(&self, truncate: bool) -> Result<()> {
456+
let rc = unsafe { libsql_sys::ffi::sqlite3_wal_checkpoint_v2(self.handle(), std::ptr::null(), truncate as i32, std::ptr::null_mut(), std::ptr::null_mut()) };
457+
if rc != 0 {
458+
let err_msg = unsafe { libsql_sys::ffi::sqlite3_errmsg(self.handle()) };
459+
let err_msg = unsafe { std::ffi::CStr::from_ptr(err_msg) };
460+
let err_msg = err_msg.to_string_lossy().to_string();
461+
return Err(crate::errors::Error::SqliteFailure(
462+
rc as std::ffi::c_int,
463+
format!("Failed to checkpoint WAL: {}", err_msg),
464+
));
465+
}
466+
Ok(())
467+
}
468+
454469
pub(crate) fn wal_frame_count(&self) -> u32 {
455470
let mut max_frame_no: std::os::raw::c_uint = 0;
456471
unsafe { libsql_sys::ffi::libsql_wal_frame_count(self.handle(), &mut max_frame_no) };
@@ -537,18 +552,34 @@ impl Connection {
537552

538553
pub(crate) fn wal_insert_handle(&self) -> Result<WalInsertHandle<'_>> {
539554
self.wal_insert_begin()?;
540-
Ok(WalInsertHandle { conn: self })
555+
Ok(WalInsertHandle { conn: self, in_session: RefCell::new(true) })
541556
}
542557
}
543558

544559
pub(crate) struct WalInsertHandle<'a> {
545560
conn: &'a Connection,
561+
in_session: RefCell<bool>
546562
}
547563

548564
impl WalInsertHandle<'_> {
549565
pub fn insert(&self, frame: &[u8]) -> Result<()> {
566+
assert!(*self.in_session.borrow());
550567
self.conn.wal_insert_frame(frame)
551568
}
569+
570+
pub fn begin(&self) -> Result<()> {
571+
assert!(!*self.in_session.borrow());
572+
self.conn.wal_insert_begin()?;
573+
self.in_session.replace(true);
574+
Ok(())
575+
}
576+
577+
pub fn end(&self) -> Result<()> {
578+
assert!(*self.in_session.borrow());
579+
self.conn.wal_insert_end()?;
580+
self.in_session.replace(false);
581+
Ok(())
582+
}
552583
}
553584

554585
impl Drop for WalInsertHandle<'_> {

libsql/src/sync.rs

Lines changed: 107 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,23 @@ impl SyncError {
6161
}
6262
}
6363

64+
pub enum PullResult {
65+
/// A frame was successfully pulled.
66+
Frame(Bytes),
67+
/// We've reached the end of the generation.
68+
EndOfGeneration { max_generation: u32 },
69+
}
70+
6471
pub struct SyncContext {
6572
db_path: String,
6673
client: hyper::Client<ConnectorService, Body>,
6774
sync_url: String,
6875
auth_token: Option<HeaderValue>,
6976
max_retries: usize,
77+
/// The current durable generation.
78+
durable_generation: u32,
7079
/// Represents the max_frame_no from the server.
7180
durable_frame_num: u32,
72-
/// Represents the current checkpoint generation.
73-
generation: u32,
7481
}
7582

7683
impl SyncContext {
@@ -96,8 +103,8 @@ impl SyncContext {
96103
auth_token,
97104
max_retries: DEFAULT_MAX_RETRIES,
98105
client,
106+
durable_generation: 1,
99107
durable_frame_num: 0,
100-
generation: 1,
101108
};
102109

103110
if let Err(e) = me.read_metadata().await {
@@ -115,7 +122,7 @@ impl SyncContext {
115122
&mut self,
116123
generation: u32,
117124
frame_no: u32,
118-
) -> Result<Option<Bytes>> {
125+
) -> Result<PullResult> {
119126
let uri = format!(
120127
"{}/sync/{}/{}/{}",
121128
self.sync_url,
@@ -124,13 +131,7 @@ impl SyncContext {
124131
frame_no + 1
125132
);
126133
tracing::debug!("pulling frame");
127-
match self.pull_with_retry(uri, self.max_retries).await? {
128-
Some(frame) => {
129-
self.durable_frame_num = frame_no;
130-
Ok(Some(frame))
131-
}
132-
None => Ok(None),
133-
}
134+
self.pull_with_retry(uri, self.max_retries).await
134135
}
135136

136137
#[tracing::instrument(skip(self, frame))]
@@ -149,7 +150,7 @@ impl SyncContext {
149150
);
150151
tracing::debug!("pushing frame");
151152

152-
let durable_frame_num = self.push_with_retry(uri, frame, self.max_retries).await?;
153+
let (generation, durable_frame_num) = self.push_with_retry(uri, frame, self.max_retries).await?;
153154

154155
if durable_frame_num > frame_no {
155156
tracing::error!(
@@ -178,12 +179,14 @@ impl SyncContext {
178179
tracing::debug!(?durable_frame_num, "frame successfully pushed");
179180

180181
// Update our last known max_frame_no from the server.
182+
tracing::debug!(?generation, ?durable_frame_num, "updating remote generation and durable_frame_num");
183+
self.durable_generation = generation;
181184
self.durable_frame_num = durable_frame_num;
182185

183186
Ok(durable_frame_num)
184187
}
185188

186-
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<u32> {
189+
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<(u32, u32)> {
187190
let mut nr_retries = 0;
188191
loop {
189192
let mut req = http::Request::post(uri.clone());
@@ -213,6 +216,14 @@ impl SyncContext {
213216
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
214217
.map_err(SyncError::JsonDecode)?;
215218

219+
let generation = resp
220+
.get("generation")
221+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
222+
223+
let generation = generation
224+
.as_u64()
225+
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
226+
216227
let max_frame_no = resp
217228
.get("max_frame_no")
218229
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -221,7 +232,7 @@ impl SyncContext {
221232
.as_u64()
222233
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
223234

224-
return Ok(max_frame_no as u32);
235+
return Ok((generation as u32, max_frame_no as u32));
225236
}
226237

227238
// If we've retried too many times or the error is not a server error,
@@ -244,7 +255,7 @@ impl SyncContext {
244255
}
245256
}
246257

247-
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Option<Bytes>> {
258+
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<PullResult> {
248259
let mut nr_retries = 0;
249260
loop {
250261
let mut req = http::Request::builder().method("GET").uri(uri.clone());
@@ -268,10 +279,27 @@ impl SyncContext {
268279
let frame = hyper::body::to_bytes(res.into_body())
269280
.await
270281
.map_err(SyncError::HttpBody)?;
271-
return Ok(Some(frame));
282+
return Ok(PullResult::Frame(frame));
283+
}
284+
if res.status() == StatusCode::BAD_REQUEST {
285+
let res_body = hyper::body::to_bytes(res.into_body())
286+
.await
287+
.map_err(SyncError::HttpBody)?;
288+
289+
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
290+
.map_err(SyncError::JsonDecode)?;
291+
292+
let generation = resp
293+
.get("generation")
294+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
295+
296+
let generation = generation
297+
.as_u64()
298+
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
299+
return Ok(PullResult::EndOfGeneration { max_generation: generation as u32 });
272300
}
273301
if res.status() == StatusCode::BAD_REQUEST {
274-
return Ok(None);
302+
return Err(SyncError::PullFrame(res.status(), "Bad Request".to_string()).into());
275303
}
276304
// If we've retried too many times or the error is not a server error,
277305
// return the error.
@@ -293,12 +321,18 @@ impl SyncContext {
293321
}
294322
}
295323

324+
325+
pub(crate) fn next_generation(&mut self) {
326+
self.durable_generation += 1;
327+
self.durable_frame_num = 0;
328+
}
329+
296330
pub(crate) fn durable_frame_num(&self) -> u32 {
297331
self.durable_frame_num
298332
}
299333

300-
pub(crate) fn generation(&self) -> u32 {
301-
self.generation
334+
pub(crate) fn durable_generation(&self) -> u32 {
335+
self.durable_generation
302336
}
303337

304338
pub(crate) async fn write_metadata(&mut self) -> Result<()> {
@@ -308,7 +342,7 @@ impl SyncContext {
308342
hash: 0,
309343
version: METADATA_VERSION,
310344
durable_frame_num: self.durable_frame_num,
311-
generation: self.generation,
345+
generation: self.durable_generation,
312346
};
313347

314348
metadata.set_hash();
@@ -350,8 +384,8 @@ impl SyncContext {
350384
metadata
351385
);
352386

387+
self.durable_generation = metadata.generation;
353388
self.durable_frame_num = metadata.durable_frame_num;
354-
self.generation = metadata.generation;
355389

356390
Ok(())
357391
}
@@ -436,10 +470,7 @@ pub async fn sync_offline(
436470
sync_ctx: &mut SyncContext,
437471
conn: &Connection,
438472
) -> Result<crate::database::Replicated> {
439-
let durable_frame_no = sync_ctx.durable_frame_num();
440-
let max_frame_no = conn.wal_frame_count();
441-
442-
if max_frame_no > durable_frame_no {
473+
if is_ahead_of_remote(&sync_ctx, &conn) {
443474
match try_push(sync_ctx, conn).await {
444475
Ok(rep) => Ok(rep),
445476
Err(Error::Sync(err)) => {
@@ -475,6 +506,11 @@ pub async fn sync_offline(
475506
})
476507
}
477508

509+
fn is_ahead_of_remote(sync_ctx: &SyncContext, conn: &Connection) -> bool {
510+
let max_local_frame = conn.wal_frame_count();
511+
max_local_frame > sync_ctx.durable_frame_num()
512+
}
513+
478514
async fn try_push(
479515
sync_ctx: &mut SyncContext,
480516
conn: &Connection,
@@ -496,7 +532,7 @@ async fn try_push(
496532
});
497533
}
498534

499-
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
535+
let generation = sync_ctx.durable_generation();
500536
let start_frame_no = sync_ctx.durable_frame_num() + 1;
501537
let end_frame_no = max_frame_no;
502538

@@ -532,29 +568,60 @@ async fn try_pull(
532568
sync_ctx: &mut SyncContext,
533569
conn: &Connection,
534570
) -> Result<crate::database::Replicated> {
535-
let generation = sync_ctx.generation();
536-
let mut frame_no = sync_ctx.durable_frame_num() + 1;
537-
538571
let insert_handle = conn.wal_insert_handle()?;
539572

573+
let mut err = None;
574+
540575
loop {
576+
let generation = sync_ctx.durable_generation();
577+
let frame_no = sync_ctx.durable_frame_num() + 1;
541578
match sync_ctx.pull_one_frame(generation, frame_no).await {
542-
Ok(Some(frame)) => {
579+
Ok(PullResult::Frame(frame)) => {
543580
insert_handle.insert(&frame)?;
544-
frame_no += 1;
581+
sync_ctx.durable_frame_num = frame_no;
545582
}
546-
Ok(None) => {
583+
Ok(PullResult::EndOfGeneration { max_generation }) => {
584+
// If there are no more generations to pull, we're done.
585+
if generation >= max_generation {
586+
break;
587+
}
588+
insert_handle.end()?;
547589
sync_ctx.write_metadata().await?;
548-
return Ok(crate::database::Replicated {
549-
frame_no: None,
550-
frames_synced: 1,
551-
});
552-
}
553-
Err(err) => {
554-
tracing::debug!("pull_one_frame error: {:?}", err);
590+
591+
// TODO: Make this crash-proof.
592+
conn.wal_checkpoint(true)?;
593+
594+
sync_ctx.next_generation();
555595
sync_ctx.write_metadata().await?;
556-
return Err(err);
596+
597+
insert_handle.begin()?;
598+
}
599+
Err(e) => {
600+
tracing::debug!("pull_one_frame error: {:?}", e);
601+
err.replace(e);
602+
break;
557603
}
558604
}
559605
}
606+
// This is crash-proof because we:
607+
//
608+
// 1. Write WAL frame first
609+
// 2. Write new max frame to temporary metadata
610+
// 3. Atomically rename the temporary metadata to the real metadata
611+
//
612+
// If we crash before metadata rename completes, the old metadata still
613+
// points to last successful frame, allowing safe retry from that point.
614+
// If we happen to have the frame already in the WAL, it's fine to re-pull
615+
// because append locally is idempotent.
616+
insert_handle.end()?;
617+
sync_ctx.write_metadata().await?;
618+
619+
if let Some(err) = err {
620+
Err(err)
621+
} else {
622+
Ok(crate::database::Replicated {
623+
frame_no: None,
624+
frames_synced: 1,
625+
})
626+
}
560627
}

libsql/src/sync/test.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn test_sync_context_push_frame() {
3434

3535
// Verify internal state was updated
3636
assert_eq!(sync_ctx.durable_frame_num(), 0);
37-
assert_eq!(sync_ctx.generation(), 1);
37+
assert_eq!(sync_ctx.durable_generation(), 1);
3838
assert_eq!(server.frame_count(), 1);
3939
}
4040

@@ -129,7 +129,7 @@ async fn test_sync_context_corrupted_metadata() {
129129

130130
// Verify that the context was reset to default values
131131
assert_eq!(sync_ctx.durable_frame_num(), 0);
132-
assert_eq!(sync_ctx.generation(), 1);
132+
assert_eq!(sync_ctx.durable_generation(), 1);
133133
}
134134

135135
#[tokio::test]
@@ -174,7 +174,7 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
174174

175175
// Verify that the context was set to new fake values.
176176
assert_eq!(sync_ctx.durable_frame_num(), 3);
177-
assert_eq!(sync_ctx.generation(), 1);
177+
assert_eq!(sync_ctx.durable_generation(), 1);
178178

179179
let frame_no = sync_ctx.durable_frame_num() + 1;
180180
// This push should fail because we are ahead of the server and thus should get an invalid
@@ -376,6 +376,7 @@ impl MockServer {
376376
if req.uri().path().contains("/sync/") {
377377
// Return the max_frame_no that has been accepted
378378
let response = serde_json::json!({
379+
"generation": 1,
379380
"max_frame_no": current_count
380381
});
381382

0 commit comments

Comments
 (0)