diff --git a/Cargo.lock b/Cargo.lock index c6f80014..5e2176d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1153,7 +1153,7 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" [[package]] name = "datafusion" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "arrow-ipc", @@ -1207,7 +1207,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1232,7 +1232,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1254,7 +1254,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "ahash 0.8.12", "arrow", @@ -1278,7 +1278,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "futures", "log", @@ -1288,7 +1288,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-compression", @@ -1324,7 +1324,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1348,7 +1348,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1372,7 +1372,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1404,12 +1404,12 @@ dependencies = [ [[package]] name = "datafusion-doc" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" [[package]] name = "datafusion-execution" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1428,7 +1428,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1449,7 +1449,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "datafusion-common", @@ -1473,7 +1473,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "arrow-buffer", @@ -1501,7 +1501,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "ahash 0.8.12", "arrow", @@ -1521,7 +1521,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "ahash 0.8.12", "arrow", @@ -1533,7 +1533,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "arrow-ord", @@ -1554,7 +1554,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1569,7 +1569,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "datafusion-common", @@ -1586,7 +1586,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1595,7 +1595,7 @@ dependencies = [ [[package]] name = "datafusion-macros" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "datafusion-expr", "quote", @@ -1605,7 +1605,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "chrono", @@ -1624,7 +1624,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "ahash 0.8.12", "arrow", @@ -1646,7 +1646,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "datafusion-common", @@ -1660,7 +1660,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "ahash 0.8.12", "arrow", @@ -1673,7 +1673,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "datafusion-common", @@ -1692,7 +1692,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "ahash 0.8.12", "arrow", @@ -1722,7 +1722,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "chrono", @@ -1737,7 +1737,7 @@ dependencies = [ [[package]] name = "datafusion-proto-common" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "datafusion-common", @@ -1747,7 +1747,7 @@ dependencies = [ [[package]] name = "datafusion-pruning" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "arrow-schema", @@ -1764,7 +1764,7 @@ dependencies = [ [[package]] name = "datafusion-session" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "async-trait", @@ -1787,7 +1787,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "50.3.0" -source = "git+https://github.com/spiceai/datafusion.git?rev=cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4#cd6b2f85c4b1d9d0b450aacd7f3498a1dea48ec4" +source = "git+https://github.com/spiceai/datafusion.git?rev=41d08054e37e9cdca9b760c16fc2b97a21893af2#41d08054e37e9cdca9b760c16fc2b97a21893af2" dependencies = [ "arrow", "bigdecimal", @@ -1941,8 +1941,8 @@ dependencies = [ [[package]] name = "duckdb" -version = "1.3.2" -source = "git+https://github.com/spiceai/duckdb-rs.git?rev=dd02045c3aa77895723e873222cbe30f5c8f77a9#dd02045c3aa77895723e873222cbe30f5c8f77a9" +version = "1.4.1" +source = "git+https://github.com/spiceai/duckdb-rs.git?rev=0a856c314671a8cae46536c4dca398842817e12d#0a856c314671a8cae46536c4dca398842817e12d" dependencies = [ "arrow", "cast", @@ -1954,7 +1954,6 @@ dependencies = [ "num-integer", "r2d2", "rust_decimal", - "smallvec", "strum", ] @@ -3047,8 +3046,8 @@ checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" [[package]] name = "libduckdb-sys" -version = "1.3.2" -source = "git+https://github.com/spiceai/duckdb-rs.git?rev=dd02045c3aa77895723e873222cbe30f5c8f77a9#dd02045c3aa77895723e873222cbe30f5c8f77a9" +version = "1.4.1" +source = "git+https://github.com/spiceai/duckdb-rs.git?rev=0a856c314671a8cae46536c4dca398842817e12d#0a856c314671a8cae46536c4dca398842817e12d" dependencies = [ "cc", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 74523167..3f62496e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,11 +47,11 @@ datafusion-proto = { version = "50" } datafusion-physical-expr = { version = "50" } datafusion-physical-plan = { version = "50" } datafusion-table-providers = { path = "core" } -duckdb = { version = "=1.3.2", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488 +duckdb = { version = "=1.4.1"} [patch.crates-io] datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "d6e7ebd853463acafaf8132dced23c98c45f7947" } # spiceai-50 -duckdb = { git = "https://github.com/spiceai/duckdb-rs.git", rev = "dd02045c3aa77895723e873222cbe30f5c8f77a9" } # spiceai-1.3.2 +duckdb = { git = "https://github.com/spiceai/duckdb-rs.git", rev = "0a856c314671a8cae46536c4dca398842817e12d" } # spiceai-1.4.1 (capi-append) datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "41d08054e37e9cdca9b760c16fc2b97a21893af2" } # spiceai-50 datafusion-expr = { git = "https://github.com/spiceai/datafusion.git", rev = "41d08054e37e9cdca9b760c16fc2b97a21893af2" } # spiceai-50 diff --git a/core/Cargo.toml b/core/Cargo.toml index e68956e9..cb90beb3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -123,6 +123,7 @@ postgres = [ ] sqlite = ["dep:rusqlite", "dep:tokio-rusqlite", "dep:arrow-schema"] duckdb = [ + "dep:arrow-array", "dep:duckdb", "dep:r2d2", "dep:uuid", diff --git a/core/src/duckdb.rs b/core/src/duckdb.rs index db73c005..9f676e52 100644 --- a/core/src/duckdb.rs +++ b/core/src/duckdb.rs @@ -32,7 +32,7 @@ use datafusion::{ logical_expr::CreateExternalTable, sql::TableReference, }; -use duckdb::{AccessMode, DuckdbConnectionManager}; +use duckdb::{AccessMode, Connection, DuckdbConnectionManager}; use itertools::Itertools; use snafu::prelude::*; use std::{collections::HashMap, sync::Arc}; @@ -70,6 +70,9 @@ pub enum Error { source: sql_provider_datafusion::Error, }, + #[snafu(display("Unable to clone DuckDB connection handle: {source}"))] + UnableToCloneConnection { source: duckdb::Error }, + #[snafu(display("Unable to downcast DbConnection to DuckDbConnection"))] UnableToDowncastDbConnection {}, @@ -159,8 +162,8 @@ pub enum Error { #[snafu(display("A read provider is required to create a DuckDBTableWriter"))] MissingReadProvider, - #[snafu(display("A pool is required to create a DuckDBTableWriter"))] - MissingPool, + #[snafu(display("A connection is required to create a DuckDBTableWriter"))] + MissingConnection, #[snafu(display("A table definition is required to create a DuckDBTableWriter"))] MissingTableDefinition, @@ -447,13 +450,24 @@ impl TableProviderFactory for DuckDBTableProviderFactory { ); let pool = Arc::new(pool); - make_initial_table(Arc::clone(&table_definition), &pool)?; + let mut write_cxn = pool + .connect() + .await + .map_err(|e| DataFusionError::External(e.into()))?; + + let write_cxn = DuckDB::duckdb_conn(&mut write_cxn) + .map_err(to_datafusion_error)? + .conn + .try_clone() + .map_err(|e| DataFusionError::External(e.into()))?; + + make_initial_table(Arc::clone(&table_definition), &write_cxn)?; let write_settings = DuckDBWriteSettings::from_params(&options); let table_writer_builder = DuckDBTableWriterBuilder::new() .with_table_definition(Arc::clone(&table_definition)) - .with_pool(pool) + .with_connection(write_cxn) .set_on_conflict(on_conflict) .with_write_settings(write_settings); @@ -660,9 +674,22 @@ impl DuckDBTableFactory { let table_name = RelationName::from(table_reference); let table_definition = TableDefinition::new(table_name, Arc::clone(&schema)); + + let mut write_cxn = self + .pool + .connect() + .await + .map_err(|e| DataFusionError::External(e.into()))?; + + let write_cxn = DuckDB::duckdb_conn(&mut write_cxn) + .map_err(to_datafusion_error)? + .conn + .try_clone() + .map_err(|e| DataFusionError::External(e.into()))?; + let table_writer_builder = DuckDBTableWriterBuilder::new() .with_read_provider(read_provider) - .with_pool(Arc::clone(&self.pool)) + .with_connection(write_cxn) .with_table_definition(Arc::new(table_definition)); Ok(Arc::new(table_writer_builder.build()?)) @@ -695,19 +722,12 @@ fn create_table_function_view_name(table_reference: &TableReference) -> TableRef pub(crate) fn make_initial_table( table_definition: Arc, - pool: &Arc, + connection: &Connection, ) -> DataFusionResult<()> { - let cloned_pool = Arc::clone(pool); - let mut db_conn = Arc::clone(&cloned_pool) - .connect_sync() - .context(DbConnectionPoolSnafu) - .map_err(to_datafusion_error)?; - - let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn).map_err(to_datafusion_error)?; + let create_connection = connection.to_owned(); - let tx = duckdb_conn - .conn - .transaction() + let tx = create_connection + .unchecked_transaction() .context(UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; @@ -725,7 +745,7 @@ pub(crate) fn make_initial_table( let table_manager = TableManager::new(table_definition); table_manager - .create_table(cloned_pool, &tx) + .create_table(&create_connection, &tx) .map_err(to_datafusion_error)?; tx.commit() @@ -785,11 +805,9 @@ pub(crate) mod tests { .downcast_ref::() .expect("cast to DuckDBTableWriter"); - let mut conn_box = writer.pool().connect_sync().expect("to get connection"); - let conn = DuckDB::duckdb_conn(&mut conn_box).expect("to get DuckDB connection"); + let conn = writer.connection(); let mut stmt = conn - .conn .prepare("SELECT value FROM duckdb_settings() WHERE name = 'memory_limit'") .expect("to prepare statement"); @@ -846,11 +864,9 @@ pub(crate) mod tests { .downcast_ref::() .expect("cast to DuckDBTableWriter"); - let mut conn_box = writer.pool().connect_sync().expect("to get connection"); - let conn = DuckDB::duckdb_conn(&mut conn_box).expect("to get DuckDB connection"); + let conn = writer.connection(); let mut stmt = conn - .conn .prepare("SELECT value FROM duckdb_settings() WHERE name = 'temp_directory'") .expect("to prepare statement"); @@ -903,11 +919,9 @@ pub(crate) mod tests { .downcast_ref::() .expect("cast to DuckDBTableWriter"); - let mut conn_box = writer.pool().connect_sync().expect("to get connection"); - let conn = DuckDB::duckdb_conn(&mut conn_box).expect("to get DuckDB connection"); + let conn = writer.connection(); let mut stmt = conn - .conn .prepare("SELECT value FROM duckdb_settings() WHERE name = 'preserve_insertion_order'") .expect("to prepare statement"); @@ -958,11 +972,9 @@ pub(crate) mod tests { .downcast_ref::() .expect("cast to DuckDBTableWriter"); - let mut conn_box = writer.pool().connect_sync().expect("to get connection"); - let conn = DuckDB::duckdb_conn(&mut conn_box).expect("to get DuckDB connection"); + let conn = writer.connection(); let mut stmt = conn - .conn .prepare("SELECT value FROM duckdb_settings() WHERE name = 'preserve_insertion_order'") .expect("to prepare statement"); diff --git a/core/src/duckdb/creator.rs b/core/src/duckdb/creator.rs index 4a3eb058..79266ddf 100644 --- a/core/src/duckdb/creator.rs +++ b/core/src/duckdb/creator.rs @@ -1,7 +1,9 @@ use crate::sql::arrow_sql_gen::statement::IndexBuilder; -use crate::sql::db_connection_pool::dbconnection::duckdbconn::DuckDbConnection; -use crate::sql::db_connection_pool::duckdbpool::DuckDbConnectionPool; use crate::util::on_conflict::OnConflict; +use crate::util::{ + column_reference::ColumnReference, constraints::get_primary_keys_from_constraints, + indexes::IndexType, +}; use arrow::{ array::{RecordBatch, RecordBatchIterator, RecordBatchReader}, datatypes::SchemaRef, @@ -10,19 +12,13 @@ use arrow::{ use datafusion::common::utils::quote_identifier; use datafusion::common::Constraints; use datafusion::sql::TableReference; -use duckdb::Transaction; +use duckdb::{Connection, Transaction}; use itertools::Itertools; use snafu::prelude::*; use std::collections::HashSet; use std::fmt::Display; use std::sync::Arc; -use super::DuckDB; -use crate::util::{ - column_reference::ColumnReference, constraints::get_primary_keys_from_constraints, - indexes::IndexType, -}; - /// A newtype for a relation name, to better control the inputs for the `TableDefinition`, `TableCreator`, and `ViewCreator`. #[derive(Debug, Clone, PartialEq)] pub struct RelationName(String); @@ -263,14 +259,11 @@ impl TableManager { #[tracing::instrument(level = "debug", skip_all)] pub fn create_table( &self, - pool: Arc, - tx: &Transaction<'_>, + connection: &Connection, + write_tx: &Transaction<'_>, ) -> super::Result<()> { - let mut db_conn = pool.connect_sync().context(super::DbConnectionPoolSnafu)?; - let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn)?; - // create the table with the supplied table name, or a generated internal name - let mut create_stmt = self.get_table_create_statement(duckdb_conn)?; + let mut create_stmt = self.get_table_create_statement(&connection)?; tracing::debug!("{create_stmt}"); let primary_keys = if let Some(constraints) = &self.table_definition.constraints { @@ -284,7 +277,8 @@ impl TableManager { create_stmt = create_stmt.replace(");", &primary_key_clause); } - tx.execute(&create_stmt, []) + write_tx + .execute(&create_stmt, []) .context(super::UnableToCreateDuckDBTableSnafu)?; Ok(()) @@ -405,12 +399,9 @@ impl TableManager { /// DuckDB CREATE TABLE statements aren't supported by sea-query - so we create a temporary table /// from an Arrow schema and ask DuckDB for the CREATE TABLE statement. #[tracing::instrument(level = "debug", skip_all)] - fn get_table_create_statement( - &self, - duckdb_conn: &mut DuckDbConnection, - ) -> super::Result { - let tx = duckdb_conn - .conn + fn get_table_create_statement(&self, connection: &Connection) -> super::Result { + let mut owned_connection = connection.to_owned(); + let tx = owned_connection .transaction() .context(super::UnableToBeginTransactionSnafu)?; let table_name = self.table_name(); @@ -760,9 +751,6 @@ impl ViewCreator { pub(crate) mod tests { use crate::{ duckdb::make_initial_table, - sql::db_connection_pool::{ - dbconnection::duckdbconn::DuckDbConnection, duckdbpool::DuckDbConnectionPool, - }, }; use arrow::array::RecordBatch; use datafusion::{ @@ -783,10 +771,8 @@ pub(crate) mod tests { use super::*; - pub(crate) fn get_mem_duckdb() -> Arc { - Arc::new( - DuckDbConnectionPool::new_memory().expect("to get a memory duckdb connection pool"), - ) + pub(crate) fn get_mem_duckdb() -> Connection { + Connection::open_in_memory().expect("Must open duckdb://:memory") } async fn get_logs_batches() -> Vec { @@ -864,13 +850,13 @@ pub(crate) mod tests { ); for overwrite in &[InsertOp::Append, InsertOp::Overwrite] { - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); - make_initial_table(Arc::clone(&table_definition), &pool) + make_initial_table(Arc::clone(&table_definition), &connection) .expect("to make initial table"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), *overwrite, None, @@ -885,13 +871,7 @@ pub(crate) mod tests { .await .expect("to write all"); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let num_rows = conn - .get_underlying_conn_mut() + let num_rows = connection .query_row(r#"SELECT COUNT(1) FROM "eth.logs""#, [], |r| { r.get::(0) }) @@ -899,10 +879,7 @@ pub(crate) mod tests { assert_eq!(num_rows, rows_written); - let tx = conn - .get_underlying_conn_mut() - .transaction() - .expect("should begin transaction"); + let tx = connection.unchecked_transaction().expect("should begin transaction"); let table_creator = if matches!(overwrite, InsertOp::Overwrite) { let internal_tables: Vec<(RelationName, u64)> = table_definition .list_internal_tables(&tx) @@ -978,13 +955,13 @@ pub(crate) mod tests { ); for overwrite in &[InsertOp::Append, InsertOp::Overwrite] { - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); - make_initial_table(Arc::clone(&table_definition), &pool) + make_initial_table(Arc::clone(&table_definition), &connection) .expect("to make initial table"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), *overwrite, None, @@ -999,13 +976,7 @@ pub(crate) mod tests { .await .expect("to write all"); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let num_rows = conn - .get_underlying_conn_mut() + let num_rows = connection .query_row(r#"SELECT COUNT(1) FROM "eth.logs""#, [], |r| { r.get::(0) }) @@ -1013,10 +984,7 @@ pub(crate) mod tests { assert_eq!(num_rows, rows_written); - let tx = conn - .get_underlying_conn_mut() - .transaction() - .expect("should begin transaction"); + let tx = connection.unchecked_transaction().expect("should begin transaction"); let table_creator = if matches!(overwrite, InsertOp::Overwrite) { let internal_tables: Vec<(RelationName, u64)> = table_definition @@ -1087,18 +1055,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_list_related_tables_from_definition() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make 3 internal tables @@ -1106,7 +1068,7 @@ pub(crate) mod tests { TableManager::new(Arc::clone(&table_definition)) .with_internal(true) .expect("to create table creator") - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); } @@ -1138,18 +1100,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_list_related_tables_from_creator() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make 3 internal tables @@ -1157,7 +1113,7 @@ pub(crate) mod tests { TableManager::new(Arc::clone(&table_definition)) .with_internal(true) .expect("to create table creator") - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); } @@ -1167,7 +1123,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let internal_tables = table_creator @@ -1204,18 +1160,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_create_view() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make a table @@ -1224,7 +1174,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // create a view from the internal table @@ -1247,18 +1197,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_insert_into_tables() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make a base table @@ -1267,7 +1211,7 @@ pub(crate) mod tests { .expect("to create table creator"); base_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // make an internal table @@ -1276,7 +1220,7 @@ pub(crate) mod tests { .expect("to create table creator"); internal_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // insert some rows directly into the base table @@ -1313,18 +1257,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_lists_base_table_from_definition() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make a base table @@ -1333,7 +1271,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // list the base table from another base table @@ -1368,7 +1306,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_primary_keys_match() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int64, false), @@ -1380,14 +1318,8 @@ pub(crate) mod tests { .with_constraints(get_pk_constraints(&["id"], Arc::clone(&schema))), ); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make 2 internal tables which should have the same indexes @@ -1396,7 +1328,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let table_creator2 = TableManager::new(Arc::clone(&table_definition)) @@ -1404,7 +1336,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator2 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let primary_keys_match = table_creator @@ -1421,7 +1353,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator3 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let primary_keys_match = table_creator @@ -1436,7 +1368,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator4 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let primary_keys_match = table_creator3 @@ -1451,7 +1383,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_indexes_match() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int64, false), @@ -1470,14 +1402,8 @@ pub(crate) mod tests { ), ); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make 2 internal tables which should have the same indexes @@ -1486,7 +1412,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_creator @@ -1498,7 +1424,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator2 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_creator2 @@ -1519,7 +1445,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator3 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_creator3 @@ -1538,7 +1464,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator4 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_creator4 @@ -1557,18 +1483,12 @@ pub(crate) mod tests { #[tokio::test] async fn test_current_schema() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); let table_creator = TableManager::new(Arc::clone(&table_definition)) @@ -1576,7 +1496,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let schema = table_creator @@ -1591,7 +1511,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator2 - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let schema2 = table_creator2 @@ -1606,7 +1526,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_internal_tables_exclude_subsets_of_other_tables() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); let other_definition = Arc::new(TableDefinition::new( @@ -1614,15 +1534,8 @@ pub(crate) mod tests { Arc::clone(&table_definition.schema), )); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); // make an internal table for each definition @@ -1631,7 +1544,7 @@ pub(crate) mod tests { .expect("to create table creator"); table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); let other_table_creator = TableManager::new(Arc::clone(&other_definition)) @@ -1639,7 +1552,7 @@ pub(crate) mod tests { .expect("to create table creator"); other_table_creator - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // each table should not list the other as an internal table @@ -1662,7 +1575,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_explain_analyze_with_index_and_view() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false), @@ -1679,14 +1592,8 @@ pub(crate) mod tests { )]), ); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); let table_manager = TableManager::new(Arc::clone(&table_definition)) @@ -1694,7 +1601,7 @@ pub(crate) mod tests { .expect("to create table manager"); table_manager - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_manager @@ -1752,7 +1659,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_explain_analyze_with_multiple_indexes_and_view() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false), @@ -1779,14 +1686,8 @@ pub(crate) mod tests { ]), ); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); let table_manager = TableManager::new(Arc::clone(&table_definition)) @@ -1794,7 +1695,7 @@ pub(crate) mod tests { .expect("to create table manager"); table_manager - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_manager @@ -1803,9 +1704,9 @@ pub(crate) mod tests { tx.execute( &format!( - r#"INSERT INTO "{table_name}" VALUES - ('Alice', 1, 30, 'active'), - ('Bob', 2, 25, 'inactive'), + r#"INSERT INTO "{table_name}" VALUES + ('Alice', 1, 30, 'active'), + ('Bob', 2, 25, 'inactive'), ('Charlie', 3, 35, 'active'), ('David', 4, 30, 'pending'), ('Eve', 5, 40, 'active')"#, @@ -1888,7 +1789,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_explain_analyze_with_primary_key_and_index_and_view() { let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false), @@ -1906,14 +1807,8 @@ pub(crate) mod tests { .with_constraints(get_pk_constraints(&["name"], Arc::clone(&schema))), ); - let mut pool_conn = Arc::clone(&pool).connect_sync().expect("to get connection"); - let conn = pool_conn - .as_any_mut() - .downcast_mut::() - .expect("to downcast to duckdb connection"); - let tx = conn - .get_underlying_conn_mut() - .transaction() + let tx = connection + .unchecked_transaction() .expect("should begin transaction"); let table_manager = TableManager::new(Arc::clone(&table_definition)) @@ -1921,7 +1816,7 @@ pub(crate) mod tests { .expect("to create table manager"); table_manager - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); table_manager @@ -1930,9 +1825,9 @@ pub(crate) mod tests { tx.execute( &format!( - r#"INSERT INTO "{table_name}" VALUES - ('Alice', 1, 30, 'active'), - ('Bob', 2, 25, 'inactive'), + r#"INSERT INTO "{table_name}" VALUES + ('Alice', 1, 30, 'active'), + ('Bob', 2, 25, 'inactive'), ('Charlie', 3, 35, 'active'), ('David', 4, 30, 'pending'), ('Eve', 5, 40, 'active')"#, diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_0.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_0.snap index 6a93f486..c8bd13e5 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_0.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_0.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -39,6 +39,6 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE id = 1 │ │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_1.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_1.snap index 59d93396..edcab01d 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_1.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_1.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -33,6 +33,6 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE id = 1 │ Projections: name │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_0.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_0.snap index 6a93f486..c8bd13e5 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_0.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_0.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -39,6 +39,6 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE id = 1 │ │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_1.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_1.snap index 59d93396..edcab01d 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_1.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_1.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -33,6 +33,6 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE id = 1 │ Projections: name │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_2.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_2.snap index b1af244f..88022379 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_2.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_2.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name, status FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -37,6 +37,6 @@ EXPLAIN ANALYZE SELECT name, status FROM test_table WHERE id = 1 │ │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_3.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_3.snap index 8cb00c22..5c1736ae 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_3.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_3.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE age = 30 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -39,6 +39,6 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE age = 30 │ │ │ Filters: age=30 │ │ │ -│ 2 Rows │ +│ 2 rows │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_4.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_4.snap index 22700bf5..6b38b29e 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_4.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_4.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE age = 30 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -33,6 +33,6 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE age = 30 │ Projections: name │ │ Filters: age=30 │ │ │ -│ 2 Rows │ +│ 2 rows │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_5.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_5.snap index 19356376..20056209 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_5.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_5.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT id, name FROM test_table WHERE age = 30 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -28,7 +28,7 @@ EXPLAIN ANALYZE SELECT id, name FROM test_table WHERE age = 30 │ id │ │ name │ │ │ -│ 2 Rows │ +│ 2 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -46,6 +46,6 @@ EXPLAIN ANALYZE SELECT id, name FROM test_table WHERE age = 30 │ │ │ Filters: age=30 │ │ │ -│ 2 Rows │ +│ 2 rows │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_6.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_6.snap index 79b1003b..d23408d7 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_6.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_6.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE status = 'active' ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -40,6 +40,6 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE status = 'active' │ Filters: │ │ status='active' │ │ │ -│ 3 Rows │ +│ 3 rows │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_7.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_7.snap index 2533ea0a..c4f02fd9 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_7.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_7.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE status = 'active' ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -35,6 +35,6 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE status = 'active' │ Filters: │ │ status='active' │ │ │ -│ 3 Rows │ +│ 3 rows │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_8.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_8.snap index 816d7e80..0c924790 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_8.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_multiple_indexes_8.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT id, age FROM test_table WHERE status = 'active' ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -38,6 +38,6 @@ EXPLAIN ANALYZE SELECT id, age FROM test_table WHERE status = 'active' │ Filters: │ │ status='active' │ │ │ -│ 3 Rows │ +│ 3 rows │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_0.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_0.snap index 6a93f486..c8bd13e5 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_0.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_0.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -39,6 +39,6 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE id = 1 │ │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_1.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_1.snap index 59d93396..edcab01d 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_1.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_1.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -33,6 +33,6 @@ EXPLAIN ANALYZE SELECT name FROM test_table WHERE id = 1 │ Projections: name │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_2.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_2.snap index b1af244f..88022379 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_2.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_2.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT name, status FROM test_table WHERE id = 1 ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -37,6 +37,6 @@ EXPLAIN ANALYZE SELECT name, status FROM test_table WHERE id = 1 │ │ │ Filters: id=1 │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_3.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_3.snap index bab75885..8d404613 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_3.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_3.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE name = 'Alice' ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -39,6 +39,6 @@ EXPLAIN ANALYZE SELECT * FROM test_table WHERE name = 'Alice' │ │ │ Filters: name='Alice' │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_4.snap b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_4.snap index a668f4d0..6bbf6858 100644 --- a/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_4.snap +++ b/core/src/duckdb/snapshots/datafusion_table_providers__duckdb__creator__tests__explain_analyze_primary_key_and_index_4.snap @@ -1,5 +1,5 @@ --- -source: src/duckdb/creator.rs +source: core/src/duckdb/creator.rs expression: explain_result --- ┌─────────────────────────────────────┐ @@ -19,7 +19,7 @@ EXPLAIN ANALYZE SELECT id, status FROM test_table WHERE name = 'Bob' ┌─────────────┴─────────────┐ │ EXPLAIN_ANALYZE │ │ ──────────────────── │ -│ 0 Rows │ +│ 0 rows │ │ (0.00s) │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ @@ -37,6 +37,6 @@ EXPLAIN ANALYZE SELECT id, status FROM test_table WHERE name = 'Bob' │ │ │ Filters: name='Bob' │ │ │ -│ 1 Rows │ +│ 1 row │ │ (0.00s) │ └───────────────────────────┘ diff --git a/core/src/duckdb/write.rs b/core/src/duckdb/write.rs index b02b9c25..5d83e47c 100644 --- a/core/src/duckdb/write.rs +++ b/core/src/duckdb/write.rs @@ -1,18 +1,12 @@ -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{any::Any, fmt, sync::Arc}; - -use crate::duckdb::DuckDB; -use crate::sql::db_connection_pool::duckdbpool::DuckDbConnectionPool; use crate::util::constraints::UpsertOptions; use crate::util::{ constraints, on_conflict::OnConflict, retriable_error::{check_and_mark_retriable_error, to_retriable_data_write_error}, }; -use arrow::array::RecordBatchReader; use arrow::ffi_stream::FFI_ArrowArrayStream; use arrow::{array::RecordBatch, datatypes::SchemaRef}; -use arrow_schema::ArrowError; +use arrow_array::RecordBatchIterator; use async_trait::async_trait; use datafusion::catalog::Session; use datafusion::common::{Constraints, SchemaExt}; @@ -25,15 +19,18 @@ use datafusion::{ logical_expr::Expr, physical_plan::{metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan}, }; -use duckdb::Transaction; +use duckdb::{Connection, Transaction}; use futures::StreamExt; use snafu::prelude::*; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{any::Any, fmt, sync::Arc}; +use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinHandle; use super::creator::{TableDefinition, TableManager, ViewCreator}; -use super::{to_datafusion_error, RelationName}; use super::write_settings::DuckDBWriteSettings; +use super::{to_datafusion_error, RelationName}; /// A callback handler that is invoked after data has been successfully written to a DuckDB table /// but before the transaction is committed. @@ -58,7 +55,7 @@ const SCHEMA_EQUIVALENCE_ENABLED: bool = false; #[derive(Default)] pub struct DuckDBTableWriterBuilder { read_provider: Option>, - pool: Option>, + connection: Option, on_conflict: Option, table_definition: Option>, on_data_written: Option, @@ -78,8 +75,8 @@ impl DuckDBTableWriterBuilder { } #[must_use] - pub fn with_pool(mut self, pool: Arc) -> Self { - self.pool = Some(pool); + pub fn with_connection(mut self, connection: Connection) -> Self { + self.connection = Some(connection); self } @@ -120,8 +117,8 @@ impl DuckDBTableWriterBuilder { return Err(super::Error::MissingReadProvider); }; - let Some(pool) = self.pool else { - return Err(super::Error::MissingPool); + let Some(connection) = self.connection else { + return Err(super::Error::MissingConnection); }; let Some(table_definition) = self.table_definition else { @@ -132,28 +129,40 @@ impl DuckDBTableWriterBuilder { read_provider, on_conflict: self.on_conflict, table_definition, - pool, + connection, on_data_written: self.on_data_written, write_settings: self.write_settings.unwrap_or_default(), }) } } -#[derive(Clone)] pub struct DuckDBTableWriter { pub read_provider: Arc, - pool: Arc, + connection: Connection, table_definition: Arc, on_conflict: Option, on_data_written: Option, write_settings: DuckDBWriteSettings, } +impl Clone for DuckDBTableWriter { + fn clone(&self) -> Self { + Self { + read_provider: Arc::clone(&self.read_provider), + connection: self.connection.to_owned(), + table_definition: Arc::clone(&self.table_definition), + on_conflict: self.on_conflict.clone(), + on_data_written: self.on_data_written.clone(), + write_settings: self.write_settings.clone(), + } + } +} + impl std::fmt::Debug for DuckDBTableWriter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DuckDBTableWriter") .field("read_provider", &self.read_provider) - .field("pool", &self.pool) + .field("connection", &self.connection) .field("table_definition", &self.table_definition) .field("on_conflict", &self.on_conflict) .field( @@ -169,9 +178,8 @@ impl std::fmt::Debug for DuckDBTableWriter { } impl DuckDBTableWriter { - #[must_use] - pub fn pool(&self) -> Arc { - Arc::clone(&self.pool) + pub fn connection(&self) -> &Connection { + &self.connection } #[must_use] @@ -233,7 +241,7 @@ impl TableProvider for DuckDBTableWriter { overwrite: InsertOp, ) -> datafusion::error::Result> { let mut sink = DuckDBDataSink::new( - Arc::clone(&self.pool), + &self.connection, Arc::clone(&self.table_definition), overwrite, self.on_conflict.clone(), @@ -250,7 +258,7 @@ impl TableProvider for DuckDBTableWriter { } pub(crate) struct DuckDBDataSink { - pool: Arc, + connection: Connection, table_definition: Arc, overwrite: InsertOp, on_conflict: Option, @@ -278,7 +286,6 @@ impl DataSink for DuckDBDataSink { mut data: SendableRecordBatchStream, _context: &Arc, ) -> datafusion::common::Result { - let pool = Arc::clone(&self.pool); let table_definition = Arc::clone(&self.table_definition); let overwrite = self.overwrite; let on_conflict = self.on_conflict.clone(); @@ -294,12 +301,13 @@ impl DataSink for DuckDBDataSink { let (notify_commit_transaction, on_commit_transaction) = tokio::sync::oneshot::channel(); let schema = data.schema(); + let task_connection = self.connection.to_owned(); let duckdb_write_handle: JoinHandle> = tokio::task::spawn_blocking(move || { let num_rows = match overwrite { InsertOp::Overwrite => insert_overwrite( - pool, + &task_connection, &table_definition, batch_rx, on_conflict.as_ref(), @@ -309,7 +317,7 @@ impl DataSink for DuckDBDataSink { &write_settings, )?, InsertOp::Append | InsertOp::Replace => insert_append( - pool, + &task_connection, &table_definition, batch_rx, on_conflict.as_ref(), @@ -387,14 +395,14 @@ impl DataSink for DuckDBDataSink { impl DuckDBDataSink { pub(crate) fn new( - pool: Arc, + connection: &Connection, table_definition: Arc, overwrite: InsertOp, on_conflict: Option, schema: SchemaRef, ) -> Self { Self { - pool, + connection: connection.to_owned(), table_definition, overwrite, on_conflict, @@ -431,7 +439,7 @@ impl DisplayAs for DuckDBDataSink { #[allow(clippy::too_many_arguments)] fn insert_append( - pool: Arc, + connection: &Connection, table_definition: &Arc, batch_rx: Receiver, on_conflict: Option<&OnConflict>, @@ -440,16 +448,8 @@ fn insert_append( schema: SchemaRef, write_settings: &DuckDBWriteSettings, ) -> datafusion::common::Result { - let mut db_conn = pool - .connect_sync() - .context(super::DbConnectionPoolSnafu) - .map_err(to_retriable_data_write_error)?; - - let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn).map_err(to_retriable_data_write_error)?; - - let tx = duckdb_conn - .conn - .transaction() + let tx = connection + .unchecked_transaction() .context(super::UnableToBeginTransactionSnafu) .map_err(to_retriable_data_write_error)?; @@ -483,9 +483,10 @@ fn insert_append( "Append load for {table_name}", table_name = append_table.table_name() ); + let num_rows = write_to_table( - &append_table, &tx, + &append_table, Arc::clone(&schema), batch_rx, on_conflict, @@ -508,9 +509,8 @@ fn insert_append( .context(super::UnableToCommitTransactionSnafu) .map_err(to_retriable_data_write_error)?; - let tx = duckdb_conn - .conn - .transaction() + let tx = connection + .unchecked_transaction() .context(super::UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; @@ -555,7 +555,7 @@ fn insert_append( #[allow(clippy::too_many_lines)] #[allow(clippy::too_many_arguments)] fn insert_overwrite( - pool: Arc, + connection: &Connection, table_definition: &Arc, batch_rx: Receiver, on_conflict: Option<&OnConflict>, @@ -564,17 +564,8 @@ fn insert_overwrite( schema: SchemaRef, write_settings: &DuckDBWriteSettings, ) -> datafusion::common::Result { - let cloned_pool = Arc::clone(&pool); - let mut db_conn = pool - .connect_sync() - .context(super::DbConnectionPoolSnafu) - .map_err(to_retriable_data_write_error)?; - - let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn).map_err(to_retriable_data_write_error)?; - - let tx = duckdb_conn - .conn - .transaction() + let tx = connection + .unchecked_transaction() .context(super::UnableToBeginTransactionSnafu) .map_err(to_retriable_data_write_error)?; @@ -583,7 +574,7 @@ fn insert_overwrite( .map_err(to_retriable_data_write_error)?; new_table - .create_table(cloned_pool, &tx) + .create_table(connection, &tx) .map_err(to_retriable_data_write_error)?; let existing_tables = new_table @@ -655,7 +646,7 @@ fn insert_overwrite( } tracing::debug!("Initial load for {}", new_table.table_name()); - let num_rows = write_to_table(&new_table, &tx, Arc::clone(&schema), batch_rx, on_conflict) + let num_rows = write_to_table(&tx, &new_table, Arc::clone(&schema), batch_rx, on_conflict) .map_err(to_retriable_data_write_error)?; on_commit_transaction @@ -689,9 +680,8 @@ fn insert_overwrite( table_name = new_table.table_name() ); - let tx = duckdb_conn - .conn - .transaction() + let tx = connection + .unchecked_transaction() .context(super::UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; @@ -715,17 +705,16 @@ fn insert_overwrite( #[allow(clippy::doc_markdown)] /// Writes a stream of ``RecordBatch``es to a DuckDB table. -fn write_to_table( +fn write_to_table_via_view( table: &TableManager, tx: &Transaction<'_>, schema: SchemaRef, - data_batches: Receiver, + batch: RecordBatch, on_conflict: Option<&OnConflict>, ) -> datafusion::common::Result { - let stream = FFI_ArrowArrayStream::new(Box::new(RecordBatchReaderFromStream::new( - data_batches, - schema, - ))); + let iterator = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema); + + let stream = FFI_ArrowArrayStream::new(Box::new(iterator)); let current_ts = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -747,6 +736,39 @@ fn write_to_table( Ok(rows as u64) } +#[allow(clippy::doc_markdown)] +/// Writes a stream of ``RecordBatch``es to a DuckDB table. +fn write_to_table( + tx: &Transaction<'_>, + table: &TableManager, + schema: SchemaRef, + mut data_batches: Receiver, + on_conflict: Option<&OnConflict>, +) -> datafusion::common::Result { + let mut appender = tx + .appender(table.table_name().to_string().as_str()) + .map_err(|e| DataFusionError::External(e.into()))?; + + let mut rows: u64 = 0; + + while let Some(batch) = data_batches.blocking_recv() { + rows += batch.num_rows() as u64; + + match appender.append_record_batch(batch.clone()) { + Ok(_) => {} + Err(_) => { + write_to_table_via_view(table, tx, schema.clone(), batch, on_conflict)?; + } + } + } + + appender + .flush() + .map_err(|e| DataFusionError::External(e.into()))?; + + Ok(rows) +} + /// Executes an ANALYZE statement to update query optimizer statistics. /// This helps DuckDB's query planner generate better execution plans, especially after large data changes. /// https://duckdb.org/docs/stable/sql/statements/analyze @@ -771,31 +793,6 @@ pub fn execute_analyze_sql(tx: &Transaction, table_name: &str) { } } -struct RecordBatchReaderFromStream { - stream: Receiver, - schema: SchemaRef, -} - -impl RecordBatchReaderFromStream { - fn new(stream: Receiver, schema: SchemaRef) -> Self { - Self { stream, schema } - } -} - -impl Iterator for RecordBatchReaderFromStream { - type Item = Result; - - fn next(&mut self) -> Option { - self.stream.blocking_recv().map(Ok) - } -} - -impl RecordBatchReader for RecordBatchReaderFromStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - #[cfg(test)] mod test { use arrow::array::{Int64Array, StringArray}; @@ -803,8 +800,9 @@ mod test { use datafusion_physical_plan::memory::MemoryStream; use super::*; + use crate::duckdb::creator::tests::get_mem_duckdb; use crate::{ - duckdb::creator::tests::{get_basic_table_definition, get_mem_duckdb, init_tracing}, + duckdb::creator::tests::{get_basic_table_definition, init_tracing}, util::{column_reference::ColumnReference, indexes::IndexType}, }; @@ -814,12 +812,12 @@ mod test { // Expected behavior: Data sink creates a new internal table, writes data to it, and creates a view with the table definition name let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let mut connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), InsertOp::Overwrite, None, @@ -848,9 +846,7 @@ mod test { .await .expect("to write all"); - let mut conn = pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection.transaction().expect("to begin transaction"); let mut internal_tables = table_definition .list_internal_tables(&tx) .expect("to list internal tables"); @@ -889,14 +885,13 @@ mod test { // Before creating the view, the base table needs to get dropped as we need to create a view with the same name. let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let cloned_pool = Arc::clone(&pool); - let mut conn = cloned_pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection + .unchecked_transaction() + .expect("to be able to begin transaction"); // make an existing table to overwrite let overwrite_table = TableManager::new(Arc::clone(&table_definition)) @@ -904,7 +899,7 @@ mod test { .expect("to create table"); overwrite_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); tx.execute( @@ -919,7 +914,7 @@ mod test { tx.commit().expect("to commit"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), InsertOp::Overwrite, None, @@ -948,9 +943,9 @@ mod test { .await .expect("to write all"); - let mut conn = pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection + .unchecked_transaction() + .expect("to begin transaction"); let mut internal_tables = table_definition .list_internal_tables(&tx) .expect("to list internal tables"); @@ -995,14 +990,13 @@ mod test { // Before creating the view, the base table needs to get dropped as we need to create a view with the same name. let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let mut connection = get_mem_duckdb(); let table_definition = get_basic_table_definition(); - let cloned_pool = Arc::clone(&pool); - let mut conn = cloned_pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection + .unchecked_transaction() + .expect("to begin transaction"); // make an existing table to overwrite let overwrite_table = TableManager::new(Arc::clone(&table_definition)) @@ -1010,7 +1004,7 @@ mod test { .expect("to create table"); overwrite_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); tx.execute( @@ -1025,7 +1019,7 @@ mod test { tx.commit().expect("to commit"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), InsertOp::Overwrite, None, @@ -1054,9 +1048,7 @@ mod test { .await .expect("to write all"); - let mut conn = pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection.transaction().expect("to begin transaction"); let mut internal_tables = table_definition .list_internal_tables(&tx) .expect("to list internal tables"); @@ -1095,12 +1087,11 @@ mod test { // The existing table is re-used. let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let mut connection = get_mem_duckdb(); - let cloned_pool = Arc::clone(&pool); - let mut conn = cloned_pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection + .unchecked_transaction() + .expect("to begin transaction"); let table_definition = get_basic_table_definition(); @@ -1110,7 +1101,7 @@ mod test { .expect("to create table"); append_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); tx.execute( @@ -1125,7 +1116,7 @@ mod test { tx.commit().expect("to commit"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), InsertOp::Append, None, @@ -1154,7 +1145,7 @@ mod test { .await .expect("to write all"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection.transaction().expect("to begin transaction"); let internal_tables = table_definition .list_internal_tables(&tx) @@ -1188,12 +1179,11 @@ mod test { // The existing table is re-used. let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let mut connection = get_mem_duckdb(); - let cloned_pool = Arc::clone(&pool); - let mut conn = cloned_pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection + .unchecked_transaction() + .expect("to begin transaction"); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int64, false), @@ -1218,7 +1208,7 @@ mod test { .expect("to create table"); append_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // don't apply indexes, and leave the table empty to simulate a new table from TableProviderFactory::create() @@ -1226,7 +1216,7 @@ mod test { tx.commit().expect("to commit"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), InsertOp::Append, None, @@ -1255,7 +1245,7 @@ mod test { .await .expect("to write all"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection.transaction().expect("to begin transaction"); let internal_tables = table_definition .list_internal_tables(&tx) @@ -1312,12 +1302,11 @@ mod test { ); let _guard = init_tracing(None); - let pool = get_mem_duckdb(); + let mut connection = get_mem_duckdb(); - let cloned_pool = Arc::clone(&pool); - let mut conn = cloned_pool.connect_sync().expect("to connect"); - let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection + .unchecked_transaction() + .expect("to begin transaction"); let schema = Arc::new(arrow::datatypes::Schema::new(vec![ arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int64, false), @@ -1342,7 +1331,7 @@ mod test { .expect("to create table"); append_table - .create_table(Arc::clone(&pool), &tx) + .create_table(&connection, &tx) .expect("to create table"); // don't apply indexes, and leave the table empty to simulate a new table from TableProviderFactory::create() @@ -1350,7 +1339,7 @@ mod test { tx.commit().expect("to commit"); let duckdb_sink = DuckDBDataSink::new( - Arc::clone(&pool), + &connection, Arc::clone(&table_definition), InsertOp::Append, None, @@ -1380,7 +1369,7 @@ mod test { .await .expect("to write all"); - let tx = duckdb.conn.transaction().expect("to begin transaction"); + let tx = connection.transaction().expect("to begin transaction"); let base_table = append_table .base_table(&tx) diff --git a/core/src/sql/arrow_sql_gen/postgres/schema.rs b/core/src/sql/arrow_sql_gen/postgres/schema.rs index 4539cb99..ea3d5d49 100644 --- a/core/src/sql/arrow_sql_gen/postgres/schema.rs +++ b/core/src/sql/arrow_sql_gen/postgres/schema.rs @@ -81,7 +81,7 @@ pub(crate) fn pg_data_type_to_arrow_type( )), "xml" | "json" => Ok(DataType::Utf8), // `Name` is a 64 bytes (varchar) / internal type for object names - "\"Name\"" => Ok(DataType::Utf8), + "\"Name\"" => Ok(DataType::Utf8), "array" => parse_array_type(context), "composite" => parse_composite_type(context), "geometry" | "geography" => Ok(DataType::Binary),