4 changes: 2 additions & 2 deletions Cargo.toml
@@ -76,10 +76,10 @@ futures = "0.3"
hive_metastore = "0.2.0"
http = "1.2"
iceberg = { version = "0.7.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" }
iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" }
iceberg-catalog-hms = { version = "0.7.0", path = "./crates/catalog/hms" }
iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" }
iceberg-datafusion = { version = "0.7.0", path = "./crates/integrations/datafusion" }
indicatif = "0.17"
itertools = "0.13"
2 changes: 1 addition & 1 deletion crates/iceberg/Cargo.toml
@@ -74,8 +74,8 @@ opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
rand = { workspace = true }
reqwest = { workspace = true }
reqsign = { version = "0.16.3", optional = true, default-features = false }
reqwest = { workspace = true }
roaring = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
2 changes: 2 additions & 0 deletions crates/iceberg/src/spec/mod.rs
@@ -30,6 +30,7 @@ mod sort;
mod statistic_file;
mod table_metadata;
mod table_metadata_builder;
mod table_properties;
mod transform;
mod values;
mod view_metadata;
@@ -48,6 +49,7 @@ pub use snapshot_summary::*;
pub use sort::*;
pub use statistic_file::*;
pub use table_metadata::*;
pub use table_properties::*;
pub use transform::*;
pub use values::*;
pub use view_metadata::*;
32 changes: 0 additions & 32 deletions crates/iceberg/src/spec/table_metadata.rs
@@ -99,38 +99,6 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [
PROPERTY_DEFAULT_SORT_ORDER,
];

/// Property key for number of commit retries.
pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
/// Default value for number of commit retries.
pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;

/// Property key for minimum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
/// Default value for minimum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;

/// Property key for maximum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
/// Default value for maximum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute

/// Property key for total maximum retry time (ms).
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
/// Default value for total maximum retry time (ms).
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes

/// Default file format for data files
pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
/// Default file format for delete files
pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
/// Default value for data file format
pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";

/// Target file size for newly written files.
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
/// Default target file size
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB

/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

125 changes: 125 additions & 0 deletions crates/iceberg/src/spec/table_properties.rs
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

// Helper function to parse a property from a HashMap
// If the property is not found, use the default value
fn parse_property<T: std::str::FromStr>(
properties: &HashMap<String, String>,
key: &str,
default: T,
) -> Result<T, anyhow::Error>
where
<T as std::str::FromStr>::Err: std::fmt::Display,
{
properties.get(key).map_or(Ok(default), |value| {
value
.parse::<T>()
.map_err(|e| anyhow::anyhow!("Invalid value for {}: {}", key, e))
})
}

/// Typed view of a table's properties, parsed from the raw string map.
pub struct TableProperties {
/// The number of times to retry a commit.
pub commit_num_retries: usize,
/// The minimum wait time (ms) between retries.
pub commit_min_retry_wait_ms: u64,
/// The maximum wait time (ms) between retries.
pub commit_max_retry_wait_ms: u64,
/// The total timeout (ms) for commit retries.
pub commit_total_retry_timeout_ms: u64,
/// The default file format for data files.
pub write_format_default: String,
/// The target size (bytes) for newly written data files.
pub write_target_file_size_bytes: usize,
}

impl TableProperties {
/// Property key for number of commit retries.
pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
/// Default value for number of commit retries.
pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;

/// Property key for minimum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
/// Default value for minimum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;

/// Property key for maximum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
/// Default value for maximum wait time (ms) between retries.
pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute

/// Property key for total maximum retry time (ms).
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
/// Default value for total maximum retry time (ms).
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes

/// Default file format for data files
pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
/// Default file format for delete files
pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
/// Default value for data file format
pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";

/// Target file size for newly written files.
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
/// Default target file size
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
}

impl TryFrom<&HashMap<String, String>> for TableProperties {
// Parse each known key from the map, falling back to its default when the key is absent.
type Error = anyhow::Error;

fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
Ok(TableProperties {
commit_num_retries: parse_property(
props,
TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
)?,
commit_min_retry_wait_ms: parse_property(
props,
TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
)?,
commit_max_retry_wait_ms: parse_property(
props,
TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
)?,
commit_total_retry_timeout_ms: parse_property(
props,
TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
)?,
write_format_default: parse_property(
props,
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
)?,
write_target_file_size_bytes: parse_property(
props,
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)?,
})
}
}
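
For reference, a minimal usage sketch of the new parsing path (not part of the diff). It assumes `TableProperties` is re-exported from `iceberg::spec` as the `mod.rs` hunk above suggests, and uses `anyhow` only because the `TryFrom` impl already returns `anyhow::Error`:

```rust
use std::collections::HashMap;

use iceberg::spec::TableProperties;

fn main() -> Result<(), anyhow::Error> {
    // Override a single key; every other field falls back to its declared default.
    let props = HashMap::from([(
        TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
        "7".to_string(),
    )]);

    let table_props = TableProperties::try_from(&props)?;
    assert_eq!(table_props.commit_num_retries, 7);
    // Unset keys use the defaults, e.g. 100 ms minimum retry wait.
    assert_eq!(
        table_props.commit_min_retry_wait_ms,
        TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
    );

    // A malformed value surfaces as an error instead of silently using the default.
    let bad = HashMap::from([(
        TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
        "not-a-number".to_string(),
    )]);
    assert!(TableProperties::try_from(&bad).is_err());
    Ok(())
}
```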
69 changes: 14 additions & 55 deletions crates/iceberg/src/transaction/mod.rs
@@ -52,8 +52,6 @@
/// that allows users to apply a transaction action to a `Transaction`.
mod action;

use std::collections::HashMap;

pub use action::*;
mod append;
mod snapshot;
Expand All @@ -69,12 +67,7 @@ use std::time::Duration;
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};

use crate::error::Result;
use crate::spec::{
PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
};
use crate::spec::TableProperties;
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
@@ -170,7 +163,12 @@ impl Transaction {
return Ok(self.table);
}

let backoff = Self::build_backoff(self.table.metadata().properties())?;
let table_props =
TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
})?;

let backoff = Self::build_backoff(table_props)?;
let tx = self;

(|mut tx: Transaction| async {
@@ -185,53 +183,14 @@
.1
}

fn build_backoff(props: &HashMap<String, String>) -> Result<ExponentialBackoff> {
let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) {
Some(value_str) => value_str.parse::<u64>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid value for commit.retry.min-wait-ms",
)
.with_source(e)
})?,
None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
};
let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) {
Some(value_str) => value_str.parse::<u64>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid value for commit.retry.max-wait-ms",
)
.with_source(e)
})?,
None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
};
let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) {
Some(value_str) => value_str.parse::<u64>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid value for commit.retry.total-timeout-ms",
)
.with_source(e)
})?,
None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
};
let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) {
Some(value_str) => value_str.parse::<usize>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid value for commit.retry.num-retries",
)
.with_source(e)
})?,
None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
};

fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
Ok(ExponentialBuilder::new()
.with_min_delay(Duration::from_millis(min_delay))
.with_max_delay(Duration::from_millis(max_delay))
.with_total_delay(Some(Duration::from_millis(total_delay)))
.with_max_times(max_times)
.with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
.with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
.with_total_delay(Some(Duration::from_millis(
props.commit_total_retry_timeout_ms,
)))
.with_max_times(props.commit_num_retries)
.with_factor(2.0)
.build())
}
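
For context, a rough sketch of the retry schedule `build_backoff` produces with all-default properties (min 100 ms, max 60 s, 4 retries, factor 2.0). This is an illustration, not part of the diff, and assumes backon's `ExponentialBackoff` yields the delay sequence as an iterator:

```rust
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};

fn main() {
    // Same builder configuration as build_backoff, using the TableProperties defaults.
    let backoff = ExponentialBuilder::new()
        .with_min_delay(Duration::from_millis(100))
        .with_max_delay(Duration::from_millis(60 * 1000))
        .with_total_delay(Some(Duration::from_millis(30 * 60 * 1000)))
        .with_max_times(4)
        .with_factor(2.0)
        .build();

    // With jitter off, the delays come out to roughly 100 ms, 200 ms, 400 ms, 800 ms.
    for (attempt, delay) in backoff.enumerate() {
        println!("retry {} after {:?}", attempt + 1, delay);
    }
}
```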
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter};
use arrow_array::RecordBatch;

use crate::io::{FileIO, OutputFile};
use crate::spec::{DataFileBuilder, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey};
use crate::spec::{DataFileBuilder, PartitionKey, TableProperties};
use crate::writer::CurrentFileStatus;
use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
@@ -95,7 +95,7 @@
) -> Self {
Self {
inner_builder,
target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
target_file_size: TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
file_io,
location_generator,
file_name_generator,
Expand Down
14 changes: 5 additions & 9 deletions crates/integrations/datafusion/src/physical_plan/write.rs
@@ -36,11 +36,7 @@ use datafusion::physical_plan::{
};
use futures::StreamExt;
use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
use iceberg::spec::{
DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT,
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
serialize_data_file_to_json,
};
use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -226,8 +222,8 @@ impl ExecutionPlan for IcebergWriteExec {
self.table
.metadata()
.properties()
.get(PROPERTY_DEFAULT_FILE_FORMAT)
.unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
.get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT)
.unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
)
.map_err(to_datafusion_error)?;
if file_format != DataFileFormat::Parquet {
@@ -250,7 +246,7 @@
.table
.metadata()
.properties()
.get(PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
.get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
{
Some(value_str) => value_str
.parse::<usize>()
@@ -262,7 +258,7 @@
.with_source(e)
})
.map_err(to_datafusion_error)?,
None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
None => TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
};

let file_io = self.table.file_io().clone();
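
For context only, a hedged sketch of how the target-file-size lookup in this hunk could be expressed through the new struct; `props` stands in for `self.table.metadata().properties()`, and the helper name is hypothetical, not part of this diff:

```rust
use std::collections::HashMap;

use iceberg::spec::TableProperties;

// `props` stands in for self.table.metadata().properties() in IcebergWriteExec::execute.
fn resolve_target_file_size(props: &HashMap<String, String>) -> Result<usize, anyhow::Error> {
    // Replaces the manual get/parse/unwrap_or sequence: an unset key falls back to
    // PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT (512 MB), a malformed value errors out.
    Ok(TableProperties::try_from(props)?.write_target_file_size_bytes)
}
```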