Skip to content

Commit 3a84ed4

Browse files
committed
feat: sycn_interval for offline writes
1 parent 5a0f951 commit 3a84ed4

File tree

3 files changed

+90
-19
lines changed

3 files changed

+90
-19
lines changed

libsql/src/database.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ pub use builder::Builder;
77
#[cfg(feature = "core")]
88
pub use libsql_sys::{Cipher, EncryptionConfig};
99

10+
use crate::sync::DropAbort;
1011
use crate::{Connection, Result};
1112
use std::fmt;
1213
use std::sync::atomic::AtomicU64;
14+
use std::sync::Arc;
1315

1416
cfg_core! {
1517
bitflags::bitflags! {
@@ -99,6 +101,7 @@ enum DbType {
99101
url: String,
100102
auth_token: String,
101103
connector: crate::util::ConnectorService,
104+
_bg_abort: Option<Arc<DropAbort>>,
102105
},
103106
#[cfg(feature = "remote")]
104107
Remote {
@@ -673,9 +676,10 @@ impl Database {
673676
url,
674677
auth_token,
675678
connector,
679+
..
676680
} => {
677681
use crate::{
678-
hrana::{connection::HttpConnection, hyper::HttpSender},
682+
hrana::connection::HttpConnection,
679683
local::impls::LibsqlConnection,
680684
replication::connection::State,
681685
sync::connection::SyncedConnection,
@@ -699,10 +703,11 @@ impl Database {
699703
if *remote_writes {
700704
let synced = SyncedConnection {
701705
local,
702-
remote: HttpConnection::new(
706+
remote: HttpConnection::new_with_connector(
703707
url.clone(),
704708
auth_token.clone(),
705-
HttpSender::new(connector.clone(), None),
709+
connector.clone(),
710+
None,
706711
),
707712
read_your_writes: *read_your_writes,
708713
context: db.sync_ctx.clone().unwrap(),

libsql/src/database/builder.rs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ cfg_core! {
22
use crate::EncryptionConfig;
33
}
44

5-
use crate::{Database, Result};
5+
use std::sync::Arc;
6+
7+
use tracing::Instrument as _;
8+
9+
use crate::{sync::DropAbort, Database, Result};
610

711
use super::DbType;
812

@@ -108,6 +112,7 @@ impl Builder<()> {
108112
read_your_writes: true,
109113
remote_writes: false,
110114
push_batch_size: 0,
115+
sync_interval: None,
111116
},
112117
}
113118
}
@@ -401,12 +406,18 @@ cfg_replication! {
401406

402407
if res.status().is_success() {
403408
tracing::trace!("Using sync protocol v2 for {}", url);
404-
return Builder::new_synced_database(path, url, auth_token)
409+
let builder = Builder::new_synced_database(path, url, auth_token)
405410
.connector(connector)
406411
.remote_writes(true)
407-
.read_your_writes(read_your_writes)
408-
.build()
409-
.await;
412+
.read_your_writes(read_your_writes);
413+
414+
let builder = if let Some(sync_interval) = sync_interval {
415+
builder.sync_interval(sync_interval)
416+
} else {
417+
builder
418+
};
419+
420+
return builder.build().await;
410421
}
411422
tracing::trace!("Using sync protocol v1 for {} based on probe results", url);
412423
}
@@ -542,6 +553,7 @@ cfg_sync! {
542553
remote_writes: bool,
543554
read_your_writes: bool,
544555
push_batch_size: u32,
556+
sync_interval: Option<std::time::Duration>,
545557
}
546558

547559
impl Builder<SyncedDatabase> {
@@ -566,6 +578,14 @@ cfg_sync! {
566578
self
567579
}
568580

581+
/// Set the duration at which the replicator will automatically call `sync` in the
582+
/// background. The sync will continue for the duration that the resulted `Database`
583+
/// type is alive for, once it is dropped the background task will get dropped and stop.
584+
pub fn sync_interval(mut self, duration: std::time::Duration) -> Builder<SyncedDatabase> {
585+
self.inner.sync_interval = Some(duration);
586+
self
587+
}
588+
569589
/// Provide a custom http connector that will be used to create http connections.
570590
pub fn connector<C>(mut self, connector: C) -> Builder<SyncedDatabase>
571591
where
@@ -594,6 +614,7 @@ cfg_sync! {
594614
remote_writes,
595615
read_your_writes,
596616
push_batch_size,
617+
sync_interval,
597618
} = self.inner;
598619

599620
let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();
@@ -624,6 +645,30 @@ cfg_sync! {
624645
db.sync_ctx.as_ref().unwrap().lock().await.set_push_batch_size(push_batch_size);
625646
}
626647

648+
let mut bg_abort: Option<Arc<DropAbort>> = None;
649+
let conn = db.connect()?;
650+
651+
let sync_ctx = db.sync_ctx.as_ref().unwrap().clone();
652+
653+
if let Some(sync_interval) = sync_interval {
654+
let jh = tokio::spawn(
655+
async move {
656+
loop {
657+
tracing::trace!("trying to sync");
658+
let mut ctx = sync_ctx.lock().await;
659+
if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await {
660+
tracing::error!("sync error: {}", e);
661+
}
662+
663+
tokio::time::sleep(sync_interval).await;
664+
}
665+
}
666+
.instrument(tracing::info_span!("sync_interval")),
667+
);
668+
669+
bg_abort.replace(Arc::new(DropAbort(jh.abort_handle())));
670+
}
671+
627672
Ok(Database {
628673
db_type: DbType::Offline {
629674
db,
@@ -632,6 +677,7 @@ cfg_sync! {
632677
url,
633678
auth_token,
634679
connector,
680+
_bg_abort: bg_abort,
635681
},
636682
max_write_replication_index: Default::default(),
637683
})

libsql/src/sync.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bytes::Bytes;
66
use chrono::Utc;
77
use http::{HeaderValue, StatusCode};
88
use hyper::Body;
9-
use tokio::io::AsyncWriteExt as _;
9+
use tokio::{io::AsyncWriteExt as _, task::AbortHandle};
1010
use uuid::Uuid;
1111

1212
#[cfg(test)]
@@ -81,6 +81,14 @@ pub struct PushResult {
8181
baton: Option<String>,
8282
}
8383

84+
pub struct DropAbort(pub AbortHandle);
85+
86+
impl Drop for DropAbort {
87+
fn drop(&mut self) {
88+
self.0.abort();
89+
}
90+
}
91+
8492
pub enum PushStatus {
8593
Ok,
8694
Conflict,
@@ -216,7 +224,9 @@ impl SyncContext {
216224

217225
match result.status {
218226
PushStatus::Conflict => {
219-
return Err(SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into());
227+
return Err(
228+
SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into(),
229+
);
220230
}
221231
_ => {}
222232
}
@@ -251,7 +261,11 @@ impl SyncContext {
251261
tracing::debug!(?durable_frame_num, "frame successfully pushed");
252262

253263
// Update our last known max_frame_no from the server.
254-
tracing::debug!(?generation, ?durable_frame_num, "updating remote generation and durable_frame_num");
264+
tracing::debug!(
265+
?generation,
266+
?durable_frame_num,
267+
"updating remote generation and durable_frame_num"
268+
);
255269
self.durable_generation = generation;
256270
self.durable_frame_num = durable_frame_num;
257271

@@ -261,7 +275,12 @@ impl SyncContext {
261275
})
262276
}
263277

264-
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
278+
async fn push_with_retry(
279+
&self,
280+
mut uri: String,
281+
body: Bytes,
282+
max_retries: usize,
283+
) -> Result<PushResult> {
265284
let mut nr_retries = 0;
266285
loop {
267286
let mut req = http::Request::post(uri.clone());
@@ -402,7 +421,9 @@ impl SyncContext {
402421
}
403422
// BUG ALERT: The server returns a 500 error if the remote database is empty.
404423
// This is a bug and should be fixed.
405-
if res.status() == StatusCode::BAD_REQUEST || res.status() == StatusCode::INTERNAL_SERVER_ERROR {
424+
if res.status() == StatusCode::BAD_REQUEST
425+
|| res.status() == StatusCode::INTERNAL_SERVER_ERROR
426+
{
406427
let res_body = hyper::body::to_bytes(res.into_body())
407428
.await
408429
.map_err(SyncError::HttpBody)?;
@@ -417,7 +438,9 @@ impl SyncContext {
417438
let generation = generation
418439
.as_u64()
419440
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
420-
return Ok(PullResult::EndOfGeneration { max_generation: generation as u32 });
441+
return Ok(PullResult::EndOfGeneration {
442+
max_generation: generation as u32,
443+
});
421444
}
422445
if res.status().is_redirection() {
423446
uri = match res.headers().get(hyper::header::LOCATION) {
@@ -449,7 +472,6 @@ impl SyncContext {
449472
}
450473
}
451474

452-
453475
pub(crate) fn next_generation(&mut self) {
454476
self.durable_generation += 1;
455477
self.durable_frame_num = 0;
@@ -741,9 +763,7 @@ pub async fn bootstrap_db(sync_ctx: &mut SyncContext) -> Result<()> {
741763
// if we are lagging behind, then we will call the export API and get to the latest
742764
// generation directly.
743765
let info = sync_ctx.get_remote_info().await?;
744-
sync_ctx
745-
.sync_db_if_needed(info.current_generation)
746-
.await?;
766+
sync_ctx.sync_db_if_needed(info.current_generation).await?;
747767
// when sync_ctx is initialised, we set durable_generation to 0. however, once
748768
// sync_db is called, it should be > 0.
749769
assert!(sync_ctx.durable_generation > 0, "generation should be > 0");
@@ -871,7 +891,7 @@ pub async fn try_pull(
871891
let insert_handle = conn.wal_insert_handle()?;
872892

873893
let mut err = None;
874-
894+
875895
loop {
876896
let generation = sync_ctx.durable_generation();
877897
let frame_no = sync_ctx.durable_frame_num() + 1;

0 commit comments

Comments
 (0)