Skip to content

Commit e0ce549

Browse files
authored
Prevent holding lock to context between sync interval ticks (#2080)
Fixes #2076.
2 parents 41eae61 + db08784 commit e0ce549

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

libsql/src/database/builder.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -644,38 +644,54 @@ cfg_sync! {
644644

645645
let mut bg_abort: Option<std::sync::Arc<crate::sync::DropAbort>> = None;
646646

647+
647648
if let Some(sync_interval) = sync_interval {
649+
let (cancel_tx, mut cancel_rx) = tokio::sync::oneshot::channel::<()>();
650+
651+
let sync_span = tracing::debug_span!("sync_interval");
652+
let _enter = sync_span.enter();
653+
648654
let sync_ctx = db.sync_ctx.as_ref().unwrap().clone();
649655
{
650656
let mut ctx = sync_ctx.lock().await;
651657
crate::sync::bootstrap_db(&mut ctx).await?;
658+
tracing::debug!("finished bootstrap with sync interval");
652659
}
653660

654661
// db.connect creates a local db file, so it is important that we always call
655662
// `bootstrap_db` (for synced dbs) before calling connect. Otherwise, the sync
656663
// protocol skips calling `export` endpoint causing slowdown in initial bootstrap.
657664
let conn = db.connect()?;
658-
let jh = tokio::spawn(
665+
666+
tokio::spawn(
659667
async move {
668+
let mut interval = tokio::time::interval(sync_interval);
669+
660670
loop {
661-
tracing::trace!("trying to sync");
662-
let mut ctx = sync_ctx.lock().await;
663-
if remote_writes {
664-
if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await {
665-
tracing::error!("sync error: {}", e);
666-
}
667-
} else {
668-
if let Err(e) = crate::sync::sync_offline(&mut ctx, &conn).await {
669-
tracing::error!("sync error: {}", e);
671+
tokio::select! {
672+
_ = &mut cancel_rx => break,
673+
_ = interval.tick() => {
674+
tracing::debug!("trying to sync");
675+
676+
let mut ctx = sync_ctx.lock().await;
677+
678+
let result = if remote_writes {
679+
crate::sync::try_pull(&mut ctx, &conn).await
680+
} else {
681+
crate::sync::sync_offline(&mut ctx, &conn).await
682+
};
683+
684+
if let Err(e) = result {
685+
tracing::error!("Error syncing database: {}", e);
686+
}
670687
}
671688
}
672-
tokio::time::sleep(sync_interval).await;
673689
}
674690
}
675-
.instrument(tracing::info_span!("sync_interval")),
691+
.instrument(tracing::debug_span!("sync interval thread")),
676692
);
677693

678-
bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(jh.abort_handle())));
694+
bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(Some(cancel_tx))));
679695
}
680696

681697
Ok(Database {

libsql/src/sync.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bytes::Bytes;
66
use chrono::Utc;
77
use http::{HeaderValue, StatusCode};
88
use hyper::Body;
9-
use tokio::{io::AsyncWriteExt as _, task::AbortHandle};
9+
use tokio::io::AsyncWriteExt as _;
1010
use uuid::Uuid;
1111

1212
#[cfg(test)]
@@ -81,11 +81,14 @@ pub struct PushResult {
8181
baton: Option<String>,
8282
}
8383

84-
pub struct DropAbort(pub AbortHandle);
84+
pub struct DropAbort(pub Option<tokio::sync::oneshot::Sender<()>>);
8585

8686
impl Drop for DropAbort {
8787
fn drop(&mut self) {
88-
self.0.abort();
88+
tracing::debug!("aborting");
89+
if let Some(sender) = self.0.take() {
90+
let _ = sender.send(());
91+
}
8992
}
9093
}
9194

0 commit comments

Comments
 (0)