Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 29 additions & 39 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::sync::OnceCell;
use typed_builder::TypedBuilder;

use crate::client::{
HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
deserialize_catalog_response, deserialize_unexpected_catalog_error, HttpClient,
};
use crate::types::{
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest,
Expand Down Expand Up @@ -619,7 +619,7 @@ impl Catalog for RestCatalog {
.config
.unwrap_or_default()
.into_iter()
.chain(self.user_config.props.clone().into_iter())
.chain(self.user_config.props.clone())
.collect();

let file_io = self
Expand Down Expand Up @@ -670,7 +670,7 @@ impl Catalog for RestCatalog {
.config
.unwrap_or_default()
.into_iter()
.chain(self.user_config.props.clone().into_iter())
.chain(self.user_config.props.clone())
.collect();

let file_io = self
Expand Down Expand Up @@ -1600,12 +1600,10 @@ mod tests {

let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());

assert!(
catalog
.namespace_exists(&NamespaceIdent::new("ns1".to_string()))
.await
.unwrap()
);
assert!(catalog
.namespace_exists(&NamespaceIdent::new("ns1".to_string()))
.await
.unwrap());

config_mock.assert_async().await;
get_ns_mock.assert_async().await;
Expand Down Expand Up @@ -1922,15 +1920,13 @@ mod tests {

let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());

assert!(
catalog
.table_exists(&TableIdent::new(
NamespaceIdent::new("ns1".to_string()),
"table1".to_string(),
))
.await
.unwrap()
);
assert!(catalog
.table_exists(&TableIdent::new(
NamespaceIdent::new("ns1".to_string()),
"table1".to_string(),
))
.await
.unwrap());

config_mock.assert_async().await;
check_table_exists_mock.assert_async().await;
Expand Down Expand Up @@ -2147,13 +2143,11 @@ mod tests {
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
.partition_spec(
UnboundPartitionSpec::builder()
.add_partition_fields(vec![
UnboundPartitionField::builder()
.source_id(1)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build(),
])
.add_partition_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
.unwrap()
.build(),
)
Expand Down Expand Up @@ -2298,13 +2292,11 @@ mod tests {
.await;

assert!(table_result.is_err());
assert!(
table_result
.err()
.unwrap()
.message()
.contains("already exists")
);
assert!(table_result
.err()
.unwrap()
.message()
.contains("already exists"));

config_mock.assert_async().await;
create_table_mock.assert_async().await;
Expand Down Expand Up @@ -2509,13 +2501,11 @@ mod tests {
.await;

assert!(table_result.is_err());
assert!(
table_result
.err()
.unwrap()
.message()
.contains("does not exist")
);
assert!(table_result
.err()
.unwrap()
.message()
.contains("does not exist"));

config_mock.assert_async().await;
update_table_mock.assert_async().await;
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/snapshot_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries(
if summary.operation != Operation::Append
&& summary.operation != Operation::Overwrite
&& summary.operation != Operation::Delete
&& summary.operation != Operation::Replace
{
return Err(Error::new(
ErrorKind::DataInvalid,
Expand Down
22 changes: 10 additions & 12 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ use std::sync::Arc;
use async_trait::async_trait;
use uuid::Uuid;

use super::{
MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT,
MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES,
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
};
use crate::error::Result;
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
use crate::table::Table;
Expand All @@ -31,16 +36,6 @@ use crate::transaction::snapshot::{
use crate::transaction::{ActionCommit, TransactionAction};
use crate::{Error, ErrorKind};

/// Target size of manifest file when merging manifests.
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
/// Minimum number of manifests to merge.
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
/// Whether merging manifests is allowed.
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;

/// `FastAppendAction` is a transaction action that fast-appends data files to the table.
pub struct FastAppendAction {
check_duplicate: bool,
Expand Down Expand Up @@ -138,6 +133,8 @@ impl TransactionAction for FastAppendAction {
self.snapshot_properties.clone(),
self.added_data_files.clone(),
self.added_delete_files.clone(),
vec![],
vec![],
snapshot_id,
);

Expand Down Expand Up @@ -178,7 +175,7 @@ impl SnapshotProduceOperation for FastAppendOperation {

async fn existing_manifest(
&self,
snapshot_produce: &SnapshotProducer<'_>,
snapshot_produce: &mut SnapshotProducer<'_>,
) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
return Ok(vec![]);
Expand Down Expand Up @@ -219,7 +216,6 @@ pub struct MergeAppendAction {
}

impl MergeAppendAction {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new() -> Self {
Self {
target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
Expand Down Expand Up @@ -305,6 +301,8 @@ impl TransactionAction for MergeAppendAction {
self.snapshot_properties.clone(),
self.added_data_files.clone(),
self.added_delete_files.clone(),
vec![],
vec![],
snapshot_id,
);

Expand Down
21 changes: 20 additions & 1 deletion crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use std::collections::HashMap;
pub use action::*;
mod append;
mod remove_snapshots;
mod rewrite_files;
mod snapshot;
mod sort_order;
mod update_location;
Expand All @@ -67,9 +68,9 @@ mod upgrade_format_version;
use std::sync::Arc;
use std::time::Duration;

pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
use remove_snapshots::RemoveSnapshotAction;
use rewrite_files::RewriteFilesAction;

use crate::error::Result;
use crate::spec::{
Expand All @@ -87,6 +88,19 @@ use crate::transaction::update_statistics::UpdateStatisticsAction;
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};

/// Target size of a manifest file when merging manifests.
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
/// Default value for [`MANIFEST_TARGET_SIZE_BYTES`] (`8 * 1024 * 1024`).
pub const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
/// Minimum number of manifests to merge.
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
/// Default value for [`MANIFEST_MIN_MERGE_COUNT`].
pub const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
/// Whether merging manifests is allowed.
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
/// Default value for [`MANIFEST_MERGE_ENABLED`].
pub const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;

/// Table transaction.
#[derive(Clone)]
pub struct Transaction {
Expand Down Expand Up @@ -175,6 +189,11 @@ impl Transaction {
UpdateStatisticsAction::new()
}

/// Creates a rewrite files action.
///
/// Takes `self` by value and returns the result of `RewriteFilesAction::new()`;
/// nothing from `self` is passed to the new action here.
pub fn rewrite_files(self) -> RewriteFilesAction {
RewriteFilesAction::new()
}

/// Commit transaction.
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
if self.actions.is_empty() {
Expand Down
Loading
Loading