Skip to content

Commit 9e6002f

Browse files
committed
bug: fix close & transaction handling
1 parent 5efbb52 commit 9e6002f

File tree

3 files changed

+102
-49
lines changed

3 files changed

+102
-49
lines changed

src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub unsafe extern "C" fn sqlite3_open_v2(
6767

6868
let reqwest_client = reqwest::Client::builder()
6969
.user_agent("libsqlite3_turso/1.0.0")
70+
.timeout(std::time::Duration::from_secs(30))
7071
.build()
7172
.unwrap();
7273

@@ -81,6 +82,7 @@ pub unsafe extern "C" fn sqlite3_open_v2(
8182
error_stack: Mutex::new(vec![]),
8283
transaction_baton: Mutex::new(None),
8384
last_insert_rowid: Mutex::new(None),
85+
transaction_has_began: Mutex::new(false),
8486
delete_hook: Mutex::new(None),
8587
insert_hook: Mutex::new(None),
8688
update_hook: Mutex::new(None),
@@ -383,7 +385,7 @@ pub extern "C" fn sqlite3_errmsg(db: *mut SQLite3) -> *const c_char {
383385

384386
if let Some(error_entry) = sqlite::get_latest_error(db) {
385387
match CString::new(error_entry.0) {
386-
Ok(c_string) => c_string.as_ptr(),
388+
Ok(c_string) => c_string.into_raw(),
387389
Err(_) => std::ptr::null(),
388390
}
389391
} else {
@@ -669,7 +671,7 @@ pub extern "C" fn sqlite3_get_autocommit(db: *mut SQLite3) -> c_int {
669671

670672
let db = unsafe { &*db };
671673

672-
if db.transaction_active() {
674+
if db.has_began_transaction() {
673675
0 // Transaction is active
674676
} else {
675677
1 // Autocommit mode

src/proxy.rs

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use reqwest::Client;
22
use serde::Deserialize;
3-
use std::{collections::HashMap, error::Error};
3+
use std::{collections::HashMap, error::Error, time::Duration};
44

55
use crate::{
66
sqlite::{SQLite3, Value},
@@ -51,12 +51,6 @@ pub async fn execute_sql_and_params(
5151
) -> Result<RemoteSqliteResponse, Box<dyn Error>> {
5252
let mut query_request = serde_json::Map::new();
5353

54-
if let Some(b) = baton {
55-
query_request.insert("baton".to_string(), serde_json::json!(b));
56-
}
57-
58-
let can_keep_open = !(baton.is_some() && sql.contains("COMMIT"));
59-
6054
let mut json_array: Vec<serde_json::Value> = Vec::new();
6155

6256
json_array.push(serde_json::json!({
@@ -67,7 +61,9 @@ pub async fn execute_sql_and_params(
6761
}
6862
}));
6963

70-
if !can_keep_open {
64+
if db.has_began_transaction() {
65+
query_request.insert("baton".to_string(), serde_json::json!(baton));
66+
} else {
7167
json_array.push(serde_json::json!({
7268
"type": "close"
7369
}));
@@ -122,55 +118,107 @@ async fn send_sql_request(
122118
Ok(parsed)
123119
}
124120

125-
async fn send_remote_request(
121+
pub async fn send_remote_request(
126122
client: &Client,
127123
turso_config: &TursoConfig,
128124
path: &str,
129125
request: serde_json::Value,
130126
) -> Result<serde_json::Value, Box<dyn Error>> {
131-
let response = client
132-
.post(format!("https://{}/{}", turso_config.db_url, path))
133-
.header("Content-Type", "application/json")
134-
.header("Authorization", format!("Bearer {}", turso_config.db_token))
135-
.json(&request)
136-
.send()
137-
.await?;
127+
const MAX_ATTEMPTS: usize = 5;
128+
let mut last_error = String::new();
129+
130+
for attempt in 1..=MAX_ATTEMPTS {
131+
if cfg!(debug_assertions) {
132+
println!(
133+
"Attempt {}: Sending request to {}",
134+
attempt, turso_config.db_url
135+
);
136+
}
138137

139-
let status = response.status();
140-
let response_text = response.text().await?;
138+
let resp = client
139+
.post(format!("https://{}/{}", turso_config.db_url, path))
140+
.header("Content-Type", "application/json")
141+
.header("Authorization", format!("Bearer {}", turso_config.db_token))
142+
.json(&request)
143+
.send()
144+
.await;
145+
146+
let resp = match resp {
147+
Ok(r) => r,
148+
Err(e) => {
149+
last_error = format!("Request failed: {}", e);
150+
if attempt < MAX_ATTEMPTS {
151+
tokio::time::sleep(Duration::from_millis(100)).await;
152+
continue;
153+
} else {
154+
return Err(last_error.into());
155+
}
156+
}
157+
};
158+
159+
let status = resp.status();
160+
let text = match resp.text().await {
161+
Ok(t) => t,
162+
Err(e) => {
163+
last_error = format!("Failed to read response body: {}", e);
164+
if attempt < MAX_ATTEMPTS {
165+
tokio::time::sleep(Duration::from_millis(100)).await;
166+
continue;
167+
} else {
168+
return Err(last_error.into());
169+
}
170+
}
171+
};
141172

142-
if cfg!(debug_assertions) {
143-
println!("Received Response: {}\n", &response_text);
144-
}
173+
if cfg!(debug_assertions) {
174+
println!("Response received: {}", text);
175+
}
145176

146-
if !status.is_success() {
147-
if let Ok(error_body) = serde_json::from_str::<serde_json::Value>(&response_text) {
148-
if let Some(error_message) = error_body.get("error").and_then(|e| e.as_str()) {
149-
return Err(error_message.into());
177+
if !status.is_success() {
178+
if let Ok(err_json) = serde_json::from_str::<serde_json::Value>(&text) {
179+
if let Some(msg) = err_json.get("error").and_then(|v| v.as_str()) {
180+
last_error = format!("API error: {}", msg);
181+
} else {
182+
last_error = format!("HTTP error {}: {}", status, text);
183+
}
184+
} else {
185+
last_error = format!("HTTP error {} with invalid JSON: {}", status, text);
150186
}
151-
}
152-
return Err(format!("LibSqlite3_Turso Error: {}", response_text).into());
153-
}
154187

155-
let parsed_response = serde_json::from_str(&response_text);
156-
if parsed_response.is_err() {
157-
return Err(format!("Failed to parse response: {}", parsed_response.unwrap_err()).into());
158-
}
188+
if attempt < MAX_ATTEMPTS {
189+
tokio::time::sleep(Duration::from_millis(100)).await;
190+
continue;
191+
} else {
192+
return Err(last_error.into());
193+
}
194+
}
159195

160-
let parsed_response: serde_json::Value = parsed_response.unwrap();
161-
if let Some(results) = parsed_response.get("results").and_then(|r| r.as_array()) {
162-
for result in results {
163-
if let Some(error) = result
164-
.get("error")
165-
.and_then(|e| e.get("message"))
166-
.and_then(|m| m.as_str())
167-
{
168-
return Err(error.into());
196+
let parsed: serde_json::Value = match serde_json::from_str(&text) {
197+
Ok(v) => v,
198+
Err(e) => return Err(format!("Failed to parse JSON: {}", e).into()),
199+
};
200+
201+
// Check for embedded DB errors
202+
if let Some(results) = parsed.get("results").and_then(|r| r.as_array()) {
203+
for result in results {
204+
if let Some(msg) = result
205+
.get("error")
206+
.and_then(|e| e.get("message"))
207+
.and_then(|m| m.as_str())
208+
{
209+
return Err(msg.to_string().into());
210+
}
169211
}
170212
}
213+
214+
return Ok(parsed);
171215
}
172216

173-
Ok(parsed_response)
217+
Err(format!(
218+
"Failed to get successful response after {} attempts: {}",
219+
MAX_ATTEMPTS, last_error
220+
)
221+
.into())
174222
}
175223

176224
pub fn convert_params_to_json(params: &HashMap<i32, Value>) -> Vec<serde_json::Value> {

src/sqlite.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub struct SQLite3 {
6464
pub last_insert_rowid: Mutex<Option<i64>>, // Last inserted row ID
6565
pub error_stack: Mutex<Vec<(String, c_int)>>, // Stack to store error messages
6666
pub transaction_baton: Mutex<Option<String>>, // Baton for transaction management
67+
pub transaction_has_began: Mutex<bool>, // Flag to check if a transaction has started
6768
pub update_hook: Mutex<Option<(SqliteHook, *mut c_void)>>, // Update hook callback
6869
pub insert_hook: Mutex<Option<(SqliteHook, *mut c_void)>>, // Insert hook callback
6970
pub delete_hook: Mutex<Option<(SqliteHook, *mut c_void)>>, // Delete hook callback
@@ -114,8 +115,8 @@ impl SQLite3 {
114115
SQLITE_OK
115116
}
116117

117-
pub fn transaction_active(&self) -> bool {
118-
self.transaction_baton.lock().unwrap().is_some()
118+
pub fn has_began_transaction(&self) -> bool {
119+
*self.transaction_has_began.lock().unwrap()
119120
}
120121
}
121122

@@ -172,10 +173,11 @@ pub fn reset_txn_on_db(db: *mut SQLite3) -> c_int {
172173

173174
let db = unsafe { &mut *db };
174175

175-
if !db.transaction_active() {
176+
if !db.has_began_transaction() {
176177
return SQLITE_OK;
177178
}
178179

180+
*db.transaction_has_began.lock().unwrap() = false;
179181
db.transaction_baton.lock().unwrap().take();
180182

181183
SQLITE_OK
@@ -284,7 +286,7 @@ pub async fn begin_tnx_on_db(db: *mut SQLite3) -> Result<c_int, Box<dyn Error>>
284286

285287
let db = unsafe { &mut *db };
286288

287-
if db.transaction_active() {
289+
if db.has_began_transaction() {
288290
push_error(
289291
db,
290292
(
@@ -297,6 +299,7 @@ pub async fn begin_tnx_on_db(db: *mut SQLite3) -> Result<c_int, Box<dyn Error>>
297299

298300
let baton_value = get_transaction_baton(&db.client, &db.turso_config).await?;
299301
db.transaction_baton.lock().unwrap().replace(baton_value);
302+
*db.transaction_has_began.lock().unwrap() = true;
300303

301304
Ok(SQLITE_OK)
302305
}
@@ -308,7 +311,7 @@ pub async fn commit_tnx_on_db(db: *mut SQLite3) -> Result<c_int, Box<dyn Error>>
308311

309312
let db = unsafe { &mut *db };
310313

311-
if !db.transaction_active() {
314+
if !db.has_began_transaction() {
312315
push_error(
313316
db,
314317
(

0 commit comments

Comments
 (0)