Skip to content

Commit cd69518

Browse files
authored
libsql: add batched rows for local conn (#1499)
* libsql: add batched rows for local conn * continue impl * Fix local batched rows test * remove dbg * fix feature flag
1 parent 2418d55 commit cd69518

File tree

6 files changed

+269
-40
lines changed

6 files changed

+269
-40
lines changed

libsql/src/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl BatchRows {
6161
}
6262
}
6363

64-
#[cfg(feature = "hrana")]
64+
#[cfg(any(feature = "hrana", feature = "core"))]
6565
pub(crate) fn new(rows: Vec<Option<Rows>>) -> Self {
6666
Self {
6767
inner: rows.into(),

libsql/src/local/connection.rs

Lines changed: 126 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#![allow(dead_code)]
22

3+
use crate::local::rows::BatchedRows;
34
use crate::params::Params;
4-
use crate::errors;
5+
use crate::{connection::BatchRows, errors};
56

67
use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
78

@@ -136,21 +137,93 @@ impl Connection {
136137
///
137138
/// Will return `Err` if `sql` cannot be converted to a C-compatible string
138139
/// or if the underlying SQLite call fails.
139-
pub fn execute_batch<S>(&self, sql: S) -> Result<()>
140+
pub fn execute_batch<S>(&self, sql: S) -> Result<BatchRows>
140141
where
141142
S: Into<String>,
142143
{
143144
let sql = sql.into();
144145
let mut sql = sql.as_str();
145146

147+
let mut batch_rows = Vec::new();
148+
146149
while !sql.is_empty() {
147150
let stmt = self.prepare(sql)?;
148151

149-
if !stmt.inner.raw_stmt.is_null() {
150-
stmt.step()?;
151-
}
152+
let tail = if !stmt.inner.raw_stmt.is_null() {
153+
let returned_rows = stmt.step()?;
152154

153-
let tail = stmt.tail();
155+
let tail = stmt.tail();
156+
157+
// Check if there are rows to be extracted, we must do this upfront due to the lazy
158+
// nature of sqlite and our somewhat hacked batch command.
159+
if returned_rows {
160+
// Extract columns
161+
let cols = stmt
162+
.columns()
163+
.iter()
164+
.enumerate()
165+
.map(|(i, c)| {
166+
use crate::value::ValueType;
167+
168+
let val = stmt.inner.column_type(i as i32);
169+
let t = match val {
170+
libsql_sys::ffi::SQLITE_INTEGER => ValueType::Integer,
171+
libsql_sys::ffi::SQLITE_FLOAT => ValueType::Real,
172+
libsql_sys::ffi::SQLITE_BLOB => ValueType::Blob,
173+
libsql_sys::ffi::SQLITE_TEXT => ValueType::Text,
174+
libsql_sys::ffi::SQLITE_NULL => ValueType::Null,
175+
_ => unreachable!("unknown column type {} at index {}", val, i),
176+
};
177+
178+
(c.name.to_string(), t)
179+
})
180+
.collect::<Vec<_>>();
181+
182+
let mut rows = Vec::new();
183+
184+
// If returned rows we must extract the rows available right away instead of
185+
// using the `Rows` type we have already. This is due to the step api once its
186+
// returned SQLITE_ROWS we must extract them before we call step again.
187+
{
188+
let row = crate::local::Row { stmt: stmt.clone() };
189+
190+
let mut values = Vec::with_capacity(cols.len());
191+
192+
for i in 0..cols.len() {
193+
let value = row.get_value(i as i32)?;
194+
195+
values.push(value);
196+
}
197+
198+
rows.push(values);
199+
}
200+
201+
// Now we can use the normal rows type to extract any n+1 rows
202+
let rows_sys = Rows::new(stmt);
203+
204+
while let Some(row) = rows_sys.next()? {
205+
let mut values = Vec::with_capacity(cols.len());
206+
207+
for i in 0..cols.len() {
208+
let value = row.get_value(i as i32)?;
209+
210+
values.push(value);
211+
}
212+
213+
rows.push(values);
214+
}
215+
216+
rows.len();
217+
218+
batch_rows.push(Some(crate::Rows::new(BatchedRows::new(cols, rows))));
219+
} else {
220+
batch_rows.push(None);
221+
}
222+
223+
tail
224+
} else {
225+
stmt.tail()
226+
};
154227

155228
if tail == 0 || tail >= sql.len() {
156229
break;
@@ -159,12 +232,12 @@ impl Connection {
159232
sql = &sql[tail..];
160233
}
161234

162-
Ok(())
235+
Ok(BatchRows::new(batch_rows))
163236
}
164237

165238
fn execute_transactional_batch_inner<S>(&self, sql: S) -> Result<()>
166-
where
167-
S: Into<String>,
239+
where
240+
S: Into<String>,
168241
{
169242
let sql = sql.into();
170243
let mut sql = sql.as_str();
@@ -177,13 +250,16 @@ impl Connection {
177250
} else {
178251
&sql[..tail]
179252
};
180-
let prefix_count = stmt_sql
181-
.chars()
182-
.take_while(|c| c.is_whitespace())
183-
.count();
253+
let prefix_count = stmt_sql.chars().take_while(|c| c.is_whitespace()).count();
184254
let stmt_sql = &stmt_sql[prefix_count..];
185-
if stmt_sql.starts_with("BEGIN") || stmt_sql.starts_with("COMMIT") || stmt_sql.starts_with("ROLLBACK") || stmt_sql.starts_with("END") {
186-
return Err(Error::TransactionalBatchError("Transactions forbidden inside transactional batch".to_string()));
255+
if stmt_sql.starts_with("BEGIN")
256+
|| stmt_sql.starts_with("COMMIT")
257+
|| stmt_sql.starts_with("ROLLBACK")
258+
|| stmt_sql.starts_with("END")
259+
{
260+
return Err(Error::TransactionalBatchError(
261+
"Transactions forbidden inside transactional batch".to_string(),
262+
));
187263
}
188264

189265
if !stmt.inner.raw_stmt.is_null() {
@@ -299,35 +375,53 @@ impl Connection {
299375
pub fn enable_load_extension(&self, onoff: bool) -> Result<()> {
300376
// SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION configration verb accepts 2 additional parameters: an on/off flag and a pointer to an c_int where new state of the parameter will be written (or NULL if reporting back the setting is not needed)
301377
// See: https://sqlite.org/c3ref/c_dbconfig_defensive.html#sqlitedbconfigenableloadextension
302-
let err = unsafe { ffi::sqlite3_db_config(self.raw, ffi::SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION, onoff as i32, std::ptr::null::<c_int>()) };
378+
let err = unsafe {
379+
ffi::sqlite3_db_config(
380+
self.raw,
381+
ffi::SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION,
382+
onoff as i32,
383+
std::ptr::null::<c_int>(),
384+
)
385+
};
303386
match err {
304387
ffi::SQLITE_OK => Ok(()),
305-
_ => Err(errors::Error::SqliteFailure(err, errors::error_from_code(err))),
388+
_ => Err(errors::Error::SqliteFailure(
389+
err,
390+
errors::error_from_code(err),
391+
)),
306392
}
307393
}
308394

309-
pub fn load_extension(
310-
&self,
311-
dylib_path: &Path,
312-
entry_point: Option<&str>,
313-
) -> Result<()> {
395+
pub fn load_extension(&self, dylib_path: &Path, entry_point: Option<&str>) -> Result<()> {
314396
let mut raw_err_msg: *mut std::ffi::c_char = std::ptr::null_mut();
315397
let dylib_path = match dylib_path.to_str() {
316-
Some(dylib_path) => {
317-
std::ffi::CString::new(dylib_path).unwrap()
318-
},
319-
None => return Err(crate::Error::Misuse(format!(
320-
"dylib path is not a valid utf8 string"
321-
))),
398+
Some(dylib_path) => std::ffi::CString::new(dylib_path).unwrap(),
399+
None => {
400+
return Err(crate::Error::Misuse(format!(
401+
"dylib path is not a valid utf8 string"
402+
)))
403+
}
322404
};
323405
let err = match entry_point {
324406
Some(entry_point) => {
325407
let entry_point = std::ffi::CString::new(entry_point).unwrap();
326-
unsafe { ffi::sqlite3_load_extension(self.raw, dylib_path.as_ptr(), entry_point.as_ptr(), &mut raw_err_msg) }
327-
}
328-
None => {
329-
unsafe { ffi::sqlite3_load_extension(self.raw, dylib_path.as_ptr(), std::ptr::null(), &mut raw_err_msg) }
408+
unsafe {
409+
ffi::sqlite3_load_extension(
410+
self.raw,
411+
dylib_path.as_ptr(),
412+
entry_point.as_ptr(),
413+
&mut raw_err_msg,
414+
)
415+
}
330416
}
417+
None => unsafe {
418+
ffi::sqlite3_load_extension(
419+
self.raw,
420+
dylib_path.as_ptr(),
421+
std::ptr::null(),
422+
&mut raw_err_msg,
423+
)
424+
},
331425
};
332426
match err {
333427
ffi::SQLITE_OK => Ok(()),

libsql/src/local/impls.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ impl Conn for LibsqlConnection {
2424
}
2525

2626
async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
27-
self.conn.execute_batch(sql)?;
28-
Ok(BatchRows::empty())
27+
self.conn.execute_batch(sql)
2928
}
3029

3130
async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {

libsql/src/local/rows.rs

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use crate::local::{Connection, Statement};
22
use crate::params::Params;
3+
use crate::rows::{RowInner, RowsInner};
34
use crate::{errors, Error, Result};
45
use crate::{Value, ValueRef};
56
use libsql_sys::ValueType;
67

7-
use std::fmt;
88
use std::cell::RefCell;
9+
use std::collections::VecDeque;
910
use std::ffi::c_char;
11+
use std::fmt;
12+
use std::sync::Arc;
1013
/// Query result rows.
1114
#[derive(Debug, Clone)]
1215
pub struct Rows {
@@ -165,7 +168,9 @@ impl fmt::Debug for Row {
165168
ValueRef::Null => dbg_map.value(&(value_type, ())),
166169
ValueRef::Integer(i) => dbg_map.value(&(value_type, i)),
167170
ValueRef::Real(f) => dbg_map.value(&(value_type, f)),
168-
ValueRef::Text(s) => dbg_map.value(&(value_type, String::from_utf8_lossy(s))),
171+
ValueRef::Text(s) => {
172+
dbg_map.value(&(value_type, String::from_utf8_lossy(s)))
173+
}
169174
ValueRef::Blob(b) => dbg_map.value(&(value_type, b.len())),
170175
};
171176
}
@@ -178,6 +183,94 @@ impl fmt::Debug for Row {
178183
}
179184
}
180185

186+
#[derive(Debug)]
187+
pub(crate) struct BatchedRows {
188+
/// Colname, decl_type
189+
cols: Arc<Vec<(String, crate::value::ValueType)>>,
190+
rows: VecDeque<Vec<Value>>,
191+
}
192+
193+
impl BatchedRows {
194+
pub fn new(cols: Vec<(String, crate::value::ValueType)>, rows: Vec<Vec<Value>>) -> Self {
195+
Self {
196+
cols: Arc::new(cols),
197+
rows: rows.into(),
198+
}
199+
}
200+
}
201+
202+
#[async_trait::async_trait]
203+
impl RowsInner for BatchedRows {
204+
async fn next(&mut self) -> Result<Option<crate::Row>> {
205+
let cols = self.cols.clone();
206+
let row = self.rows.pop_front();
207+
208+
if let Some(row) = row {
209+
Ok(Some(crate::Row {
210+
inner: Box::new(BatchedRow { cols, row }),
211+
}))
212+
} else {
213+
Ok(None)
214+
}
215+
}
216+
217+
fn column_count(&self) -> i32 {
218+
self.cols.len() as i32
219+
}
220+
221+
fn column_name(&self, idx: i32) -> Option<&str> {
222+
self.cols.get(idx as usize).map(|s| s.0.as_str())
223+
}
224+
225+
fn column_type(&self, idx: i32) -> Result<crate::value::ValueType> {
226+
self.cols
227+
.get(idx as usize)
228+
.ok_or(Error::InvalidColumnIndex)
229+
.map(|(_, vt)| vt.clone())
230+
}
231+
}
232+
233+
#[derive(Debug)]
234+
pub(crate) struct BatchedRow {
235+
cols: Arc<Vec<(String, crate::value::ValueType)>>,
236+
row: Vec<Value>,
237+
}
238+
239+
impl RowInner for BatchedRow {
240+
fn column_value(&self, idx: i32) -> Result<Value> {
241+
self.row
242+
.get(idx as usize)
243+
.cloned()
244+
.ok_or(Error::InvalidColumnIndex)
245+
}
246+
247+
fn column_name(&self, idx: i32) -> Option<&str> {
248+
self.cols.get(idx as usize).map(|c| c.0.as_str())
249+
}
250+
251+
fn column_str(&self, idx: i32) -> Result<&str> {
252+
self.row
253+
.get(idx as usize)
254+
.ok_or(Error::InvalidColumnIndex)
255+
.and_then(|v| {
256+
v.as_text()
257+
.map(String::as_str)
258+
.ok_or(Error::InvalidColumnType)
259+
})
260+
}
261+
262+
fn column_count(&self) -> usize {
263+
self.cols.len()
264+
}
265+
266+
fn column_type(&self, idx: i32) -> Result<crate::value::ValueType> {
267+
self.cols
268+
.get(idx as usize)
269+
.ok_or(Error::InvalidColumnIndex)
270+
.map(|(_, vt)| vt.clone())
271+
}
272+
}
273+
181274
pub trait FromValue {
182275
fn from_sql(val: libsql_sys::Value) -> Result<Self>
183276
where

libsql/src/local/statement.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,12 @@ impl Statement {
221221
}
222222
}
223223

224-
pub(crate) fn step(&self) -> Result<()> {
224+
/// Returns true if this statement has rows ready to be read.
225+
pub(crate) fn step(&self) -> Result<bool> {
225226
let err = self.inner.step();
226227
match err {
227-
crate::ffi::SQLITE_DONE => Ok(()),
228-
crate::ffi::SQLITE_ROW => Err(Error::ExecuteReturnedRows),
228+
crate::ffi::SQLITE_DONE => Ok(false),
229+
crate::ffi::SQLITE_ROW => Ok(true),
229230
_ => Err(Error::SqliteFailure(
230231
errors::extended_error_code(self.conn.raw),
231232
errors::error_from_handle(self.conn.raw),

0 commit comments

Comments
 (0)