Skip to content

Commit 9cbba77

Browse files
authored
Pull lazily on read for read-your-writes (#2109)
#2105 Before: ```cmd $ hyperfine target/debug/libsql-probe Benchmark 1: target/debug/libsql-probe Time (mean ± σ): 11.488 s ± 3.327 s [User: 0.367 s, System: 0.111 s] Range (min … max): 6.270 s … 17.406 s 10 runs ``` After: ```cmd $ hyperfine target/debug/libsql-probe Benchmark 1: target/debug/libsql-probe Time (mean ± σ): 3.156 s ± 1.063 s [User: 0.406 s, System: 0.066 s] Range (min … max): 1.764 s … 5.268 s 10 runs ``` On this kind of workload, running the server locally: ```rs for i in 0..100 { conn.execute("insert into test values (?)", libsql::params!(i)).await?; } let mut rows = conn.query("select * from test", ()).await?; ```
2 parents eecb25d + f7e1c86 commit 9cbba77

File tree

3 files changed

+52
-26
lines changed

3 files changed

+52
-26
lines changed

libsql/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ impl Database {
712712
read_your_writes: *read_your_writes,
713713
context: db.sync_ctx.clone().unwrap(),
714714
state: std::sync::Arc::new(Mutex::new(State::Init)),
715+
needs_pull: std::sync::atomic::AtomicBool::new(false).into(),
715716
};
716717

717718
let conn = std::sync::Arc::new(synced);

libsql/src/sync/connection.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use crate::{
88
sync::SyncContext,
99
BatchRows, Error, Result, Statement, Transaction, TransactionBehavior,
1010
};
11-
use std::sync::Arc;
11+
use std::sync::{
12+
atomic::{AtomicBool, Ordering},
13+
Arc,
14+
};
1215
use std::time::Duration;
1316
use tokio::sync::Mutex;
1417

@@ -21,6 +24,7 @@ pub struct SyncedConnection {
2124
pub read_your_writes: bool,
2225
pub context: Arc<Mutex<SyncContext>>,
2326
pub state: Arc<Mutex<State>>,
27+
pub needs_pull: Arc<AtomicBool>,
2428
}
2529

2630
impl SyncedConnection {
@@ -89,7 +93,7 @@ impl SyncedConnection {
8993
_ => {
9094
*state = predicted_end_state;
9195
false
92-
},
96+
}
9397
};
9498

9599
Ok(should_execute_local)
@@ -106,6 +110,11 @@ impl Conn for SyncedConnection {
106110

107111
async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
108112
if self.should_execute_local(sql).await? {
113+
if self.needs_pull.load(Ordering::Relaxed) {
114+
let mut context = self.context.lock().await;
115+
crate::sync::try_pull(&mut context, &self.local).await?;
116+
self.needs_pull.store(false, Ordering::Relaxed);
117+
}
109118
self.local.execute_batch(sql)
110119
} else {
111120
self.remote.execute_batch(sql).await
@@ -114,6 +123,11 @@ impl Conn for SyncedConnection {
114123

115124
async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
116125
if self.should_execute_local(sql).await? {
126+
if self.needs_pull.load(Ordering::Relaxed) {
127+
let mut context = self.context.lock().await;
128+
crate::sync::try_pull(&mut context, &self.local).await?;
129+
self.needs_pull.store(false, Ordering::Relaxed);
130+
}
117131
self.local.execute_transactional_batch(sql)?;
118132
Ok(BatchRows::empty())
119133
} else {
@@ -123,25 +137,28 @@ impl Conn for SyncedConnection {
123137

124138
async fn prepare(&self, sql: &str) -> Result<Statement> {
125139
if self.should_execute_local(sql).await? {
126-
Ok(Statement {
140+
let stmt = Statement {
127141
inner: Box::new(LibsqlStmt(self.local.prepare(sql)?)),
142+
};
143+
144+
Ok(Statement {
145+
inner: Box::new(SyncedStatement {
146+
conn: self.local.clone(),
147+
inner: stmt,
148+
context: self.context.clone(),
149+
needs_pull: self.needs_pull.clone(),
150+
}),
128151
})
129152
} else {
130153
let stmt = Statement {
131154
inner: Box::new(self.remote.prepare(sql).await?),
132155
};
133156

134157
if self.read_your_writes {
135-
Ok(Statement {
136-
inner: Box::new(SyncedStatement {
137-
conn: self.local.clone(),
138-
context: self.context.clone(),
139-
inner: stmt,
140-
}),
141-
})
142-
} else {
143-
Ok(stmt)
158+
self.needs_pull.store(true, Ordering::Relaxed);
144159
}
160+
161+
Ok(stmt)
145162
}
146163
}
147164

libsql/src/sync/statement.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ use crate::{
44
statement::Stmt,
55
sync::SyncContext, Column, Result, Rows, Statement,
66
};
7-
use std::sync::Arc;
7+
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
88
use tokio::sync::Mutex;
99

1010
pub struct SyncedStatement {
1111
pub conn: local::Connection,
12-
pub context: Arc<Mutex<SyncContext>>,
1312
pub inner: Statement,
13+
pub context: Arc<Mutex<SyncContext>>,
14+
pub needs_pull: Arc<AtomicBool>,
1415
}
1516

1617
#[async_trait::async_trait]
@@ -20,24 +21,30 @@ impl Stmt for SyncedStatement {
2021
}
2122

2223
async fn execute(&mut self, params: &Params) -> Result<usize> {
23-
let result = self.inner.execute(params).await;
24-
let mut context = self.context.lock().await;
25-
crate::sync::try_pull(&mut context, &self.conn).await?;
26-
result
24+
if self.needs_pull.load(Ordering::Relaxed) {
25+
let mut context = self.context.lock().await;
26+
crate::sync::try_pull(&mut context, &self.conn).await?;
27+
self.needs_pull.store(false, Ordering::Relaxed);
28+
}
29+
self.inner.execute(params).await
2730
}
2831

2932
async fn query(&mut self, params: &Params) -> Result<Rows> {
30-
let result = self.inner.query(params).await;
31-
let mut context = self.context.lock().await;
32-
crate::sync::try_pull(&mut context, &self.conn).await?;
33-
result
33+
if self.needs_pull.load(Ordering::Relaxed) {
34+
let mut context = self.context.lock().await;
35+
crate::sync::try_pull(&mut context, &self.conn).await?;
36+
self.needs_pull.store(false, Ordering::Relaxed);
37+
}
38+
self.inner.query(params).await
3439
}
3540

3641
async fn run(&mut self, params: &Params) -> Result<()> {
37-
let result = self.inner.run(params).await;
38-
let mut context = self.context.lock().await;
39-
crate::sync::try_pull(&mut context, &self.conn).await?;
40-
result
42+
if self.needs_pull.load(Ordering::Relaxed) {
43+
let mut context = self.context.lock().await;
44+
crate::sync::try_pull(&mut context, &self.conn).await?;
45+
self.needs_pull.store(false, Ordering::Relaxed);
46+
}
47+
self.inner.run(params).await
4148
}
4249

4350
fn interrupt(&mut self) -> Result<()> {
@@ -64,3 +71,4 @@ impl Stmt for SyncedStatement {
6471
self.inner.columns()
6572
}
6673
}
74+

0 commit comments

Comments
 (0)