Skip to content

Commit 016bea4

Browse files
authored
Merge pull request #1928 from tursodatabase/sync-checkpoint
WAL sync checkpointing support
2 parents d01092a + 1a877ae commit 016bea4

File tree

3 files changed

+148
-47
lines changed

3 files changed

+148
-47
lines changed

libsql/src/local/connection.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
99
use crate::TransactionBehavior;
1010

1111
use libsql_sys::ffi;
12+
use std::cell::RefCell;
1213
use std::{ffi::c_int, fmt, path::Path, sync::Arc};
1314

1415
/// A connection to a libSQL database.
@@ -451,6 +452,20 @@ impl Connection {
451452
}
452453
}
453454

455+
pub(crate) fn wal_checkpoint(&self, truncate: bool) -> Result<()> {
456+
let rc = unsafe { libsql_sys::ffi::sqlite3_wal_checkpoint_v2(self.handle(), std::ptr::null(), truncate as i32, std::ptr::null_mut(), std::ptr::null_mut()) };
457+
if rc != 0 {
458+
let err_msg = unsafe { libsql_sys::ffi::sqlite3_errmsg(self.handle()) };
459+
let err_msg = unsafe { std::ffi::CStr::from_ptr(err_msg) };
460+
let err_msg = err_msg.to_string_lossy().to_string();
461+
return Err(crate::errors::Error::SqliteFailure(
462+
rc as std::ffi::c_int,
463+
format!("Failed to checkpoint WAL: {}", err_msg),
464+
));
465+
}
466+
Ok(())
467+
}
468+
454469
pub(crate) fn wal_frame_count(&self) -> u32 {
455470
let mut max_frame_no: std::os::raw::c_uint = 0;
456471
unsafe { libsql_sys::ffi::libsql_wal_frame_count(self.handle(), &mut max_frame_no) };
@@ -537,25 +552,43 @@ impl Connection {
537552

538553
pub(crate) fn wal_insert_handle(&self) -> Result<WalInsertHandle<'_>> {
539554
self.wal_insert_begin()?;
540-
Ok(WalInsertHandle { conn: self })
555+
Ok(WalInsertHandle { conn: self, in_session: RefCell::new(true) })
541556
}
542557
}
543558

544559
pub(crate) struct WalInsertHandle<'a> {
545560
conn: &'a Connection,
561+
in_session: RefCell<bool>
546562
}
547563

548564
impl WalInsertHandle<'_> {
549565
pub fn insert(&self, frame: &[u8]) -> Result<()> {
566+
assert!(*self.in_session.borrow());
550567
self.conn.wal_insert_frame(frame)
551568
}
569+
570+
pub fn begin(&self) -> Result<()> {
571+
assert!(!*self.in_session.borrow());
572+
self.conn.wal_insert_begin()?;
573+
self.in_session.replace(true);
574+
Ok(())
575+
}
576+
577+
pub fn end(&self) -> Result<()> {
578+
assert!(*self.in_session.borrow());
579+
self.conn.wal_insert_end()?;
580+
self.in_session.replace(false);
581+
Ok(())
582+
}
552583
}
553584

554585
impl Drop for WalInsertHandle<'_> {
555586
fn drop(&mut self) {
556-
if let Err(err) = self.conn.wal_insert_end() {
557-
tracing::error!("{:?}", err);
558-
Err(err).unwrap()
587+
if *self.in_session.borrow() {
588+
if let Err(err) = self.conn.wal_insert_end() {
589+
tracing::error!("{:?}", err);
590+
Err(err).unwrap()
591+
}
559592
}
560593
}
561594
}

libsql/src/sync.rs

Lines changed: 107 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,23 @@ impl SyncError {
6161
}
6262
}
6363

64+
pub enum PullResult {
65+
/// A frame was successfully pulled.
66+
Frame(Bytes),
67+
/// We've reached the end of the generation.
68+
EndOfGeneration { max_generation: u32 },
69+
}
70+
6471
pub struct SyncContext {
6572
db_path: String,
6673
client: hyper::Client<ConnectorService, Body>,
6774
sync_url: String,
6875
auth_token: Option<HeaderValue>,
6976
max_retries: usize,
77+
/// The current durable generation.
78+
durable_generation: u32,
7079
/// Represents the max_frame_no from the server.
7180
durable_frame_num: u32,
72-
/// Represents the current checkpoint generation.
73-
generation: u32,
7481
}
7582

