Skip to content

Commit 89b3460

Browse files
authored
Merge pull request #1817 from tursodatabase/optimize-wal-push
libsql: Improve WAL frame push logic
2 parents 5ee4723 + 37bc47d commit 89b3460

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

libsql/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ fallible-iterator = { version = "0.3", optional = true }
4242

4343
libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
4444
async-stream = { version = "0.3.5", optional = true }
45-
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls" ], optional = true }
45+
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls", "json" ], optional = true }
4646

4747
[dev-dependencies]
4848
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
@@ -106,6 +106,7 @@ sync = [
106106
"dep:tokio",
107107
"dep:futures",
108108
"dep:reqwest",
109+
"dep:serde_json",
109110
]
110111
hrana = [
111112
"parser",

libsql/src/local/database.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,16 @@ impl Database {
397397
let start_frame_no = sync_ctx.durable_frame_num + 1;
398398
let end_frame_no = max_frame_no;
399399

400-
for frame_no in start_frame_no..end_frame_no+1 {
401-
self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
400+
let mut frame_no = start_frame_no;
401+
while frame_no <= end_frame_no {
402+
// The server returns its maximum frame number. To avoid resending
403+
// frames the server already knows about, we need to update the
404+
// frame number to the one returned by the server.
405+
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
406+
if max_frame_no > frame_no {
407+
frame_no = max_frame_no;
408+
}
409+
frame_no += 1;
402410
}
403411

404412
let frame_count = end_frame_no - start_frame_no + 1;
@@ -409,7 +417,7 @@ impl Database {
409417
}
410418

411419
#[cfg(feature = "sync")]
412-
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<()> {
420+
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
413421
let frame_size: usize = 24+page_size as usize;
414422
let frame = vec![0; frame_size];
415423
let rc = unsafe {
@@ -419,12 +427,12 @@ impl Database {
419427
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
420428
}
421429
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
422-
self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
423-
Ok(())
430+
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
431+
Ok(max_frame_no)
424432
}
425433

426434
#[cfg(feature = "sync")]
427-
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<()> {
435+
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
428436
let mut nr_retries = 0;
429437
loop {
430438
let client = reqwest::Client::new();
@@ -437,7 +445,9 @@ impl Database {
437445
}
438446
let res = builder.body(frame.to_vec()).send().await.unwrap();
439447
if res.status().is_success() {
440-
return Ok(());
448+
let resp = res.json::<serde_json::Value>().await.unwrap();
449+
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
450+
return Ok(max_frame_no as u32);
441451
}
442452
if nr_retries > max_retries {
443453
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));

0 commit comments

Comments
 (0)