Skip to content

Commit e73cc82

Browse files
committed
fix: pull on read
1 parent eecb25d commit e73cc82

File tree

4 files changed

+23
-79
lines changed

4 files changed

+23
-79
lines changed

libsql/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ impl Database {
712712
read_your_writes: *read_your_writes,
713713
context: db.sync_ctx.clone().unwrap(),
714714
state: std::sync::Arc::new(Mutex::new(State::Init)),
715+
needs_pull: std::sync::atomic::AtomicBool::new(false).into(),
715716
};
716717

717718
let conn = std::sync::Arc::new(synced);

libsql/src/sync.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use uuid::Uuid;
1313
mod test;
1414

1515
pub mod connection;
16-
pub mod statement;
1716
pub mod transaction;
1817

1918
const METADATA_VERSION: u32 = 0;

libsql/src/sync/connection.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ use crate::{
88
sync::SyncContext,
99
BatchRows, Error, Result, Statement, Transaction, TransactionBehavior,
1010
};
11-
use std::sync::Arc;
11+
use std::sync::{
12+
atomic::{AtomicBool, Ordering},
13+
Arc,
14+
};
1215
use std::time::Duration;
1316
use tokio::sync::Mutex;
1417

15-
use super::{statement::SyncedStatement, transaction::SyncedTx};
18+
use super::transaction::SyncedTx;
1619

1720
#[derive(Clone)]
1821
pub struct SyncedConnection {
@@ -21,6 +24,7 @@ pub struct SyncedConnection {
2124
pub read_your_writes: bool,
2225
pub context: Arc<Mutex<SyncContext>>,
2326
pub state: Arc<Mutex<State>>,
27+
pub needs_pull: Arc<AtomicBool>,
2428
}
2529

2630
impl SyncedConnection {
@@ -89,7 +93,7 @@ impl SyncedConnection {
8993
_ => {
9094
*state = predicted_end_state;
9195
false
92-
},
96+
}
9397
};
9498

9599
Ok(should_execute_local)
@@ -106,6 +110,10 @@ impl Conn for SyncedConnection {
106110

107111
async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
108112
if self.should_execute_local(sql).await? {
113+
if self.needs_pull.swap(false, Ordering::Relaxed) {
114+
let mut context = self.context.lock().await;
115+
crate::sync::try_pull(&mut context, &self.local).await?;
116+
}
109117
self.local.execute_batch(sql)
110118
} else {
111119
self.remote.execute_batch(sql).await
@@ -114,6 +122,10 @@ impl Conn for SyncedConnection {
114122

115123
async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
116124
if self.should_execute_local(sql).await? {
125+
if self.needs_pull.swap(false, Ordering::Relaxed) {
126+
let mut context = self.context.lock().await;
127+
crate::sync::try_pull(&mut context, &self.local).await?;
128+
}
117129
self.local.execute_transactional_batch(sql)?;
118130
Ok(BatchRows::empty())
119131
} else {
@@ -123,6 +135,10 @@ impl Conn for SyncedConnection {
123135

124136
async fn prepare(&self, sql: &str) -> Result<Statement> {
125137
if self.should_execute_local(sql).await? {
138+
if self.needs_pull.swap(false, Ordering::Relaxed) {
139+
let mut context = self.context.lock().await;
140+
crate::sync::try_pull(&mut context, &self.local).await?;
141+
}
126142
Ok(Statement {
127143
inner: Box::new(LibsqlStmt(self.local.prepare(sql)?)),
128144
})
@@ -132,16 +148,10 @@ impl Conn for SyncedConnection {
132148
};
133149

134150
if self.read_your_writes {
135-
Ok(Statement {
136-
inner: Box::new(SyncedStatement {
137-
conn: self.local.clone(),
138-
context: self.context.clone(),
139-
inner: stmt,
140-
}),
141-
})
142-
} else {
143-
Ok(stmt)
151+
self.needs_pull.store(true, Ordering::Relaxed);
144152
}
153+
154+
Ok(stmt)
145155
}
146156
}
147157

libsql/src/sync/statement.rs

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)