diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index cfa25deccb..585cb3e2bc 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -37,6 +37,7 @@ pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataB
 use super::{
     DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
     SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
+    TableProperties,
 };
 use crate::error::{Result, timestamp_ms_to_utc};
 use crate::io::FileIO;
@@ -360,6 +361,13 @@ impl TableMetadata {
         &self.properties
     }
 
+    /// Returns typed table properties parsed from the raw properties map with defaults.
+    pub fn table_properties(&self) -> Result<TableProperties> {
+        TableProperties::try_from(&self.properties).map_err(|e| {
+            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
+        })
+    }
+
     /// Return location of statistics files.
     #[inline]
     pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
@@ -1561,7 +1569,6 @@ mod tests {
     use uuid::Uuid;
 
     use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
-    use crate::TableCreation;
     use crate::io::FileIOBuilder;
     use crate::spec::table_metadata::TableMetadata;
     use crate::spec::{
@@ -1570,6 +1577,7 @@ mod tests {
         SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
         Summary, Transform, Type, UnboundPartitionField,
     };
+    use crate::{ErrorKind, TableCreation};
 
     fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
         let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
@@ -3861,4 +3869,113 @@ mod tests {
             "Parsing should fail for sort order ID 0 with fields"
         );
     }
+
+    #[test]
+    fn test_table_properties_with_defaults() {
+        use crate::spec::TableProperties;
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let metadata = TableMetadataBuilder::new(
+            schema,
+            PartitionSpec::unpartition_spec().into_unbound(),
+            SortOrder::unsorted_order(),
+            "s3://test/location".to_string(),
+            FormatVersion::V2,
+            HashMap::new(),
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata;
+
+        let props = metadata.table_properties().unwrap();
+
+        assert_eq!(
+            props.commit_num_retries,
+            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
+        );
+        assert_eq!(
+            props.write_target_file_size_bytes,
+            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+        );
+    }
+
+    #[test]
+    fn test_table_properties_with_custom_values() {
+        use crate::spec::TableProperties;
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let properties = HashMap::from([
+            (
+                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+                "10".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+                "1024".to_string(),
+            ),
+        ]);
+
+        let metadata = TableMetadataBuilder::new(
+            schema,
+            PartitionSpec::unpartition_spec().into_unbound(),
+            SortOrder::unsorted_order(),
+            "s3://test/location".to_string(),
+            FormatVersion::V2,
+            properties,
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata;
+
+        let props = metadata.table_properties().unwrap();
+
+        assert_eq!(props.commit_num_retries, 10);
+        assert_eq!(props.write_target_file_size_bytes, 1024);
+    }
+
+    #[test]
+    fn test_table_properties_with_invalid_value() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let properties = HashMap::from([(
+            "commit.retry.num-retries".to_string(),
+            "not_a_number".to_string(),
+        )]);
+
+        let metadata = TableMetadataBuilder::new(
+            schema,
+            PartitionSpec::unpartition_spec().into_unbound(),
+            SortOrder::unsorted_order(),
+            "s3://test/location".to_string(),
+            FormatVersion::V2,
+            properties,
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata;
+
+        let err = metadata.table_properties().unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(err.message().contains("Invalid table properties"));
+    }
 }
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 8ddaa26698..074c7fefe4 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -76,7 +76,7 @@ use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
 use crate::transaction::update_statistics::UpdateStatisticsAction;
 use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
-use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
+use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
 
 /// Table transaction.
 #[derive(Clone)]
@@ -163,10 +163,7 @@ impl Transaction {
             return Ok(self.table);
         }
 
-        let table_props =
-            TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
-                Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
-            })?;
+        let table_props = self.table.metadata().table_properties()?;
 
         let backoff = Self::build_backoff(table_props)?;
         let tx = self;
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index fdfddf877b..0dea150d31 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -36,7 +36,7 @@ use datafusion::physical_plan::{
 };
 use futures::StreamExt;
 use iceberg::arrow::FieldMatchMode;
-use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
+use iceberg::spec::{DataFileFormat, serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -208,15 +208,16 @@ impl ExecutionPlan for IcebergWriteExec {
         let partition_type = self.table.metadata().default_partition_type().clone();
         let format_version = self.table.metadata().format_version();
 
+        // Get typed table properties
+        let table_props = self
+            .table
+            .metadata()
+            .table_properties()
+            .map_err(to_datafusion_error)?;
+
         // Check data file format
-        let file_format = DataFileFormat::from_str(
-            self.table
-                .metadata()
-                .properties()
-                .get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT)
-                .unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
-        )
-        .map_err(to_datafusion_error)?;
+        let file_format = DataFileFormat::from_str(&table_props.write_format_default)
+            .map_err(to_datafusion_error)?;
         if file_format != DataFileFormat::Parquet {
             return Err(to_datafusion_error(Error::new(
                 ErrorKind::FeatureUnsupported,
@@ -230,24 +231,7 @@
             self.table.metadata().current_schema().clone(),
             FieldMatchMode::Name,
         );
-        let target_file_size = match self
-            .table
-            .metadata()
-            .properties()
-            .get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
-        {
-            Some(value_str) => value_str
-                .parse::<usize>()
-                .map_err(|e| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        "Invalid value for write.target-file-size-bytes",
-                    )
-                    .with_source(e)
-                })
-                .map_err(to_datafusion_error)?,
-            None => TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
-        };
+        let target_file_size = table_props.write_target_file_size_bytes;
 
         let file_io = self.table.file_io().clone();
         // todo location_gen and file_name_gen should be configurable
@@ -266,28 +250,7 @@
         let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
 
         // Create TaskWriter
-        let fanout_enabled = self
-            .table
-            .metadata()
-            .properties()
-            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
-            .map(|value| {
-                value
-                    .parse::<bool>()
-                    .map_err(|e| {
-                        Error::new(
-                            ErrorKind::DataInvalid,
-                            format!(
-                                "Invalid value for {}, expected 'true' or 'false'",
-                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
-                            ),
-                        )
-                        .with_source(e)
-                    })
-                    .map_err(to_datafusion_error)
-            })
-            .transpose()?
-            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
+        let fanout_enabled = table_props.write_datafusion_fanout_enabled;
         let schema = self.table.metadata().current_schema().clone();
         let partition_spec = self.table.metadata().default_partition_spec().clone();
         let task_writer = TaskWriter::try_new(
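
Reviewer note (not part of the patch): a minimal sketch of the call-site pattern this diff converges on, assuming `table` is an `iceberg::table::Table` as in the DataFusion integration above. The raw string map is parsed and validated once; call sites then read plain typed fields with defaults already applied, instead of re-parsing individual keys:

    // Parse the raw properties map once; parse failures surface as ErrorKind::DataInvalid.
    let props = table.metadata().table_properties()?;
    // Typed fields used in this diff; each falls back to its documented default when unset.
    let retries = props.commit_num_retries;
    let target_size = props.write_target_file_size_bytes;
    let fanout = props.write_datafusion_fanout_enabled;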