diff --git a/Cargo.lock b/Cargo.lock
index 596e943bc..b3d01a985 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3628,6 +3628,7 @@ dependencies = [
  "opendal",
  "ordered-float 4.6.0",
  "parquet",
+ "paste",
  "pretty_assertions",
  "rand 0.8.5",
  "regex",
diff --git a/Cargo.toml b/Cargo.toml
index 999b91175..60e79c151 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -76,10 +76,10 @@ futures = "0.3"
 hive_metastore = "0.2.0"
 http = "1.2"
 iceberg = { version = "0.7.0", path = "./crates/iceberg" }
-iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
 iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" }
-iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" }
 iceberg-catalog-hms = { version = "0.7.0", path = "./crates/catalog/hms" }
+iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
+iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" }
 iceberg-datafusion = { version = "0.7.0", path = "./crates/integrations/datafusion" }
 indicatif = "0.17"
 itertools = "0.13"
@@ -95,6 +95,7 @@ once_cell = "1.20"
 opendal = "0.54.0"
 ordered-float = "4"
 parquet = "55.1"
+paste = "1.0.15"
 pilota = "0.11.10"
 port_scanner = "0.1.5"
 pretty_assertions = "1.4"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d592700b7..3fe0651a0 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -73,9 +73,10 @@ once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
 parquet = { workspace = true, features = ["async"] }
+paste = { workspace = true }
 rand = { workspace = true }
-reqwest = { workspace = true }
 reqsign = { version = "0.16.3", optional = true, default-features = false }
+reqwest = { workspace = true }
 roaring = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 90dafcc11..44b35e5a6 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -30,6 +30,7 @@ mod sort;
 mod statistic_file;
 mod table_metadata;
 mod table_metadata_builder;
+mod table_properties;
 mod transform;
 mod values;
 mod view_metadata;
@@ -48,6 +49,7 @@ pub use snapshot_summary::*;
 pub use sort::*;
 pub use statistic_file::*;
 pub use table_metadata::*;
+pub use table_properties::*;
 pub use transform::*;
 pub use values::*;
 pub use view_metadata::*;
diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs
new file mode 100644
index 000000000..2c4d28c19
--- /dev/null
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+/// Macro to define table properties with type-safe access and validation.
+///
+/// For every field the macro emits a public struct field, an associated key
+/// constant, a default value used by `new()` / `Default`, and conversions from
+/// and to `HashMap<String, String>`.
+///
+/// # Example
+/// ```ignore
+/// define_table_properties! {
+///     TableProperties {
+///         commit_num_retries: usize = "commit.retry.num-retries" => 4,
+///         commit_min_retry_wait_ms: u64 = "commit.retry.min-wait-ms" => 100,
+///         write_format_default: String = "write.format.default" => "parquet".to_string(),
+///     }
+/// }
+/// ```
+macro_rules! define_table_properties {
+    (
+        $struct_name:ident {
+            $(
+                $(#[$field_doc:meta])*
+                $field_name:ident: $field_type:ty = $key:literal => $default:expr
+            ),* $(,)?
+        }
+    ) => {
+        /// Table properties with type-safe access and validation
+        #[derive(Debug, Clone)]
+        pub struct $struct_name {
+            $(
+                $(#[$field_doc])*
+                pub $field_name: $field_type,
+            )*
+        }
+
+        impl $struct_name {
+            // NOTE(review): constant name reconstructed as PROPERTY_<FIELD_NAME> -- confirm.
+            $(
+                paste::paste! {
+                    #[doc = "Property key for " $key]
+                    pub const [<PROPERTY_ $field_name:upper>]: &str = $key;
+                }
+            )*
+
+            /// Create a new instance with default values
+            pub fn new() -> Self {
+                Self {
+                    $($field_name: parse_default!($default, $field_type),)*
+                }
+            }
+        }
+
+        impl Default for $struct_name {
+            fn default() -> Self {
+                Self::new()
+            }
+        }
+
+        impl TryFrom<HashMap<String, String>> for $struct_name {
+            type Error = anyhow::Error;
+
+            fn try_from(properties: HashMap<String, String>) -> Result<Self, Self::Error> {
+                let mut result = Self::new();
+
+                $(
+                    paste::paste! {
+                        if let Some(value_str) = properties.get(Self::[<PROPERTY_ $field_name:upper>]) {
+                            result.$field_name = parse_value!(
+                                value_str,
+                                $field_type,
+                                Self::[<PROPERTY_ $field_name:upper>]
+                            )?;
+                        }
+                    }
+                )*
+
+                Ok(result)
+            }
+        }
+
+        impl From<$struct_name> for HashMap<String, String> {
+            fn from(properties: $struct_name) -> Self {
+                let mut map = HashMap::new();
+
+                $(
+                    paste::paste! {
+                        map.insert(
+                            $struct_name::[<PROPERTY_ $field_name:upper>].to_string(),
+                            format_value!(properties.$field_name)
+                        );
+                    }
+                )*
+
+                map
+            }
+        }
+    };
+}
+
+/// Helper macro to parse default values based on type
+///
+/// NOTE: when invoked from `define_table_properties!` the type arrives as an
+/// already-parsed `ty` fragment, which cannot match the literal `String` token,
+/// so the second arm is selected and `String` defaults must already be `String`s.
+#[macro_export]
+macro_rules! parse_default {
+    ($value:expr, String) => {
+        $value.to_string()
+    };
+    ($value:expr, $type:ty) => {
+        $value
+    };
+}
+
+/// Helper macro to parse values from strings based on type
+///
+/// Yields an `anyhow::Error` naming `$key` when the value fails to parse.
+#[macro_export]
+macro_rules! parse_value {
+    ($value:expr, String, $key:expr) => {
+        Ok::<String, anyhow::Error>($value.clone())
+    };
+    ($value:expr, $type:ty, $key:expr) => {
+        $value
+            .parse::<$type>()
+            .map_err(|e| anyhow::anyhow!("Invalid value for {}: {}", $key, e))
+    };
+}
+
+/// Helper macro to format values for storage
+#[macro_export]
+macro_rules! format_value {
+    ($value:expr) => {
+        $value.to_string()
+    };
+}
+
+// Define the actual TableProperties struct using the macro
+define_table_properties! {
+    TableProperties {
+        /// Number of commit retries
+        commit_num_retries: usize = "commit.retry.num-retries" => 4,
+        /// Minimum wait time (ms) between retries
+        commit_min_retry_wait_ms: u64 = "commit.retry.min-wait-ms" => 100,
+        /// Maximum wait time (ms) between retries
+        commit_max_retry_wait_ms: u64 = "commit.retry.max-wait-ms" => 60000,
+        /// Total maximum retry time (ms)
+        commit_total_retry_timeout_ms: u64 = "commit.retry.total-timeout-ms" => 1800000,
+        /// Default file format for data files
+        write_format_default: String = "write.format.default" => "parquet".to_string(),
+        /// Target file size in bytes
+        write_target_file_size_bytes: u64 = "write.target-file-size-bytes" => 536870912,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+
+    #[test]
+    fn test_table_properties_default() {
+        let properties = TableProperties::new();
+        assert_eq!(properties.commit_num_retries, 4);
+        assert_eq!(properties.commit_min_retry_wait_ms, 100);
+        assert_eq!(properties.commit_max_retry_wait_ms, 60000);
+        assert_eq!(properties.commit_total_retry_timeout_ms, 1800000);
+        assert_eq!(properties.write_format_default, "parquet");
+        assert_eq!(properties.write_target_file_size_bytes, 536870912);
+    }
+
+    #[test]
+    fn test_table_properties_from_map() {
+        let properties = TableProperties::try_from(HashMap::from([
+            ("commit.retry.num-retries".to_string(), "5".to_string()),
+            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
+            ("write.format.default".to_string(), "avro".to_string()),
+        ]))
+        .unwrap();
+        assert_eq!(properties.commit_num_retries, 5);
+        assert_eq!(properties.commit_min_retry_wait_ms, 10);
+        // Keys absent from the input map keep their default values.
+        assert_eq!(properties.commit_max_retry_wait_ms, 60000);
+        assert_eq!(properties.commit_total_retry_timeout_ms, 1800000);
+        assert_eq!(properties.write_format_default, "avro");
+        assert_eq!(properties.write_target_file_size_bytes, 536870912);
+    }
+}