Skip to content
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions crates/catalog/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ repository = { workspace = true }
async-trait = { workspace = true }
iceberg = { workspace = true }
sqlx = { version = "0.8.1", features = ["any"], default-features = false }
typed-builder = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
itertools = { workspace = true }
regex = "1.10.5"
sqlx = { version = "0.8.1", features = [
Expand Down
105 changes: 91 additions & 14 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ use std::collections::{HashMap, HashSet};
use std::time::Duration;

use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::io::{FileIO, FileIOBuilder};
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent,
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
use sqlx::{Any, AnyPool, Row, Transaction};
use typed_builder::TypedBuilder;

use crate::error::{
from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err,
Expand All @@ -54,6 +53,87 @@ static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if n
static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed
static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning

/// Builder for [`SqlCatalog`]
#[derive(Debug)]
pub struct SqlCatalogBuilder(SqlCatalogConfig);

impl Default for SqlCatalogBuilder {
fn default() -> Self {
Self(SqlCatalogConfig {
uri: "".to_string(),
name: "".to_string(),
warehouse_location: "".to_string(),
file_io: FileIOBuilder::new_fs_io().build().unwrap(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not panic here, instead we should report an error.

sql_bind_style: SqlBindStyle::DollarNumeric,
props: HashMap::new(),
})
}
}

impl SqlCatalogBuilder {
/// Configure the database URI
pub fn uri(mut self, uri: impl Into<String>) -> Self {
self.0.uri = uri.into();
self
}

/// Configure the warehouse location
pub fn warehouse_location(mut self, location: impl Into<String>) -> Self {
self.0.warehouse_location = location.into();
self
}

/// Configure the FileIO
pub fn file_io(mut self, file_io: FileIO) -> Self {
self.0.file_io = file_io;
self
}

/// Configure the bound SQL Statement
pub fn sql_bind_style(mut self, sql_bind_style: SqlBindStyle) -> Self {
self.0.sql_bind_style = sql_bind_style;
self
}

/// Configure the any properties
pub fn props(mut self, props: HashMap<String, String>) -> Self {
self.0.props = props;
self
}

/// Set a new property on the property to be configured.
/// When multiple methods are executed with the same key,
/// the later-set value takes precedence.
pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.0.props.insert(key.into(), value.into());
self
}
}

impl CatalogBuilder for SqlCatalogBuilder {
type C = SqlCatalog;

fn load(
mut self,
name: impl Into<String>,
props: HashMap<String, String>,
) -> impl Future<Output = Result<Self::C>> + Send {
let name = name.into();
self.0.props = props;

async move {
if name.trim().is_empty() {
Err(Error::new(
ErrorKind::DataInvalid,
"Catalog name cannot be empty",
))
} else {
SqlCatalog::new(self.0).await
}
}
}
}

/// A struct representing the SQL catalog configuration.
///
/// This struct contains various parameters that are used to configure a SQL catalog,
Expand All @@ -62,14 +142,13 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each con
/// The options available for this parameter include:
/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases.
/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases.
#[derive(Debug, TypedBuilder)]
#[derive(Debug)]
pub struct SqlCatalogConfig {
uri: String,
name: String,
warehouse_location: String,
file_io: FileIO,
sql_bind_style: SqlBindStyle,
#[builder(default)]
props: HashMap<String, String>,
}

Expand All @@ -94,7 +173,7 @@ pub enum SqlBindStyle {

impl SqlCatalog {
/// Create new sql catalog instance
pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
async fn new(config: SqlCatalogConfig) -> Result<Self> {
install_default_drivers();
let max_connections: u32 = config
.props
Expand Down Expand Up @@ -790,14 +869,14 @@ mod tests {
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use regex::Regex;
use sqlx::migrate::MigrateDatabase;
use tempfile::TempDir;

use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
use crate::{SqlBindStyle, SqlCatalogBuilder};

const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

Expand All @@ -818,15 +897,13 @@ mod tests {
let sql_lite_uri = format!("sqlite:{}", temp_path());
sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();

let config = SqlCatalogConfig::builder()
let builder = SqlCatalogBuilder::default()
.uri(sql_lite_uri.to_string())
.name("iceberg".to_string())
.warehouse_location(warehouse_location)
.file_io(FileIOBuilder::new_fs_io().build().unwrap())
.sql_bind_style(SqlBindStyle::QMark)
.build();
.sql_bind_style(SqlBindStyle::QMark);

SqlCatalog::new(config).await.unwrap()
builder.load("iceberg", HashMap::new()).await.unwrap()
}

async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
Expand Down
Loading