Skip to content

Commit 240eee8

Browse files
authored
Merge pull request #1827 from tursodatabase/lucio/refactor-sync-ctx
libsql: refactor push and sync code
2 parents b80f611 + 4887753 commit 240eee8

File tree

2 files changed

+76
-76
lines changed

2 files changed

+76
-76
lines changed

libsql/src/local/database.rs

Lines changed: 12 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct Database {
3333
#[cfg(feature = "replication")]
3434
pub replication_ctx: Option<ReplicationContext>,
3535
#[cfg(feature = "sync")]
36-
pub sync_ctx: Option<SyncContext>,
36+
pub sync_ctx: Option<tokio::sync::Mutex<SyncContext>>,
3737
}
3838

3939
impl Database {
@@ -143,7 +143,10 @@ impl Database {
143143
endpoint
144144
};
145145
let mut db = Database::open(&db_path, flags)?;
146-
db.sync_ctx = Some(SyncContext::new(endpoint, Some(auth_token)));
146+
db.sync_ctx = Some(tokio::sync::Mutex::new(SyncContext::new(
147+
endpoint,
148+
Some(auth_token),
149+
)));
147150
Ok(db)
148151
}
149152

@@ -383,7 +386,7 @@ impl Database {
383386
#[cfg(feature = "sync")]
384387
/// Push WAL frames to remote.
385388
pub async fn push(&self) -> Result<crate::database::Replicated> {
386-
let sync_ctx = self.sync_ctx.as_ref().unwrap();
389+
let sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
387390
let conn = self.connect()?;
388391

389392
let page_size = {
@@ -398,17 +401,20 @@ impl Database {
398401
let max_frame_no = conn.wal_frame_count();
399402

400403
let generation = 1; // TODO: Probe from WAL.
401-
let start_frame_no = sync_ctx.durable_frame_num + 1;
404+
let start_frame_no = sync_ctx.durable_frame_num() + 1;
402405
let end_frame_no = max_frame_no;
403406

404407
let mut frame_no = start_frame_no;
405408
while frame_no <= end_frame_no {
409+
let frame = conn.wal_get_frame(frame_no, page_size)?;
410+
406411
// The server returns its maximum frame number. To avoid resending
407412
// frames the server already knows about, we need to update the
408413
// frame number to the one returned by the server.
409-
let max_frame_no = self
410-
.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size)
414+
let max_frame_no = sync_ctx
415+
.push_one_frame(frame.to_vec(), generation, frame_no)
411416
.await?;
417+
412418
if max_frame_no > frame_no {
413419
frame_no = max_frame_no;
414420
}
@@ -422,72 +428,6 @@ impl Database {
422428
})
423429
}
424430

425-
#[cfg(feature = "sync")]
426-
async fn push_one_frame(
427-
&self,
428-
conn: &Connection,
429-
sync_ctx: &SyncContext,
430-
generation: u32,
431-
frame_no: u32,
432-
page_size: u32,
433-
) -> Result<u32> {
434-
let frame = conn.wal_get_frame(frame_no, page_size)?;
435-
436-
let uri = format!(
437-
"{}/sync/{}/{}/{}",
438-
sync_ctx.sync_url,
439-
generation,
440-
frame_no,
441-
frame_no + 1
442-
);
443-
let max_frame_no = self
444-
.push_with_retry(
445-
uri,
446-
&sync_ctx.auth_token,
447-
frame.to_vec(),
448-
sync_ctx.max_retries,
449-
)
450-
.await?;
451-
Ok(max_frame_no)
452-
}
453-
454-
#[cfg(feature = "sync")]
455-
async fn push_with_retry(
456-
&self,
457-
uri: String,
458-
auth_token: &Option<String>,
459-
frame: Vec<u8>,
460-
max_retries: usize,
461-
) -> Result<u32> {
462-
let mut nr_retries = 0;
463-
loop {
464-
let client = reqwest::Client::new();
465-
let mut builder = client.post(uri.to_owned());
466-
match auth_token {
467-
Some(ref auth_token) => {
468-
builder = builder
469-
.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
470-
}
471-
None => {}
472-
}
473-
let res = builder.body(frame.to_vec()).send().await.unwrap();
474-
if res.status().is_success() {
475-
let resp = res.json::<serde_json::Value>().await.unwrap();
476-
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
477-
return Ok(max_frame_no as u32);
478-
}
479-
if nr_retries > max_retries {
480-
return Err(crate::errors::Error::ConnectionFailed(format!(
481-
"Failed to push frame: {}",
482-
res.status()
483-
)));
484-
}
485-
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
486-
tokio::time::sleep(delay).await;
487-
nr_retries += 1;
488-
}
489-
}
490-
491431
pub(crate) fn path(&self) -> &str {
492432
&self.db_path
493433
}

libsql/src/sync.rs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use crate::Result;
2+
13
const DEFAULT_MAX_RETRIES: usize = 5;
24
pub struct SyncContext {
3-
pub sync_url: String,
4-
pub auth_token: Option<String>,
5-
pub max_retries: usize,
6-
pub durable_frame_num: u32,
5+
sync_url: String,
6+
auth_token: Option<String>,
7+
max_retries: usize,
8+
durable_frame_num: u32,
79
}
810

911
impl SyncContext {
@@ -15,4 +17,62 @@ impl SyncContext {
1517
max_retries: DEFAULT_MAX_RETRIES,
1618
}
1719
}
20+
21+
pub(crate) async fn push_one_frame(
22+
&self,
23+
frame: Vec<u8>,
24+
generation: u32,
25+
frame_no: u32,
26+
) -> Result<u32> {
27+
let uri = format!(
28+
"{}/sync/{}/{}/{}",
29+
self.sync_url,
30+
generation,
31+
frame_no,
32+
frame_no + 1
33+
);
34+
let max_frame_no = self
35+
.push_with_retry(uri, frame.to_vec(), self.max_retries)
36+
.await?;
37+
38+
Ok(max_frame_no)
39+
}
40+
41+
async fn push_with_retry(
42+
&self,
43+
uri: String,
44+
frame: Vec<u8>,
45+
max_retries: usize,
46+
) -> Result<u32> {
47+
let mut nr_retries = 0;
48+
loop {
49+
let client = reqwest::Client::new();
50+
let mut builder = client.post(uri.to_owned());
51+
match &self.auth_token {
52+
Some(ref auth_token) => {
53+
builder = builder.header("Authorization", format!("Bearer {}", auth_token));
54+
}
55+
None => {}
56+
}
57+
let res = builder.body(frame.to_vec()).send().await.unwrap();
58+
if res.status().is_success() {
59+
let resp = res.json::<serde_json::Value>().await.unwrap();
60+
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
61+
return Ok(max_frame_no as u32);
62+
}
63+
if nr_retries > max_retries {
64+
return Err(crate::errors::Error::ConnectionFailed(format!(
65+
"Failed to push frame: {}",
66+
res.status()
67+
)));
68+
}
69+
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
70+
tokio::time::sleep(delay).await;
71+
nr_retries += 1;
72+
}
73+
}
74+
75+
pub(crate) fn durable_frame_num(&self) -> u32 {
76+
self.durable_frame_num
77+
}
1878
}

0 commit comments

Comments
 (0)