7683
impl SyncContext {
@@ -96,8 +103,8 @@ impl SyncContext {
96103
auth_token,
97104
max_retries: DEFAULT_MAX_RETRIES,
98105
client,
106+
durable_generation: 1,
99107
durable_frame_num: 0,
100-
generation: 1,
101108
};
102109

103110
if let Err(e) = me.read_metadata().await {
@@ -115,7 +122,7 @@ impl SyncContext {
115122
&mut self,
116123
generation: u32,
117124
frame_no: u32,
118-
) -> Result<Option<Bytes>> {
125+
) -> Result<PullResult> {
119126
let uri = format!(
120127
"{}/sync/{}/{}/{}",
121128
self.sync_url,
@@ -124,13 +131,7 @@ impl SyncContext {
124131
frame_no + 1
125132
);
126133
tracing::debug!("pulling frame");
127-
match self.pull_with_retry(uri, self.max_retries).await? {
128-
Some(frame) => {
129-
self.durable_frame_num = frame_no;
130-
Ok(Some(frame))
131-
}
132-
None => Ok(None),
133-
}
134+
self.pull_with_retry(uri, self.max_retries).await
134135
}
135136

136137
#[tracing::instrument(skip(self, frame))]
@@ -149,7 +150,7 @@ impl SyncContext {
149150
);
150151
tracing::debug!("pushing frame");
151152

152-
let durable_frame_num = self.push_with_retry(uri, frame, self.max_retries).await?;
153+
let (generation, durable_frame_num) = self.push_with_retry(uri, frame, self.max_retries).await?;
153154

154155
if durable_frame_num > frame_no {
155156
tracing::error!(
@@ -178,12 +179,14 @@ impl SyncContext {
178179
tracing::debug!(?durable_frame_num, "frame successfully pushed");
179180

180181
// Update our last known max_frame_no from the server.
182+
tracing::debug!(?generation, ?durable_frame_num, "updating remote generation and durable_frame_num");
183+
self.durable_generation = generation;
181184
self.durable_frame_num = durable_frame_num;
182185

183186
Ok(durable_frame_num)
184187
}
185188

186-
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<u32> {
189+
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<(u32, u32)> {
187190
let mut nr_retries = 0;
188191
loop {
189192
let mut req = http::Request::post(uri.clone());
@@ -213,6 +216,14 @@ impl SyncContext {
213216
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
214217
.map_err(SyncError::JsonDecode)?;
215218

219+
let generation = resp
220+
.get("generation")
221+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
222+
223+
let generation = generation
224+
.as_u64()
225+
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
226+
216227
let max_frame_no = resp
217228
.get("max_frame_no")
218229
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -221,7 +232,7 @@ impl SyncContext {
221232
.as_u64()
222233
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
223234

224-
return Ok(max_frame_no as u32);
235+
return Ok((generation as u32, max_frame_no as u32));
225236
}
226237

227238
// If we've retried too many times or the error is not a server error,
@@ -244,7 +255,7 @@ impl SyncContext {
244255
}
245256
}
246257

247-
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Option<Bytes>> {
258+
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<PullResult> {
248259
let mut nr_retries = 0;
249260
loop {
250261
let mut req = http::Request::builder().method("GET").uri(uri.clone());
@@ -268,10 +279,27 @@ impl SyncContext {
268279
let frame = hyper::body::to_bytes(res.into_body())
269280
.await
270281
.map_err(SyncError::HttpBody)?;
271-
return Ok(Some(frame));
282+
return Ok(PullResult::Frame(frame));
283+
}
284+
if res.status() == StatusCode::BAD_REQUEST {
285+
let res_body = hyper::body::to_bytes(res.into_body())
286+
.await
287+
.map_err(SyncError::HttpBody)?;
288+
289+
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
290+
.map_err(SyncError::JsonDecode)?;
291+
292+
let generation = resp
293+
.get("generation")
294+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
295+
296+
let generation = generation
297+
.as_u64()
298+
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
299+
return Ok(PullResult::EndOfGeneration { max_generation: generation as u32 });
272300
}
273301
if res.status() == StatusCode::BAD_REQUEST {
274-
return Ok(None);
302+
return Err(SyncError::PullFrame(res.status(), "Bad Request".to_string()).into());
275303
}
276304
// If we've retried too many times or the error is not a server error,
277305
// return the error.
@@ -293,12 +321,18 @@ impl SyncContext {
293321
}
294322
}
295323

324+
325+
pub(crate) fn next_generation(&mut self) {
326+
self.durable_generation += 1;
327+
self.durable_frame_num = 0;
328+
}
329+
296330
pub(crate) fn durable_frame_num(&self) -> u32 {
297331
self.durable_frame_num
298332
}
299333

300-
pub(crate) fn generation(&self) -> u32 {
301-
self.generation
334+
pub(crate) fn durable_generation(&self) -> u32 {
335+
self.durable_generation
302336
}
303337

304338
pub(crate) async fn write_metadata(&mut self) -> Result<()> {
@@ -308,7 +342,7 @@ impl SyncContext {
308342
hash: 0,
309343
version: METADATA_VERSION,
310344
durable_frame_num: self.durable_frame_num,
311-
generation: self.generation,
345+
generation: self.durable_generation,
312346
};
313347

314348
metadata.set_hash();
@@ -350,8 +384,8 @@ impl SyncContext {
350384
metadata
351385
);
352386

387+
self.durable_generation = metadata.generation;
353388
self.durable_frame_num = metadata.durable_frame_num;
354-
self.generation = metadata.generation;
355389

356390
Ok(())
357391
}
@@ -436,10 +470,7 @@ pub async fn sync_offline(
436470
sync_ctx: &mut SyncContext,
437471
conn: &Connection,
438472
) -> Result<crate::database::Replicated> {
439-
let durable_frame_no = sync_ctx.durable_frame_num();
440-
let max_frame_no = conn.wal_frame_count();
441-
442-
if max_frame_no > durable_frame_no {
473+
if is_ahead_of_remote(&sync_ctx, &conn) {
443474
match try_push(sync_ctx, conn).await {
444475
Ok(rep) => Ok(rep),
445476
Err(Error::Sync(err)) => {
@@ -475,6 +506,11 @@ pub async fn sync_offline(
475506
})
476507
}
477508

509+
fn is_ahead_of_remote(sync_ctx: &SyncContext, conn: &Connection) -> bool {
510+
let max_local_frame = conn.wal_frame_count();
511+
max_local_frame > sync_ctx.durable_frame_num()
512+
}
513+
478514
async fn try_push(
479515
sync_ctx: &mut SyncContext,
480516
conn: &Connection,
@@ -496,7 +532,7 @@ async fn try_push(
496532
});
497533
}
498534

499-
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
535+
let generation = sync_ctx.durable_generation();
500536
let start_frame_no = sync_ctx.durable_frame_num() + 1;
501537
let end_frame_no = max_frame_no;
502538

@@ -532,29 +568,60 @@ async fn try_pull(
532568
sync_ctx: &mut SyncContext,
533569
conn: &Connection,
534570
) -> Result<crate::database::Replicated> {
535-
let generation = sync_ctx.generation();
536-
let mut frame_no = sync_ctx.durable_frame_num() + 1;
537-
538571
let insert_handle = conn.wal_insert_handle()?;
539572

573+
let mut err = None;
574+
540575
loop {
576+
let generation = sync_ctx.durable_generation();
577+
let frame_no = sync_ctx.durable_frame_num() + 1;
541578
match sync_ctx.pull_one_frame(generation, frame_no).await {
542-
Ok(Some(frame)) => {
579+
Ok(PullResult::Frame(frame)) => {
543580
insert_handle.insert(&frame)?;
544-
frame_no += 1;
581+
sync_ctx.durable_frame_num = frame_no;
545582
}
546-
Ok(None) => {
583+
Ok(PullResult::EndOfGeneration { max_generation }) => {
584+
// If there are no more generations to pull, we're done.
585+
if generation >= max_generation {
586+
break;
587+
}
588+
insert_handle.end()?;
547589
sync_ctx.write_metadata().await?;
548-
return Ok(crate::database::Replicated {
549-
frame_no: None,
550-
frames_synced: 1,
551-
});
552-
}
553-
Err(err) => {
554-
tracing::debug!("pull_one_frame error: {:?}", err);
590+
591+
// TODO: Make this crash-proof.
592+
conn.wal_checkpoint(true)?;
593+
594+
sync_ctx.next_generation();
555595
sync_ctx.write_metadata().await?;
556-
return Err(err);
596+
597+
insert_handle.begin()?;
598+
}
599+
Err(e) => {
600+
tracing::debug!("pull_one_frame error: {:?}", e);
601+
err.replace(e);
602+
break;
557603
}
558604
}
559605
}
606+
// This is crash-proof because we:
607+
//
608+
// 1. Write WAL frame first
609+
// 2. Write new max frame to temporary metadata
610+
// 3. Atomically rename the temporary metadata to the real metadata
611+
//
612+
// If we crash before metadata rename completes, the old metadata still
613+
// points to last successful frame, allowing safe retry from that point.
614+
// If we happen to have the frame already in the WAL, it's fine to re-pull
615+
// because append locally is idempotent.
616+
insert_handle.end()?;
617+
sync_ctx.write_metadata().await?;
618+
619+
if let Some(err) = err {
620+
Err(err)
621+
} else {
622+
Ok(crate::database::Replicated {
623+
frame_no: None,
624+
frames_synced: 1,
625+
})
626+
}
560627
}

0 commit comments

Comments
 (0)