Skip to content

Commit f7e1c86

Browse files
committed
improve lazy pull
1 parent e73cc82 commit f7e1c86

File tree

3 files changed

+90
-8
lines changed

3 files changed

+90
-8
lines changed

libsql/src/sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use uuid::Uuid;
1313
mod test;
1414

1515
pub mod connection;
16+
pub mod statement;
1617
pub mod transaction;
1718

1819
const METADATA_VERSION: u32 = 0;

libsql/src/sync/connection.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::sync::{
1515
use std::time::Duration;
1616
use tokio::sync::Mutex;
1717

18-
use super::transaction::SyncedTx;
18+
use super::{statement::SyncedStatement, transaction::SyncedTx};
1919

2020
#[derive(Clone)]
2121
pub struct SyncedConnection {
@@ -110,9 +110,10 @@ impl Conn for SyncedConnection {
110110

111111
async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
112112
if self.should_execute_local(sql).await? {
113-
if self.needs_pull.swap(false, Ordering::Relaxed) {
113+
if self.needs_pull.load(Ordering::Relaxed) {
114114
let mut context = self.context.lock().await;
115115
crate::sync::try_pull(&mut context, &self.local).await?;
116+
self.needs_pull.store(false, Ordering::Relaxed);
116117
}
117118
self.local.execute_batch(sql)
118119
} else {
@@ -122,9 +123,10 @@ impl Conn for SyncedConnection {
122123

123124
async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
124125
if self.should_execute_local(sql).await? {
125-
if self.needs_pull.swap(false, Ordering::Relaxed) {
126+
if self.needs_pull.load(Ordering::Relaxed) {
126127
let mut context = self.context.lock().await;
127128
crate::sync::try_pull(&mut context, &self.local).await?;
129+
self.needs_pull.store(false, Ordering::Relaxed);
128130
}
129131
self.local.execute_transactional_batch(sql)?;
130132
Ok(BatchRows::empty())
@@ -135,12 +137,17 @@ impl Conn for SyncedConnection {
135137

136138
async fn prepare(&self, sql: &str) -> Result<Statement> {
137139
if self.should_execute_local(sql).await? {
138-
if self.needs_pull.swap(false, Ordering::Relaxed) {
139-
let mut context = self.context.lock().await;
140-
crate::sync::try_pull(&mut context, &self.local).await?;
141-
}
142-
Ok(Statement {
140+
let stmt = Statement {
143141
inner: Box::new(LibsqlStmt(self.local.prepare(sql)?)),
142+
};
143+
144+
Ok(Statement {
145+
inner: Box::new(SyncedStatement {
146+
conn: self.local.clone(),
147+
inner: stmt,
148+
context: self.context.clone(),
149+
needs_pull: self.needs_pull.clone(),
150+
}),
144151
})
145152
} else {
146153
let stmt = Statement {

libsql/src/sync/statement.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use crate::{
2+
local::{self},
3+
params::Params,
4+
statement::Stmt,
5+
sync::SyncContext, Column, Result, Rows, Statement,
6+
};
7+
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
8+
use tokio::sync::Mutex;
9+
10+
pub struct SyncedStatement {
11+
pub conn: local::Connection,
12+
pub inner: Statement,
13+
pub context: Arc<Mutex<SyncContext>>,
14+
pub needs_pull: Arc<AtomicBool>,
15+
}
16+
17+
#[async_trait::async_trait]
18+
impl Stmt for SyncedStatement {
19+
fn finalize(&mut self) {
20+
self.inner.finalize()
21+
}
22+
23+
async fn execute(&mut self, params: &Params) -> Result<usize> {
24+
if self.needs_pull.load(Ordering::Relaxed) {
25+
let mut context = self.context.lock().await;
26+
crate::sync::try_pull(&mut context, &self.conn).await?;
27+
self.needs_pull.store(false, Ordering::Relaxed);
28+
}
29+
self.inner.execute(params).await
30+
}
31+
32+
async fn query(&mut self, params: &Params) -> Result<Rows> {
33+
if self.needs_pull.load(Ordering::Relaxed) {
34+
let mut context = self.context.lock().await;
35+
crate::sync::try_pull(&mut context, &self.conn).await?;
36+
self.needs_pull.store(false, Ordering::Relaxed);
37+
}
38+
self.inner.query(params).await
39+
}
40+
41+
async fn run(&mut self, params: &Params) -> Result<()> {
42+
if self.needs_pull.load(Ordering::Relaxed) {
43+
let mut context = self.context.lock().await;
44+
crate::sync::try_pull(&mut context, &self.conn).await?;
45+
self.needs_pull.store(false, Ordering::Relaxed);
46+
}
47+
self.inner.run(params).await
48+
}
49+
50+
fn interrupt(&mut self) -> Result<()> {
51+
self.inner.interrupt()
52+
}
53+
54+
fn reset(&mut self) {
55+
self.inner.reset()
56+
}
57+
58+
fn parameter_count(&self) -> usize {
59+
self.inner.parameter_count()
60+
}
61+
62+
fn parameter_name(&self, idx: i32) -> Option<&str> {
63+
self.inner.parameter_name(idx)
64+
}
65+
66+
fn column_count(&self) -> usize {
67+
self.inner.column_count()
68+
}
69+
70+
fn columns(&self) -> Vec<Column> {
71+
self.inner.columns()
72+
}
73+
}
74+

0 commit comments

Comments
 (0)