diff --git a/Cargo.lock b/Cargo.lock index aa30a87e89..0aef3f46f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3712,6 +3712,9 @@ dependencies = [ "iceberg-catalog-hms", "iceberg-catalog-rest", "iceberg-catalog-s3tables", + "iceberg-catalog-sql", + "sqlx", + "tempfile", "tokio", ] @@ -3757,13 +3760,12 @@ version = "0.7.0" dependencies = [ "async-trait", "iceberg", - "iceberg_test_utils", "itertools 0.13.0", "regex", "sqlx", + "strum 0.27.2", "tempfile", "tokio", - "typed-builder 0.20.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 999b911753..ca3e6f72d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" } iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" } iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" } iceberg-catalog-hms = { version = "0.7.0", path = "./crates/catalog/hms" } +iceberg-catalog-sql = { version = "0.7.0", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.7.0", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index 014f84a3a1..4228ed7d5a 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -34,5 +34,14 @@ iceberg-catalog-rest = { workspace = true } iceberg-catalog-glue = { workspace = true } iceberg-catalog-s3tables = { workspace = true } iceberg-catalog-hms = { workspace = true } +iceberg-catalog-sql = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } + +[dev-dependencies] +sqlx = { version = "0.8.1", features = [ + "runtime-tokio", + "sqlite", + "migrate", +], default-features = false } +tempfile = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 9c18ab4e5f..e118ef86a9 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -24,6 +24,7 @@ use iceberg_catalog_glue::GlueCatalogBuilder; use iceberg_catalog_hms::HmsCatalogBuilder; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_catalog_s3tables::S3TablesCatalogBuilder; +use iceberg_catalog_sql::SqlCatalogBuilder; /// A CatalogBuilderFactory creating a new catalog builder. type CatalogBuilderFactory = fn() -> Box; @@ -34,6 +35,7 @@ static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[ ("glue", || Box::new(GlueCatalogBuilder::default())), ("s3tables", || Box::new(S3TablesCatalogBuilder::default())), ("hms", || Box::new(HmsCatalogBuilder::default())), + ("sql", || Box::new(SqlCatalogBuilder::default())), ]; /// Return the list of supported catalog types. @@ -108,6 +110,9 @@ impl CatalogLoader<'_> { mod tests { use std::collections::HashMap; + use sqlx::migrate::MigrateDatabase; + use tempfile::TempDir; + use crate::{CatalogLoader, load}; #[tokio::test] @@ -220,6 +225,35 @@ mod tests { assert!(catalog.is_ok()); } + fn temp_path() -> String { + let temp_dir = TempDir::new().unwrap(); + temp_dir.path().to_str().unwrap().to_string() + } + + #[tokio::test] + async fn test_catalog_loader_pattern_sql_catalog() { + use iceberg_catalog_sql::{SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE}; + + let uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&uri).await.unwrap(); + + let catalog_loader = load("sql").unwrap(); + let catalog = catalog_loader + .load( + "sql".to_string(), + HashMap::from([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + "s3://warehouse".to_string(), + ), + ]), + ) + .await; + + assert!(catalog.is_ok()); + } + #[tokio::test] async fn test_error_message_includes_supported_types() { let err = match load("does-not-exist") { diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 33ca700bf7..262422c284 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -32,10 +32,9 @@ repository = { workspace = true } async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } -typed-builder = { workspace = true } +strum = { workspace = true } [dev-dependencies] -iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } itertools = { workspace = true } regex = "1.10.5" sqlx = { version = "0.8.1", features = [ diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 35889d451a..6602fa76dc 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; @@ -23,17 +24,23 @@ use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, + TableCommit, TableCreation, TableIdent, }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; use sqlx::{Any, AnyPool, Row, Transaction}; -use typed_builder::TypedBuilder; use crate::error::{ from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, }; +/// catalog URI +pub const SQL_CATALOG_PROP_URI: &str = "uri"; +/// catalog warehouse location +pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; +/// catalog sql bind style +pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; + static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; static CATALOG_FIELD_TABLE_NAME: &str = "table_name"; @@ -54,6 +61,126 @@ static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if n static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning +/// Builder for [`SqlCatalog`] +#[derive(Debug)] +pub struct SqlCatalogBuilder(SqlCatalogConfig); + +impl Default for SqlCatalogBuilder { + fn default() -> Self { + Self(SqlCatalogConfig { + uri: "".to_string(), + name: "".to_string(), + warehouse_location: "".to_string(), + sql_bind_style: SqlBindStyle::DollarNumeric, + props: HashMap::new(), + }) + } +} + +impl SqlCatalogBuilder { + /// Configure the database URI + /// + /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`, + /// that value takes precedence, and the value specified by this method will not be used. + pub fn uri(mut self, uri: impl Into) -> Self { + self.0.uri = uri.into(); + self + } + + /// Configure the warehouse location + /// + /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`, + /// that value takes precedence, and the value specified by this method will not be used. + pub fn warehouse_location(mut self, location: impl Into) -> Self { + self.0.warehouse_location = location.into(); + self + } + + /// Configure the bound SQL Statement + /// + /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`, + /// that value takes precedence, and the value specified by this method will not be used. + pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self { + self.0.sql_bind_style = sql_bind_style; + self + } + + /// Configure the any properties + /// + /// If the same key has values set in `props` during `SqlCatalogBuilder::load`, + /// those values will take precedence. + pub fn props(mut self, props: HashMap) -> Self { + for (k, v) in props { + self.0.props.insert(k, v); + } + self + } + + /// Set a new property on the property to be configured. + /// When multiple methods are executed with the same key, + /// the later-set value takes precedence. + /// + /// If the same key has values set in `props` during `SqlCatalogBuilder::load`, + /// those values will take precedence. + pub fn prop(mut self, key: impl Into, value: impl Into) -> Self { + self.0.props.insert(key.into(), value.into()); + self + } +} + +impl CatalogBuilder for SqlCatalogBuilder { + type C = SqlCatalog; + + fn load( + mut self, + name: impl Into, + props: HashMap, + ) -> impl Future> + Send { + let name = name.into(); + + for (k, v) in props { + self.0.props.insert(k, v); + } + + if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) { + self.0.uri = uri; + } + if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) { + self.0.warehouse_location = warehouse_location; + } + + let mut valid_sql_bind_style = true; + if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) { + if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) { + self.0.sql_bind_style = sql_bind_style; + } else { + valid_sql_bind_style = false; + } + } + + async move { + if name.trim().is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name cannot be empty", + )) + } else if !valid_sql_bind_style { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "`{}` values are valid only if they're `{}` or `{}`", + SQL_CATALOG_PROP_BIND_STYLE, + SqlBindStyle::DollarNumeric, + SqlBindStyle::QMark + ), + )) + } else { + SqlCatalog::new(self.0).await + } + } + } +} + /// A struct representing the SQL catalog configuration. /// /// This struct contains various parameters that are used to configure a SQL catalog, @@ -62,14 +189,12 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each con /// The options available for this parameter include: /// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases. /// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases. -#[derive(Debug, TypedBuilder)] -pub struct SqlCatalogConfig { +#[derive(Debug)] +struct SqlCatalogConfig { uri: String, name: String, warehouse_location: String, - file_io: FileIO, sql_bind_style: SqlBindStyle, - #[builder(default)] props: HashMap, } @@ -83,7 +208,7 @@ pub struct SqlCatalog { sql_bind_style: SqlBindStyle, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, strum::EnumString, strum::Display)] /// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB) pub enum SqlBindStyle { /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style @@ -94,7 +219,8 @@ pub enum SqlBindStyle { impl SqlCatalog { /// Create new sql catalog instance - pub async fn new(config: SqlCatalogConfig) -> Result { + async fn new(config: SqlCatalogConfig) -> Result { + let fileio = FileIO::from_path(&config.warehouse_location)?.build()?; install_default_drivers(); let max_connections: u32 = config .props @@ -150,7 +276,7 @@ impl SqlCatalog { name: config.name.to_owned(), connection: pool, warehouse_location: config.warehouse_location, - fileio: config.file_io, + fileio, sql_bind_style: config.sql_bind_style, }) } @@ -787,17 +913,19 @@ mod tests { use std::collections::{HashMap, HashSet}; use std::hash::Hash; - use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use iceberg::table::Table; - use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; + use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; use regex::Regex; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; - use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; - use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; + use crate::catalog::{ + NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI, + SQL_CATALOG_PROP_WAREHOUSE, + }; + use crate::{SqlBindStyle, SqlCatalogBuilder}; const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; @@ -818,15 +946,18 @@ mod tests { let sql_lite_uri = format!("sqlite:{}", temp_path()); sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); - let config = SqlCatalogConfig::builder() - .uri(sql_lite_uri.to_string()) - .name("iceberg".to_string()) - .warehouse_location(warehouse_location) - .file_io(FileIOBuilder::new_fs_io().build().unwrap()) - .sql_bind_style(SqlBindStyle::QMark) - .build(); - - SqlCatalog::new(config).await.unwrap() + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()), + (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::DollarNumeric.to_string(), + ), + ]); + SqlCatalogBuilder::default() + .load("iceberg", props) + .await + .unwrap() } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -925,6 +1056,203 @@ mod tests { new_sql_catalog(warehouse_loc.clone()).await; } + #[tokio::test] + async fn test_builder_method() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .uri(sql_lite_uri.to_string()) + .warehouse_location(warehouse_location.clone()) + .sql_bind_style(SqlBindStyle::QMark) + .load("iceberg", HashMap::default()) + .await; + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// Overwriting an sqlite database with a non-existent path causes + /// catalog generation to fail + #[tokio::test] + async fn test_builder_props_non_existent_path_fails() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .uri(sql_lite_uri) + .warehouse_location(warehouse_location) + .load( + "iceberg", + HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]), + ) + .await; + assert!(catalog.is_err()); + } + + /// Even when an invalid URI is specified in a builder method, + /// it can be successfully overridden with a valid URI in props + /// for catalog generation to succeed. + #[tokio::test] + async fn test_builder_props_set_valid_uri() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .uri(sql_lite_uri2) + .warehouse_location(warehouse_location) + .load( + "iceberg", + HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]), + ) + .await; + assert!(catalog.is_ok()); + } + + /// values assigned via props take precedence + #[tokio::test] + async fn test_builder_props_take_precedence() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + let warehouse_location2 = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .warehouse_location(warehouse_location2) + .sql_bind_style(SqlBindStyle::DollarNumeric) + .load( + "iceberg", + HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]), + ) + .await; + + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// values assigned via props take precedence + #[tokio::test] + async fn test_builder_props_take_precedence_props() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + let warehouse_location2 = temp_path(); + + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + let props2 = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location2.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::DollarNumeric.to_string(), + ), + ]); + + let catalog = SqlCatalogBuilder::default() + .props(props2) + .load("iceberg", props) + .await; + + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// values assigned via props take precedence + #[tokio::test] + async fn test_builder_props_take_precedence_prop() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + let warehouse_location2 = temp_path(); + + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + + let catalog = SqlCatalogBuilder::default() + .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2) + .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2) + .prop( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::DollarNumeric.to_string(), + ) + .load("iceberg", props) + .await; + + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// invalid value for `SqlBindStyle` causes catalog creation to fail + #[tokio::test] + async fn test_builder_props_invalid_bind_style_fails() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .load( + "iceberg", + HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri), + (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location), + (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()), + ]), + ) + .await; + + assert!(catalog.is_err()); + } + #[tokio::test] async fn test_list_namespaces_returns_empty_vector() { let warehouse_loc = temp_path(); diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs index 6861dab3f8..b76006ed3b 100644 --- a/crates/catalog/sql/src/lib.rs +++ b/crates/catalog/sql/src/lib.rs @@ -16,6 +16,43 @@ // under the License. //! Iceberg sql catalog implementation. +//! +//! To build a sql catalog with configurations +//! # Example +//! +//! ```rust, no_run +//! use std::collections::HashMap; +//! +//! use iceberg::CatalogBuilder; +//! use iceberg_catalog_sql::{ +//! SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE, +//! SqlBindStyle, SqlCatalogBuilder, +//! }; +//! +//! #[tokio::main] +//! async fn main() { +//! let catalog = SqlCatalogBuilder::default() +//! .load( +//! "sql", +//! HashMap::from_iter([ +//! ( +//! SQL_CATALOG_PROP_URI.to_string(), +//! "http://localhost:8181".to_string(), +//! ), +//! ( +//! SQL_CATALOG_PROP_WAREHOUSE.to_string(), +//! "s3://warehouse".to_string(), +//! ), +//! ( +//! SQL_CATALOG_PROP_BIND_STYLE.to_string(), +//! SqlBindStyle::QMark.to_string(), +//! ), +//! ]), +//! ) +//! .await +//! .unwrap(); +//! } +//! ``` #![deny(missing_docs)]