From 85e7469d1ee21c803feafed5cfc3090c9bd0f8ef Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Sun, 7 Sep 2025 20:51:44 +0900 Subject: [PATCH 01/11] feat(catalog): Implement catalog loader for sql --- crates/catalog/sql/src/catalog.rs | 47 +++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 35889d451a..183289c17b 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -19,12 +19,12 @@ use std::collections::{HashMap, HashSet}; use std::time::Duration; use async_trait::async_trait; -use iceberg::io::FileIO; +use iceberg::io::{FileIO, FileIOBuilder}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, + TableCommit, TableCreation, TableIdent, }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; use sqlx::{Any, AnyPool, Row, Transaction}; @@ -54,6 +54,47 @@ static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if n static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning +/// Builder for [`SqlCatalog`] +#[derive(Debug)] +pub struct SqlCatalogBuilder(SqlCatalogConfig); + +impl Default for SqlCatalogBuilder { + fn default() -> Self { + Self(SqlCatalogConfig { + uri: "".to_string(), + name: "".to_string(), + warehouse_location: "".to_string(), + file_io: FileIOBuilder::new_fs_io().build().unwrap(), + sql_bind_style: SqlBindStyle::DollarNumeric, + props: HashMap::new(), + }) + } +} + +impl CatalogBuilder for SqlCatalogBuilder { + type C = SqlCatalog; + + fn load( + mut self, + name: impl Into, + props: HashMap, + ) -> impl Future> + Send { + let name = name.into(); + self.0.props = props; + + async move { + if name.trim().is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name cannot be empty", + )) + } else { + SqlCatalog::new(self.0).await + } + } + } +} + /// A struct representing the SQL catalog configuration. /// /// This struct contains various parameters that are used to configure a SQL catalog, From 9f264150849e30e25f5c1d9c87e1a30349c25947 Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Tue, 9 Sep 2025 18:44:13 +0900 Subject: [PATCH 02/11] feat(catalog): Implement catalog builder for sql --- crates/catalog/sql/src/catalog.rs | 58 +++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 183289c17b..bf582e0753 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -28,7 +28,6 @@ use iceberg::{ }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; use sqlx::{Any, AnyPool, Row, Transaction}; -use typed_builder::TypedBuilder; use crate::error::{ from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, @@ -71,6 +70,46 @@ impl Default for SqlCatalogBuilder { } } +impl SqlCatalogBuilder { + /// Configure the database URI + pub fn uri(mut self, uri: impl Into) -> Self { + self.0.uri = uri.into(); + self + } + + /// Configure the warehouse location + pub fn warehouse_location(mut self, location: impl Into) -> Self { + self.0.warehouse_location = location.into(); + self + } + + /// Configure the FileIO + pub fn file_io(mut self, file_io: FileIO) -> Self { + self.0.file_io = file_io; + self + } + + /// Configure the bound SQL Statement + pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self { + self.0.sql_bind_style = sql_bind_style; + self + } + + /// Configure the any properties + pub fn props(mut self, props: HashMap) -> Self { + self.0.props = props; + self + } + + /// Set a new property on the property to be configured. + /// When multiple methods are executed with the same key, + /// the later-set value takes precedence. + pub fn prop(mut self, key: impl Into, value: impl Into) -> Self { + self.0.props.insert(key.into(), value.into()); + self + } +} + impl CatalogBuilder for SqlCatalogBuilder { type C = SqlCatalog; @@ -103,14 +142,13 @@ impl CatalogBuilder for SqlCatalogBuilder { /// The options available for this parameter include: /// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases. /// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases. -#[derive(Debug, TypedBuilder)] +#[derive(Debug)] pub struct SqlCatalogConfig { uri: String, name: String, warehouse_location: String, file_io: FileIO, sql_bind_style: SqlBindStyle, - #[builder(default)] props: HashMap, } @@ -135,7 +173,7 @@ pub enum SqlBindStyle { impl SqlCatalog { /// Create new sql catalog instance - pub async fn new(config: SqlCatalogConfig) -> Result { + async fn new(config: SqlCatalogConfig) -> Result { install_default_drivers(); let max_connections: u32 = config .props @@ -831,14 +869,14 @@ mod tests { use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use iceberg::table::Table; - use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; + use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; use regex::Regex; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; - use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; + use crate::{SqlBindStyle, SqlCatalogBuilder}; const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; @@ -859,15 +897,13 @@ mod tests { let sql_lite_uri = format!("sqlite:{}", temp_path()); sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); - let config = SqlCatalogConfig::builder() + let builder = SqlCatalogBuilder::default() .uri(sql_lite_uri.to_string()) - .name("iceberg".to_string()) .warehouse_location(warehouse_location) .file_io(FileIOBuilder::new_fs_io().build().unwrap()) - .sql_bind_style(SqlBindStyle::QMark) - .build(); + .sql_bind_style(SqlBindStyle::QMark); - SqlCatalog::new(config).await.unwrap() + builder.load("iceberg", HashMap::new()).await.unwrap() } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { From 958bb4eb2425b45b72095d2786d340ce1157fa4a Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Tue, 9 Sep 2025 22:53:17 +0900 Subject: [PATCH 03/11] chore(deps): Remove unused deps --- Cargo.lock | 2 -- crates/catalog/sql/Cargo.toml | 2 -- 2 files changed, 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1c34fdddd..d309de08f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3715,13 +3715,11 @@ version = "0.6.0" dependencies = [ "async-trait", "iceberg", - "iceberg_test_utils", "itertools 0.13.0", "regex", "sqlx", "tempfile", "tokio", - "typed-builder 0.20.1", ] [[package]] diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 33ca700bf7..469da0bc37 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -32,10 +32,8 @@ repository = { workspace = true } async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } -typed-builder = { workspace = true } [dev-dependencies] -iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } itertools = { workspace = true } regex = "1.10.5" sqlx = { version = "0.8.1", features = [ From a553e8a1747278a4c9eba314c1b9f38951b0bded Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Mon, 22 Sep 2025 23:28:05 +0900 Subject: [PATCH 04/11] feat(catalog): SqlCatalogConfig is private --- crates/catalog/sql/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index bf582e0753..1af08bebbf 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -143,7 +143,7 @@ impl CatalogBuilder for SqlCatalogBuilder { /// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases. /// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases. #[derive(Debug)] -pub struct SqlCatalogConfig { +struct SqlCatalogConfig { uri: String, name: String, warehouse_location: String, From 41b47c5fecf975312493e7158185544f686e47eb Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Mon, 22 Sep 2025 23:30:24 +0900 Subject: [PATCH 05/11] feat(catalog): No need to set FileIO --- crates/catalog/sql/src/catalog.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 1af08bebbf..ab03fc7093 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -83,12 +83,6 @@ impl SqlCatalogBuilder { self } - /// Configure the FileIO - pub fn file_io(mut self, file_io: FileIO) -> Self { - self.0.file_io = file_io; - self - } - /// Configure the bound SQL Statement pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self { self.0.sql_bind_style = sql_bind_style; @@ -866,7 +860,6 @@ mod tests { use std::collections::{HashMap, HashSet}; use std::hash::Hash; - use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; use iceberg::table::Table; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; @@ -900,7 +893,6 @@ mod tests { let builder = SqlCatalogBuilder::default() .uri(sql_lite_uri.to_string()) .warehouse_location(warehouse_location) - .file_io(FileIOBuilder::new_fs_io().build().unwrap()) .sql_bind_style(SqlBindStyle::QMark); builder.load("iceberg", HashMap::new()).await.unwrap() From e2b4e237b535c0bbe61e14c1c107a824b4f0b4fe Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Tue, 23 Sep 2025 01:58:29 +0900 Subject: [PATCH 06/11] feat(catalog): Add some prop keys for SqlCatalogBuilder --- Cargo.lock | 1 + crates/catalog/sql/Cargo.toml | 1 + crates/catalog/sql/src/catalog.rs | 177 ++++++++++++++++++++++++++++-- 3 files changed, 170 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d309de08f0..684c1f3494 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3718,6 +3718,7 @@ dependencies = [ "itertools 0.13.0", "regex", "sqlx", + "strum 0.27.2", "tempfile", "tokio", ] diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 469da0bc37..262422c284 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -32,6 +32,7 @@ repository = { workspace = true } async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } +strum = { workspace = true } [dev-dependencies] itertools = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index ab03fc7093..961b1d8c14 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; @@ -33,6 +34,13 @@ use crate::error::{ from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, }; +/// catalog URI +pub const SQL_CATALOG_PROP_URI: &str = "uri"; +/// catalog warehouse location +pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; +/// catalog sql bind style +pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; + static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; static CATALOG_FIELD_TABLE_NAME: &str = "table_name"; @@ -110,9 +118,26 @@ impl CatalogBuilder for SqlCatalogBuilder { fn load( mut self, name: impl Into, - props: HashMap, + mut props: HashMap, ) -> impl Future> + Send { let name = name.into(); + + if let Some(uri) = props.remove(SQL_CATALOG_PROP_URI) { + self.0.uri = uri; + } + if let Some(warehouse_location) = props.remove(SQL_CATALOG_PROP_WAREHOUSE) { + self.0.warehouse_location = warehouse_location; + } + + let mut valid_sql_bind_style = true; + if let Some(sql_bind_style) = props.remove(SQL_CATALOG_PROP_BIND_STYLE) { + if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) { + self.0.sql_bind_style = sql_bind_style; + } else { + valid_sql_bind_style = false; + } + } + self.0.props = props; async move { @@ -121,6 +146,16 @@ impl CatalogBuilder for SqlCatalogBuilder { ErrorKind::DataInvalid, "Catalog name cannot be empty", )) + } else if !valid_sql_bind_style { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "`{}` values are valid only if they're `{}` or `{}`", + SQL_CATALOG_PROP_BIND_STYLE, + SqlBindStyle::DollarNumeric, + SqlBindStyle::QMark + ), + )) } else { SqlCatalog::new(self.0).await } @@ -156,7 +191,7 @@ pub struct SqlCatalog { sql_bind_style: SqlBindStyle, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, strum::EnumString, strum::Display, PartialOrd, Eq)] /// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB) pub enum SqlBindStyle { /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style @@ -868,7 +903,10 @@ mod tests { use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; - use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; + use crate::catalog::{ + NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI, + SQL_CATALOG_PROP_WAREHOUSE, + }; use crate::{SqlBindStyle, SqlCatalogBuilder}; const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; @@ -890,12 +928,18 @@ mod tests { let sql_lite_uri = format!("sqlite:{}", temp_path()); sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); - let builder = SqlCatalogBuilder::default() - .uri(sql_lite_uri.to_string()) - .warehouse_location(warehouse_location) - .sql_bind_style(SqlBindStyle::QMark); - - builder.load("iceberg", HashMap::new()).await.unwrap() + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.to_string()), + (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::DollarNumeric.to_string(), + ), + ]); + SqlCatalogBuilder::default() + .load("iceberg", props) + .await + .unwrap() } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -994,6 +1038,121 @@ mod tests { new_sql_catalog(warehouse_loc.clone()).await; } + #[tokio::test] + async fn test_builder_method() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .uri(sql_lite_uri.to_string()) + .warehouse_location(warehouse_location.clone()) + .sql_bind_style(SqlBindStyle::QMark) + .load("iceberg", HashMap::default()) + .await; + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// Overwriting an sqlite database with a non-existent path causes + /// catalog generation to fail + #[tokio::test] + async fn test_builder_props_non_existent_path_fails() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .uri(sql_lite_uri) + .warehouse_location(warehouse_location) + .load( + "iceberg", + HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2)]), + ) + .await; + assert!(catalog.is_err()); + } + + /// Even when an invalid URI is specified in a builder method, + /// it can be successfully overridden with a valid URI in props + /// for catalog generation to succeed. + #[tokio::test] + async fn test_builder_props_set_valid_uri() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .uri(sql_lite_uri2) + .warehouse_location(warehouse_location) + .load( + "iceberg", + HashMap::from_iter([(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone())]), + ) + .await; + assert!(catalog.is_ok()); + } + + /// values assigned via props take precedence + #[tokio::test] + async fn test_builder_props_take_precedence() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + let warehouse_location2 = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .warehouse_location(warehouse_location2) + .sql_bind_style(SqlBindStyle::DollarNumeric) + .load( + "iceberg", + HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]), + ) + .await; + + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// invalid value for `SqlBindStyle` causes catalog creation to fail + #[tokio::test] + async fn test_builder_props_invalid_bind_style_fails() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + + let catalog = SqlCatalogBuilder::default() + .load( + "iceberg", + HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri), + (SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location), + (SQL_CATALOG_PROP_BIND_STYLE.to_string(), "AAA".to_string()), + ]), + ) + .await; + + assert!(catalog.is_err()); + } + #[tokio::test] async fn test_list_namespaces_returns_empty_vector() { let warehouse_loc = temp_path(); From 52d039de4c1bd0a10ad2166dfd06bf6ca51bcd43 Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Tue, 23 Sep 2025 02:30:38 +0900 Subject: [PATCH 07/11] fix(catalog): prioritize props & add docs --- crates/catalog/sql/src/catalog.rs | 111 ++++++++++++++++++++++++++++-- crates/catalog/sql/src/lib.rs | 37 ++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 961b1d8c14..cb6222fd56 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -80,24 +80,36 @@ impl Default for SqlCatalogBuilder { impl SqlCatalogBuilder { /// Configure the database URI + /// + /// If `SQL_CATALOG_PROP_URI` has a value set in `props` during `SqlCatalogBuilder::load`, + /// that value takes precedence, and the value specified by this method will not be used. pub fn uri(mut self, uri: impl Into) -> Self { self.0.uri = uri.into(); self } /// Configure the warehouse location + /// + /// If `SQL_CATALOG_PROP_WAREHOUSE` has a value set in `props` during `SqlCatalogBuilder::load`, + /// that value takes precedence, and the value specified by this method will not be used. pub fn warehouse_location(mut self, location: impl Into) -> Self { self.0.warehouse_location = location.into(); self } /// Configure the bound SQL Statement + /// + /// If `SQL_CATALOG_PROP_BIND_STYLE` has a value set in `props` during `SqlCatalogBuilder::load`, + /// that value takes precedence, and the value specified by this method will not be used. pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self { self.0.sql_bind_style = sql_bind_style; self } /// Configure the any properties + /// + /// If the same key has values set in `props` during `SqlCatalogBuilder::load`, + /// those values will take precedence. pub fn props(mut self, props: HashMap) -> Self { self.0.props = props; self @@ -106,6 +118,9 @@ impl SqlCatalogBuilder { /// Set a new property on the property to be configured. /// When multiple methods are executed with the same key, /// the later-set value takes precedence. + /// + /// If the same key has values set in `props` during `SqlCatalogBuilder::load`, + /// those values will take precedence. pub fn prop(mut self, key: impl Into, value: impl Into) -> Self { self.0.props.insert(key.into(), value.into()); self @@ -118,19 +133,23 @@ impl CatalogBuilder for SqlCatalogBuilder { fn load( mut self, name: impl Into, - mut props: HashMap, + props: HashMap, ) -> impl Future> + Send { let name = name.into(); - if let Some(uri) = props.remove(SQL_CATALOG_PROP_URI) { + for (k, v) in props { + self.0.props.insert(k, v); + } + + if let Some(uri) = self.0.props.remove(SQL_CATALOG_PROP_URI) { self.0.uri = uri; } - if let Some(warehouse_location) = props.remove(SQL_CATALOG_PROP_WAREHOUSE) { + if let Some(warehouse_location) = self.0.props.remove(SQL_CATALOG_PROP_WAREHOUSE) { self.0.warehouse_location = warehouse_location; } let mut valid_sql_bind_style = true; - if let Some(sql_bind_style) = props.remove(SQL_CATALOG_PROP_BIND_STYLE) { + if let Some(sql_bind_style) = self.0.props.remove(SQL_CATALOG_PROP_BIND_STYLE) { if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) { self.0.sql_bind_style = sql_bind_style; } else { @@ -138,8 +157,6 @@ impl CatalogBuilder for SqlCatalogBuilder { } } - self.0.props = props; - async move { if name.trim().is_empty() { Err(Error::new( @@ -1132,6 +1149,88 @@ mod tests { assert!(catalog.sql_bind_style == SqlBindStyle::QMark); } + /// values assigned via props take precedence + #[tokio::test] + async fn test_builder_props_take_precedence_props() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + let warehouse_location2 = temp_path(); + + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + let props2 = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location2.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::DollarNumeric.to_string(), + ), + ]); + + let catalog = SqlCatalogBuilder::default() + .props(props2) + .load("iceberg", props) + .await; + + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + + /// values assigned via props take precedence + #[tokio::test] + async fn test_builder_props_take_precedence_prop() { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + let sql_lite_uri2 = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + let warehouse_location = temp_path(); + let warehouse_location2 = temp_path(); + + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + warehouse_location.clone(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + + let catalog = SqlCatalogBuilder::default() + .prop(SQL_CATALOG_PROP_URI.to_string(), sql_lite_uri2) + .prop(SQL_CATALOG_PROP_WAREHOUSE.to_string(), warehouse_location2) + .prop( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::DollarNumeric.to_string(), + ) + .load("iceberg", props) + .await; + + assert!(catalog.is_ok()); + + let catalog = catalog.unwrap(); + assert!(catalog.warehouse_location == warehouse_location); + assert!(catalog.sql_bind_style == SqlBindStyle::QMark); + } + /// invalid value for `SqlBindStyle` causes catalog creation to fail #[tokio::test] async fn test_builder_props_invalid_bind_style_fails() { diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs index 6861dab3f8..b76006ed3b 100644 --- a/crates/catalog/sql/src/lib.rs +++ b/crates/catalog/sql/src/lib.rs @@ -16,6 +16,43 @@ // under the License. //! Iceberg sql catalog implementation. +//! +//! To build a sql catalog with configurations +//! # Example +//! +//! ```rust, no_run +//! use std::collections::HashMap; +//! +//! use iceberg::CatalogBuilder; +//! use iceberg_catalog_sql::{ +//! SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE, +//! SqlBindStyle, SqlCatalogBuilder, +//! }; +//! +//! #[tokio::main] +//! async fn main() { +//! let catalog = SqlCatalogBuilder::default() +//! .load( +//! "sql", +//! HashMap::from_iter([ +//! ( +//! SQL_CATALOG_PROP_URI.to_string(), +//! "http://localhost:8181".to_string(), +//! ), +//! ( +//! SQL_CATALOG_PROP_WAREHOUSE.to_string(), +//! "s3://warehouse".to_string(), +//! ), +//! ( +//! SQL_CATALOG_PROP_BIND_STYLE.to_string(), +//! SqlBindStyle::QMark.to_string(), +//! ), +//! ]), +//! ) +//! .await +//! .unwrap(); +//! } +//! ``` #![deny(missing_docs)] From a46c36c67758130f4fd60c213ddf369329b05f31 Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Sun, 28 Sep 2025 00:11:27 +0900 Subject: [PATCH 08/11] fix(catalog): Remove unused traits --- crates/catalog/sql/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index cb6222fd56..3b6f91dc6f 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -208,7 +208,7 @@ pub struct SqlCatalog { sql_bind_style: SqlBindStyle, } -#[derive(Debug, PartialEq, strum::EnumString, strum::Display, PartialOrd, Eq)] +#[derive(Debug, PartialEq, strum::EnumString, strum::Display)] /// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB) pub enum SqlBindStyle { /// DollarNumeric uses parameters of the form `$1..$N``, which is the Postgres style From a6dfdd506783fb4bdc29b7b05163c53b381b8bb9 Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Sun, 28 Sep 2025 00:18:39 +0900 Subject: [PATCH 09/11] feat(catalog): Merge props --- crates/catalog/sql/src/catalog.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 3b6f91dc6f..03645ec093 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -111,7 +111,9 @@ impl SqlCatalogBuilder { /// If the same key has values set in `props` during `SqlCatalogBuilder::load`, /// those values will take precedence. pub fn props(mut self, props: HashMap) -> Self { - self.0.props = props; + for (k, v) in props { + self.0.props.insert(k, v); + } self } From 52364e405d0c29b92c0ef718f70f19e5d9839157 Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Mon, 29 Sep 2025 22:38:21 +0900 Subject: [PATCH 10/11] feat(catalog): Inferred from warehouse location --- crates/catalog/sql/src/catalog.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 03645ec093..6602fa76dc 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,7 +20,7 @@ use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; -use iceberg::io::{FileIO, FileIOBuilder}; +use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ @@ -71,7 +71,6 @@ impl Default for SqlCatalogBuilder { uri: "".to_string(), name: "".to_string(), warehouse_location: "".to_string(), - file_io: FileIOBuilder::new_fs_io().build().unwrap(), sql_bind_style: SqlBindStyle::DollarNumeric, props: HashMap::new(), }) @@ -195,7 +194,6 @@ struct SqlCatalogConfig { uri: String, name: String, warehouse_location: String, - file_io: FileIO, sql_bind_style: SqlBindStyle, props: HashMap, } @@ -222,6 +220,7 @@ pub enum SqlBindStyle { impl SqlCatalog { /// Create new sql catalog instance async fn new(config: SqlCatalogConfig) -> Result { + let fileio = FileIO::from_path(&config.warehouse_location)?.build()?; install_default_drivers(); let max_connections: u32 = config .props @@ -277,7 +276,7 @@ impl SqlCatalog { name: config.name.to_owned(), connection: pool, warehouse_location: config.warehouse_location, - fileio: config.file_io, + fileio, sql_bind_style: config.sql_bind_style, }) } From 33bcd229f0117b8a87fb985ddf709506bde0bfdc Mon Sep 17 00:00:00 2001 From: Yoshiki Kudo Date: Tue, 30 Sep 2025 22:54:57 +0900 Subject: [PATCH 11/11] feat(catalog): Enable creation of SqlCatalog via load --- Cargo.lock | 3 +++ Cargo.toml | 1 + crates/catalog/loader/Cargo.toml | 9 +++++++++ crates/catalog/loader/src/lib.rs | 34 ++++++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 684c1f3494..4f40b928e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3670,6 +3670,9 @@ dependencies = [ "iceberg-catalog-hms", "iceberg-catalog-rest", "iceberg-catalog-s3tables", + "iceberg-catalog-sql", + "sqlx", + "tempfile", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index fe8078d446..c62df93d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" } iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" } iceberg-catalog-s3tables = { version = "0.6.0", path = "./crates/catalog/s3tables" } iceberg-catalog-hms = { version = "0.6.0", path = "./crates/catalog/hms" } +iceberg-catalog-sql = { version = "0.6.0", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index 014f84a3a1..4228ed7d5a 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -34,5 +34,14 @@ iceberg-catalog-rest = { workspace = true } iceberg-catalog-glue = { workspace = true } iceberg-catalog-s3tables = { workspace = true } iceberg-catalog-hms = { workspace = true } +iceberg-catalog-sql = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } + +[dev-dependencies] +sqlx = { version = "0.8.1", features = [ + "runtime-tokio", + "sqlite", + "migrate", +], default-features = false } +tempfile = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 9c18ab4e5f..e118ef86a9 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -24,6 +24,7 @@ use iceberg_catalog_glue::GlueCatalogBuilder; use iceberg_catalog_hms::HmsCatalogBuilder; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_catalog_s3tables::S3TablesCatalogBuilder; +use iceberg_catalog_sql::SqlCatalogBuilder; /// A CatalogBuilderFactory creating a new catalog builder. type CatalogBuilderFactory = fn() -> Box; @@ -34,6 +35,7 @@ static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[ ("glue", || Box::new(GlueCatalogBuilder::default())), ("s3tables", || Box::new(S3TablesCatalogBuilder::default())), ("hms", || Box::new(HmsCatalogBuilder::default())), + ("sql", || Box::new(SqlCatalogBuilder::default())), ]; /// Return the list of supported catalog types. @@ -108,6 +110,9 @@ impl CatalogLoader<'_> { mod tests { use std::collections::HashMap; + use sqlx::migrate::MigrateDatabase; + use tempfile::TempDir; + use crate::{CatalogLoader, load}; #[tokio::test] @@ -220,6 +225,35 @@ mod tests { assert!(catalog.is_ok()); } + fn temp_path() -> String { + let temp_dir = TempDir::new().unwrap(); + temp_dir.path().to_str().unwrap().to_string() + } + + #[tokio::test] + async fn test_catalog_loader_pattern_sql_catalog() { + use iceberg_catalog_sql::{SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE}; + + let uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&uri).await.unwrap(); + + let catalog_loader = load("sql").unwrap(); + let catalog = catalog_loader + .load( + "sql".to_string(), + HashMap::from([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + "s3://warehouse".to_string(), + ), + ]), + ) + .await; + + assert!(catalog.is_ok()); + } + #[tokio::test] async fn test_error_message_includes_supported_types() { let err = match load("does-not-exist") {