crates/iceberg/src/spec/table_metadata.rs (118 additions & 1 deletion)

@@ -37,6 +37,7 @@ pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataB
 use super::{
     DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
     SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
+    TableProperties,
 };
 use crate::error::{Result, timestamp_ms_to_utc};
 use crate::io::FileIO;
@@ -360,6 +361,13 @@ impl TableMetadata {
         &self.properties
     }
 
+    /// Returns typed table properties parsed from the raw properties map with defaults.
+    pub fn table_properties(&self) -> Result<TableProperties> {
+        TableProperties::try_from(&self.properties).map_err(|e| {
+            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
+        })
+    }
+
     /// Return location of statistics files.
     #[inline]
     pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
@@ -1561,7 +1569,6 @@ mod tests {
     use uuid::Uuid;
 
     use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
-    use crate::TableCreation;
     use crate::io::FileIOBuilder;
     use crate::spec::table_metadata::TableMetadata;
     use crate::spec::{
@@ -1570,6 +1577,7 @@
         SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
         Summary, Transform, Type, UnboundPartitionField,
     };
+    use crate::{ErrorKind, TableCreation};
 
     fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
         let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
@@ -3861,4 +3869,113 @@
             "Parsing should fail for sort order ID 0 with fields"
         );
     }
+
+    #[test]
+    fn test_table_properties_with_defaults() {
+        use crate::spec::TableProperties;
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let metadata = TableMetadataBuilder::new(
+            schema,
+            PartitionSpec::unpartition_spec().into_unbound(),
+            SortOrder::unsorted_order(),
+            "s3://test/location".to_string(),
+            FormatVersion::V2,
+            HashMap::new(),
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata;
+
+        let props = metadata.table_properties().unwrap();
+
+        assert_eq!(
+            props.commit_num_retries,
+            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
+        );
+        assert_eq!(
+            props.write_target_file_size_bytes,
+            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+        );
+    }
+
+    #[test]
+    fn test_table_properties_with_custom_values() {
+        use crate::spec::TableProperties;
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let properties = HashMap::from([
+            (
+                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+                "10".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+                "1024".to_string(),
+            ),
+        ]);
+
+        let metadata = TableMetadataBuilder::new(
+            schema,
+            PartitionSpec::unpartition_spec().into_unbound(),
+            SortOrder::unsorted_order(),
+            "s3://test/location".to_string(),
+            FormatVersion::V2,
+            properties,
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata;
+
+        let props = metadata.table_properties().unwrap();
+
+        assert_eq!(props.commit_num_retries, 10);
+        assert_eq!(props.write_target_file_size_bytes, 1024);
+    }
+
+    #[test]
+    fn test_table_properties_with_invalid_value() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let properties = HashMap::from([(
+            "commit.retry.num-retries".to_string(),
+            "not_a_number".to_string(),
+        )]);
+
+        let metadata = TableMetadataBuilder::new(
+            schema,
+            PartitionSpec::unpartition_spec().into_unbound(),
+            SortOrder::unsorted_order(),
+            "s3://test/location".to_string(),
+            FormatVersion::V2,
+            properties,
+        )
+        .unwrap()
+        .build()
+        .unwrap()
+        .metadata;
+
+        let err = metadata.table_properties().unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::DataInvalid);
+        assert!(err.message().contains("Invalid table properties"));
+    }
 }
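
A minimal usage sketch of the new accessor (not part of the diff; the function and its argument names are illustrative):

use iceberg::Result;
use iceberg::spec::{TableMetadata, TableProperties};

// Sketch: one call parses every known property and fills in spec defaults,
// so callers read plain struct fields instead of string-keyed map entries.
fn show_props(metadata: &TableMetadata) -> Result<()> {
    let props: TableProperties = metadata.table_properties()?;
    println!("commit retries: {}", props.commit_num_retries);
    println!("target file size: {}", props.write_target_file_size_bytes);
    Ok(())
}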
crates/iceberg/src/transaction/mod.rs (2 additions & 5 deletions)

@@ -76,7 +76,7 @@ use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
 use crate::transaction::update_statistics::UpdateStatisticsAction;
 use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
-use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
+use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
 
 /// Table transaction.
 #[derive(Clone)]
@@ -163,10 +163,7 @@ impl Transaction {
             return Ok(self.table);
         }
 
-        let table_props =
-            TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
-                Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
-            })?;
+        let table_props = self.table.metadata().table_properties()?;
 
         let backoff = Self::build_backoff(table_props)?;
         let tx = self;
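
For comparison, a sketch of the conversion the commit path used to inline, now centralized behind TableMetadata::table_properties() (the free function here is illustrative):

use std::collections::HashMap;

use iceberg::spec::TableProperties;
use iceberg::{Error, ErrorKind, Result};

// Sketch: what each call site previously did by hand -- convert the raw
// string map and wrap any parse failure as DataInvalid, as the diff shows.
fn typed_props(raw: &HashMap<String, String>) -> Result<TableProperties> {
    TableProperties::try_from(raw).map_err(|e| {
        Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
    })
}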
crates/integrations/datafusion/src/physical_plan/write.rs (12 additions & 49 deletions)

@@ -36,7 +36,7 @@ use datafusion::physical_plan::{
 };
 use futures::StreamExt;
 use iceberg::arrow::FieldMatchMode;
-use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
+use iceberg::spec::{DataFileFormat, serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -208,15 +208,16 @@ impl ExecutionPlan for IcebergWriteExec {
         let partition_type = self.table.metadata().default_partition_type().clone();
         let format_version = self.table.metadata().format_version();
 
+        // Get typed table properties
+        let table_props = self
+            .table
+            .metadata()
+            .table_properties()
+            .map_err(to_datafusion_error)?;
+
         // Check data file format
-        let file_format = DataFileFormat::from_str(
-            self.table
-                .metadata()
-                .properties()
-                .get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT)
-                .unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
-        )
-        .map_err(to_datafusion_error)?;
+        let file_format = DataFileFormat::from_str(&table_props.write_format_default)
+            .map_err(to_datafusion_error)?;
         if file_format != DataFileFormat::Parquet {
             return Err(to_datafusion_error(Error::new(
                 ErrorKind::FeatureUnsupported,
@@ -230,24 +231,7 @@
             self.table.metadata().current_schema().clone(),
             FieldMatchMode::Name,
         );
-        let target_file_size = match self
-            .table
-            .metadata()
-            .properties()
-            .get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
-        {
-            Some(value_str) => value_str
-                .parse::<usize>()
-                .map_err(|e| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        "Invalid value for write.target-file-size-bytes",
-                    )
-                    .with_source(e)
-                })
-                .map_err(to_datafusion_error)?,
-            None => TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
-        };
+        let target_file_size = table_props.write_target_file_size_bytes;
 
         let file_io = self.table.file_io().clone();
         // todo location_gen and file_name_gen should be configurable
@@ -266,28 +250,7 @@
         let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
 
         // Create TaskWriter
-        let fanout_enabled = self
-            .table
-            .metadata()
-            .properties()
-            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
-            .map(|value| {
-                value
-                    .parse::<bool>()
-                    .map_err(|e| {
-                        Error::new(
-                            ErrorKind::DataInvalid,
-                            format!(
-                                "Invalid value for {}, expected 'true' or 'false'",
-                                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
-                            ),
-                        )
-                        .with_source(e)
-                    })
-                    .map_err(to_datafusion_error)
-            })
-            .transpose()?
-            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
+        let fanout_enabled = table_props.write_datafusion_fanout_enabled;
         let schema = self.table.metadata().current_schema().clone();
         let partition_spec = self.table.metadata().default_partition_spec().clone();
         let task_writer = TaskWriter::try_new(
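
The net effect in the write path: three string lookups with hand-rolled parsing collapse into field reads on the typed struct. A sketch under assumed field types (usize and bool, matching the parses the removed code performed; the function name is illustrative):

use std::str::FromStr;

use iceberg::Result;
use iceberg::spec::{DataFileFormat, TableProperties};

// Sketch: the three values the writer needs, read from already-validated
// fields instead of being re-parsed from the raw properties map.
fn writer_config(props: &TableProperties) -> Result<(DataFileFormat, usize, bool)> {
    let format = DataFileFormat::from_str(&props.write_format_default)?;
    Ok((
        format,
        props.write_target_file_size_bytes,
        props.write_datafusion_fanout_enabled,
    ))
}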