Skip to content

Commit e3904f9

Browse files
authored
Add sync interval support for synced databases (#2055)
This adds sync interval support for synced databases, which use the V2 sync protocol. Fixes #2059
2 parents 5a0f951 + 4647469 commit e3904f9

File tree

3 files changed

+91
-21
lines changed

3 files changed

+91
-21
lines changed

libsql/src/database.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ enum DbType {
9999
url: String,
100100
auth_token: String,
101101
connector: crate::util::ConnectorService,
102+
_bg_abort: Option<std::sync::Arc<crate::sync::DropAbort>>,
102103
},
103104
#[cfg(feature = "remote")]
104105
Remote {
@@ -673,12 +674,11 @@ impl Database {
673674
url,
674675
auth_token,
675676
connector,
677+
..
676678
} => {
677679
use crate::{
678-
hrana::{connection::HttpConnection, hyper::HttpSender},
679-
local::impls::LibsqlConnection,
680-
replication::connection::State,
681-
sync::connection::SyncedConnection,
680+
hrana::connection::HttpConnection, local::impls::LibsqlConnection,
681+
replication::connection::State, sync::connection::SyncedConnection,
682682
};
683683
use tokio::sync::Mutex;
684684

@@ -699,10 +699,11 @@ impl Database {
699699
if *remote_writes {
700700
let synced = SyncedConnection {
701701
local,
702-
remote: HttpConnection::new(
702+
remote: HttpConnection::new_with_connector(
703703
url.clone(),
704704
auth_token.clone(),
705-
HttpSender::new(connector.clone(), None),
705+
connector.clone(),
706+
None,
706707
),
707708
read_your_writes: *read_your_writes,
708709
context: db.sync_ctx.clone().unwrap(),

libsql/src/database/builder.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ impl Builder<()> {
108108
read_your_writes: true,
109109
remote_writes: false,
110110
push_batch_size: 0,
111+
sync_interval: None,
111112
},
112113
}
113114
}
@@ -401,12 +402,18 @@ cfg_replication! {
401402

402403
if res.status().is_success() {
403404
tracing::trace!("Using sync protocol v2 for {}", url);
404-
return Builder::new_synced_database(path, url, auth_token)
405+
let builder = Builder::new_synced_database(path, url, auth_token)
405406
.connector(connector)
406407
.remote_writes(true)
407-
.read_your_writes(read_your_writes)
408-
.build()
409-
.await;
408+
.read_your_writes(read_your_writes);
409+
410+
let builder = if let Some(sync_interval) = sync_interval {
411+
builder.sync_interval(sync_interval)
412+
} else {
413+
builder
414+
};
415+
416+
return builder.build().await;
410417
}
411418
tracing::trace!("Using sync protocol v1 for {} based on probe results", url);
412419
}
@@ -542,6 +549,7 @@ cfg_sync! {
542549
remote_writes: bool,
543550
read_your_writes: bool,
544551
push_batch_size: u32,
552+
sync_interval: Option<std::time::Duration>,
545553
}
546554

547555
impl Builder<SyncedDatabase> {
@@ -566,6 +574,14 @@ cfg_sync! {
566574
self
567575
}
568576

577+
/// Set the duration at which the replicator will automatically call `sync` in the
578+
/// background. The sync will continue for the duration that the resulted `Database`
579+
/// type is alive for, once it is dropped the background task will get dropped and stop.
580+
pub fn sync_interval(mut self, duration: std::time::Duration) -> Builder<SyncedDatabase> {
581+
self.inner.sync_interval = Some(duration);
582+
self
583+
}
584+
569585
/// Provide a custom http connector that will be used to create http connections.
570586
pub fn connector<C>(mut self, connector: C) -> Builder<SyncedDatabase>
571587
where
@@ -580,6 +596,8 @@ cfg_sync! {
580596

581597
/// Build a connection to a local database that can be synced to remote server.
582598
pub async fn build(self) -> Result<Database> {
599+
use tracing::Instrument as _;
600+
583601
let SyncedDatabase {
584602
path,
585603
flags,
@@ -594,6 +612,7 @@ cfg_sync! {
594612
remote_writes,
595613
read_your_writes,
596614
push_batch_size,
615+
sync_interval,
597616
} = self.inner;
598617

599618
let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();
@@ -624,6 +643,35 @@ cfg_sync! {
624643
db.sync_ctx.as_ref().unwrap().lock().await.set_push_batch_size(push_batch_size);
625644
}
626645

646+
let mut bg_abort: Option<std::sync::Arc<crate::sync::DropAbort>> = None;
647+
let conn = db.connect()?;
648+
649+
let sync_ctx = db.sync_ctx.as_ref().unwrap().clone();
650+
651+
if let Some(sync_interval) = sync_interval {
652+
let jh = tokio::spawn(
653+
async move {
654+
loop {
655+
tracing::trace!("trying to sync");
656+
let mut ctx = sync_ctx.lock().await;
657+
if remote_writes {
658+
if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await {
659+
tracing::error!("sync error: {}", e);
660+
}
661+
} else {
662+
if let Err(e) = crate::sync::sync_offline(&mut ctx, &conn).await {
663+
tracing::error!("sync error: {}", e);
664+
}
665+
}
666+
tokio::time::sleep(sync_interval).await;
667+
}
668+
}
669+
.instrument(tracing::info_span!("sync_interval")),
670+
);
671+
672+
bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(jh.abort_handle())));
673+
}
674+
627675
Ok(Database {
628676
db_type: DbType::Offline {
629677
db,
@@ -632,6 +680,7 @@ cfg_sync! {
632680
url,
633681
auth_token,
634682
connector,
683+
_bg_abort: bg_abort,
635684
},
636685
max_write_replication_index: Default::default(),
637686
})

libsql/src/sync.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bytes::Bytes;
66
use chrono::Utc;
77
use http::{HeaderValue, StatusCode};
88
use hyper::Body;
9-
use tokio::io::AsyncWriteExt as _;
9+
use tokio::{io::AsyncWriteExt as _, task::AbortHandle};
1010
use uuid::Uuid;
1111

1212
#[cfg(test)]
@@ -81,6 +81,14 @@ pub struct PushResult {
8181
baton: Option<String>,
8282
}
8383

84+
pub struct DropAbort(pub AbortHandle);
85+
86+
impl Drop for DropAbort {
87+
fn drop(&mut self) {
88+
self.0.abort();
89+
}
90+
}
91+
8492
pub enum PushStatus {
8593
Ok,
8694
Conflict,
@@ -216,7 +224,9 @@ impl SyncContext {
216224

217225
match result.status {
218226
PushStatus::Conflict => {
219-
return Err(SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into());
227+
return Err(
228+
SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into(),
229+
);
220230
}
221231
_ => {}
222232
}
@@ -251,7 +261,11 @@ impl SyncContext {
251261
tracing::debug!(?durable_frame_num, "frame successfully pushed");
252262

253263
// Update our last known max_frame_no from the server.
254-
tracing::debug!(?generation, ?durable_frame_num, "updating remote generation and durable_frame_num");
264+
tracing::debug!(
265+
?generation,
266+
?durable_frame_num,
267+
"updating remote generation and durable_frame_num"
268+
);
255269
self.durable_generation = generation;
256270
self.durable_frame_num = durable_frame_num;
257271

@@ -261,7 +275,12 @@ impl SyncContext {
261275
})
262276
}
263277

264-
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
278+
async fn push_with_retry(
279+
&self,
280+
mut uri: String,
281+
body: Bytes,
282+
max_retries: usize,
283+
) -> Result<PushResult> {
265284
let mut nr_retries = 0;
266285
loop {
267286
let mut req = http::Request::post(uri.clone());
@@ -402,7 +421,9 @@ impl SyncContext {
402421
}
403422
// BUG ALERT: The server returns a 500 error if the remote database is empty.
404423
// This is a bug and should be fixed.
405-
if res.status() == StatusCode::BAD_REQUEST || res.status() == StatusCode::INTERNAL_SERVER_ERROR {
424+
if res.status() == StatusCode::BAD_REQUEST
425+
|| res.status() == StatusCode::INTERNAL_SERVER_ERROR
426+
{
406427
let res_body = hyper::body::to_bytes(res.into_body())
407428
.await
408429
.map_err(SyncError::HttpBody)?;
@@ -417,7 +438,9 @@ impl SyncContext {
417438
let generation = generation
418439
.as_u64()
419440
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
420-
return Ok(PullResult::EndOfGeneration { max_generation: generation as u32 });
441+
return Ok(PullResult::EndOfGeneration {
442+
max_generation: generation as u32,
443+
});
421444
}
422445
if res.status().is_redirection() {
423446
uri = match res.headers().get(hyper::header::LOCATION) {
@@ -449,7 +472,6 @@ impl SyncContext {
449472
}
450473
}
451474

452-
453475
pub(crate) fn next_generation(&mut self) {
454476
self.durable_generation += 1;
455477
self.durable_frame_num = 0;
@@ -741,9 +763,7 @@ pub async fn bootstrap_db(sync_ctx: &mut SyncContext) -> Result<()> {
741763
// if we are lagging behind, then we will call the export API and get to the latest
742764
// generation directly.
743765
let info = sync_ctx.get_remote_info().await?;
744-
sync_ctx
745-
.sync_db_if_needed(info.current_generation)
746-
.await?;
766+
sync_ctx.sync_db_if_needed(info.current_generation).await?;
747767
// when sync_ctx is initialised, we set durable_generation to 0. however, once
748768
// sync_db is called, it should be > 0.
749769
assert!(sync_ctx.durable_generation > 0, "generation should be > 0");
@@ -871,7 +891,7 @@ pub async fn try_pull(
871891
let insert_handle = conn.wal_insert_handle()?;
872892

873893
let mut err = None;
874-
894+
875895
loop {
876896
let generation = sync_ctx.durable_generation();
877897
let frame_no = sync_ctx.durable_frame_num() + 1;

0 commit comments

Comments
 (0)