Skip to content

Commit 72c6bd1

Browse files
committed
libsql: move push_with_retry to sync.rs
1 parent 2e7b564 commit 72c6bd1

File tree

2 files changed

+58
-38
lines changed

2 files changed

+58
-38
lines changed

libsql/src/local/database.rs

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ impl Database {
443443
frame_no,
444444
frame_no + 1
445445
);
446-
let max_frame_no = self
446+
let max_frame_no = sync_ctx
447447
.push_with_retry(
448448
uri,
449449
&sync_ctx.auth_token,
@@ -454,43 +454,6 @@ impl Database {
454454
Ok(max_frame_no)
455455
}
456456

457-
#[cfg(feature = "sync")]
458-
async fn push_with_retry(
459-
&self,
460-
uri: String,
461-
auth_token: &Option<String>,
462-
frame: Vec<u8>,
463-
max_retries: usize,
464-
) -> Result<u32> {
465-
let mut nr_retries = 0;
466-
loop {
467-
let client = reqwest::Client::new();
468-
let mut builder = client.post(uri.to_owned());
469-
match auth_token {
470-
Some(ref auth_token) => {
471-
builder = builder
472-
.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
473-
}
474-
None => {}
475-
}
476-
let res = builder.body(frame.to_vec()).send().await.unwrap();
477-
if res.status().is_success() {
478-
let resp = res.json::<serde_json::Value>().await.unwrap();
479-
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
480-
return Ok(max_frame_no as u32);
481-
}
482-
if nr_retries > max_retries {
483-
return Err(crate::errors::Error::ConnectionFailed(format!(
484-
"Failed to push frame: {}",
485-
res.status()
486-
)));
487-
}
488-
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
489-
tokio::time::sleep(delay).await;
490-
nr_retries += 1;
491-
}
492-
}
493-
494457
pub(crate) fn path(&self) -> &str {
495458
&self.db_path
496459
}

libsql/src/sync.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::Result;
2+
13
const DEFAULT_MAX_RETRIES: usize = 5;
24
pub struct SyncContext {
35
pub sync_url: String,
@@ -15,4 +17,59 @@ impl SyncContext {
1517
max_retries: DEFAULT_MAX_RETRIES,
1618
}
1719
}
20+
21+
pub(crate) async fn push_with_retry(
22+
&self,
23+
uri: String,
24+
auth_token: &Option<String>,
25+
frame: Vec<u8>,
26+
max_retries: usize,
27+
) -> Result<u32> {
28+
let mut nr_retries = 0;
29+
loop {
30+
// TODO(lucio): add custom connector + tls support here
31+
let client = hyper::client::Client::builder().build_http::<hyper::Body>();
32+
33+
let mut req = http::Request::post(uri.clone());
34+
35+
match auth_token {
36+
Some(ref auth_token) => {
37+
let auth_header =
38+
http::HeaderValue::try_from(format!("Bearer {}", auth_token.to_owned()))
39+
.unwrap();
40+
41+
req.headers_mut()
42+
.expect("valid http request")
43+
.insert("Authorization", auth_header);
44+
}
45+
None => {}
46+
}
47+
48+
// TODO(lucio): convert this to use bytes to make this clone cheap, it should be
49+
// to possible use BytesMut when reading frames from the WAL and efficiently use Bytes
50+
// from that.
51+
let req = req.body(frame.clone().into()).expect("valid body");
52+
53+
let res = client.request(req).await.unwrap();
54+
55+
// TODO(lucio): only retry on server side errors
56+
if res.status().is_success() {
57+
let res_body = hyper::body::to_bytes(res.into_body()).await.unwrap();
58+
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..]).unwrap();
59+
60+
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
61+
return Ok(max_frame_no as u32);
62+
}
63+
64+
if nr_retries > max_retries {
65+
return Err(crate::errors::Error::ConnectionFailed(format!(
66+
"Failed to push frame: {}",
67+
res.status()
68+
)));
69+
}
70+
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
71+
tokio::time::sleep(delay).await;
72+
nr_retries += 1;
73+
}
74+
}
1875
}

0 commit comments

Comments
 (0)