Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
426 changes: 123 additions & 303 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bench-vortex/src/engines/ddb2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl DuckDBCtx {
// TODO: handle multiple queries
trace!("execute duckdb query: {}", query);
let time_instant = Instant::now();
self.connection.execute(query)?;
self.connection.query(query)?;
let query_time = time_instant.elapsed();
trace!("query completed in {:.3}s", query_time.as_secs_f64());

Expand Down
7 changes: 1 addition & 6 deletions vortex-duckdb-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@ path = "src/lib.rs"
crate-type = ["staticlib", "cdylib", "rlib"]

[dependencies]
anyhow = { workspace = true }
bitvec = { workspace = true }
crossbeam-queue = { workspace = true }
# duckdb-rs using arrow-rs v55.1.0 with decimal32/64 types cherry-picked on top
# (https://github.com/apache/arrow-rs/pull/7098) this is will be removed once we
# hit 56 (hopefully in july). required since duckdb returns decimal32/64 from scans.
duckdb = { git = "https://github.com/vortex-data/duckdb-rs", rev = "247ffb36c41bd44bb18e586bdd6640a95783bb5e", features = [
"vtab-full",
] }
glob = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
Expand Down
217 changes: 186 additions & 31 deletions vortex-duckdb-ext/src/duckdb/connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::ffi::CStr;
use std::ptr;

use vortex::error::{VortexResult, vortex_err};

use crate::duckdb::Database;
use crate::duckdb::{Database, QueryResult};
use crate::{cpp, duckdb_try, wrapper};

wrapper!(
Expand All @@ -22,8 +23,8 @@ impl Connection {
Ok(unsafe { Self::own(ptr) })
}

/// Execute SQL query and return the row count.
pub fn execute_and_get_row_count(&self, query: &str) -> VortexResult<usize> {
/// Execute SQL query and return the result.
pub fn query(&self, query: &str) -> VortexResult<QueryResult> {
let mut result: cpp::duckdb_result = unsafe { std::mem::zeroed() };
let query_cstr =
std::ffi::CString::new(query).map_err(|_| vortex_err!("Invalid query string"))?;
Expand All @@ -36,48 +37,202 @@ impl Connection {
if error_ptr.is_null() {
"Unknown DuckDB error".to_string()
} else {
std::ffi::CStr::from_ptr(error_ptr)
.to_string_lossy()
.into_owned()
CStr::from_ptr(error_ptr).to_string_lossy().into_owned()
}
};

unsafe { cpp::duckdb_destroy_result(&mut result) };
return Err(vortex_err!("Failed to execute query: {}", error_msg));
}

let row_count = unsafe { cpp::duckdb_row_count(&mut result).try_into()? };
unsafe { cpp::duckdb_destroy_result(&mut result) };
Ok(unsafe { QueryResult::new(result) })
}
}

Ok(row_count)
#[cfg(test)]
mod tests {
use super::*;

fn test_connection() -> VortexResult<Connection> {
let db = Database::open_in_memory()?;
db.connect()
}

/// Execute SQL query.
pub fn execute(&self, query: &str) -> VortexResult<()> {
let mut result: cpp::duckdb_result = unsafe { std::mem::zeroed() };
let query_cstr =
std::ffi::CString::new(query).map_err(|_| vortex_err!("Invalid query string"))?;
#[test]
fn test_connection_creation() {
let conn = test_connection();
assert!(conn.is_ok());
}

let status = unsafe { cpp::duckdb_query(self.as_ptr(), query_cstr.as_ptr(), &mut result) };
#[test]
fn test_execute_success() {
let conn = test_connection().unwrap();
let result = conn.query("SELECT 1");
assert!(result.is_ok());
}

if status != cpp::duckdb_state::DuckDBSuccess {
let error_msg = unsafe {
let error_ptr = cpp::duckdb_result_error(&mut result);
if error_ptr.is_null() {
"Unknown DuckDB error".to_string()
} else {
std::ffi::CStr::from_ptr(error_ptr)
.to_string_lossy()
.into_owned()
}
};
#[test]
fn test_execute_invalid_sql() {
let conn = test_connection().unwrap();
let result = conn.query("INVALID SQL STATEMENT");
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("Failed to execute query"));
}

unsafe { cpp::duckdb_destroy_result(&mut result) };
return Err(vortex_err!("Failed to execute query: {}", error_msg));
}
#[test]
fn test_execute_with_null_bytes() {
let conn = test_connection().unwrap();
let result = conn.query("SELECT\0 1");
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("Invalid query string"));
}

#[test]
fn test_query_and_get_row_count_select() {
let conn = test_connection().unwrap();
let result = conn.query("SELECT 1, 2, 3").unwrap();
assert_eq!(result.row_count().unwrap(), 1);
}

#[test]
fn test_query_and_get_row_count_create_table() {
let conn = test_connection().unwrap();

// CREATE TABLE should return 0 rows
let result = conn
.query("CREATE TABLE test (id INTEGER, name VARCHAR)")
.unwrap();
assert_eq!(result.row_count().unwrap(), 0);
}

#[test]
fn test_query_and_get_row_count_insert() {
let conn = test_connection().unwrap();
conn.query("CREATE TABLE test (id INTEGER, name VARCHAR)")
.unwrap();

let result = conn
.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")
.unwrap();

assert_eq!(result.row_count().unwrap(), 2);
}

#[test]
fn test_query_invalid_sql() {
let conn = test_connection().unwrap();
let result = conn.query("INVALID SQL");
assert!(result.is_err());
}

#[test]
fn test_query_single_value() {
let conn = test_connection().unwrap();
let result = conn.query("SELECT 42").unwrap();

assert_eq!(result.column_count().unwrap(), 1);
assert_eq!(result.row_count().unwrap(), 1);
assert_eq!(result.get::<i64>(0, 0).unwrap(), 42);
}

#[test]
fn test_query_multiple_rows() {
let conn = test_connection().unwrap();
conn.query("CREATE TABLE test (id INTEGER)").unwrap();
conn.query("INSERT INTO test VALUES (1), (2), (3)").unwrap();

let result = conn.query("SELECT id FROM test ORDER BY id").unwrap();

assert_eq!(result.column_count().unwrap(), 1);
assert_eq!(result.row_count().unwrap(), 3);
assert_eq!(result.get::<i64>(0, 0).unwrap(), 1);
assert_eq!(result.get::<i64>(0, 1).unwrap(), 2);
assert_eq!(result.get::<i64>(0, 2).unwrap(), 3);
}

#[test]
fn test_query_multiple_columns() {
let conn = test_connection().unwrap();
let result = conn.query("SELECT 1 as num, 'hello' as text").unwrap();

assert_eq!(result.column_count().unwrap(), 2);
assert_eq!(result.row_count().unwrap(), 1);
assert_eq!(result.column_name(0).unwrap(), "num");
assert_eq!(result.column_name(1).unwrap(), "text");
assert_eq!(result.get::<i64>(0, 0).unwrap(), 1);
assert_eq!(result.get::<String>(1, 0).unwrap(), "hello");
}

#[test]
fn test_query_bounds_checking() {
let conn = test_connection().unwrap();
let result = conn.query("SELECT 1").unwrap();

// Test row bounds
assert!(result.get::<i64>(0, 1).is_err());

// Test column bounds
assert!(result.get::<i64>(1, 0).is_err());
}

#[test]
fn test_query_column_types() {
let conn = test_connection().unwrap();
let result = conn
.query("SELECT 1 as int_col, 'text' as str_col")
.unwrap();

assert_eq!(result.column_type(0), cpp::DUCKDB_TYPE::DUCKDB_TYPE_INTEGER);
assert_eq!(result.column_type(1), cpp::DUCKDB_TYPE::DUCKDB_TYPE_VARCHAR);
}

#[test]
fn test_null_handling() {
let conn = test_connection().unwrap();
let result = conn
.query("SELECT NULL as null_col, 1 as not_null_col")
.unwrap();

assert!(result.is_null(0, 0).unwrap());
assert!(!result.is_null(1, 0).unwrap());
}

#[test]
fn test_type_conversion() {
let conn = test_connection().unwrap();
let result = conn
.query("SELECT 42::TINYINT, 42::SMALLINT, 42::INTEGER, 42::BIGINT")
.unwrap();

assert_eq!(result.get::<i64>(0, 0).unwrap(), 42); // TINYINT -> i64
assert_eq!(result.get::<i64>(1, 0).unwrap(), 42); // SMALLINT -> i64
assert_eq!(result.get::<i64>(2, 0).unwrap(), 42); // INTEGER -> i64
assert_eq!(result.get::<i64>(3, 0).unwrap(), 42); // BIGINT -> i64
}

#[test]
fn test_query_and_get_row_count_update() {
let conn = test_connection().unwrap();
conn.query("CREATE TABLE test (id INTEGER, name VARCHAR)")
.unwrap();
conn.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')")
.unwrap();

let result = conn
.query("UPDATE test SET name = 'Updated' WHERE id <= 2")
.unwrap();
assert_eq!(result.row_count().unwrap(), 2);
}

unsafe { cpp::duckdb_destroy_result(&mut result) };
#[test]
fn test_query_and_get_row_count_delete() {
let conn = test_connection().unwrap();
conn.query("CREATE TABLE test (id INTEGER)").unwrap();
conn.query("INSERT INTO test VALUES (1), (2), (3)").unwrap();

Ok(())
let result = conn.query("DELETE FROM test WHERE id > 1").unwrap();
assert_eq!(result.row_count().unwrap(), 2);
}
}
2 changes: 2 additions & 0 deletions vortex-duckdb-ext/src/duckdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod data_chunk;
mod database;
mod expr;
mod logical_type;
mod query_result;
mod scalar_function;
mod selection_vector;
mod table_filter;
Expand All @@ -20,6 +21,7 @@ pub use data_chunk::*;
pub use database::*;
pub use expr::*;
pub use logical_type::*;
pub use query_result::*;
pub use scalar_function::*;
pub use selection_vector::*;
pub use table_filter::*;
Expand Down
Loading
Loading