diff --git a/crates/iceberg/src/spec/manifest/entry.rs b/crates/iceberg/src/spec/manifest/entry.rs
index 7ba9efb3b9..628a8857af 100644
--- a/crates/iceberg/src/spec/manifest/entry.rs
+++ b/crates/iceberg/src/spec/manifest/entry.rs
@@ -147,6 +147,12 @@ impl ManifestEntry {
     pub fn data_file(&self) -> &DataFile {
         &self.data_file
     }
+
+    /// File sequence number indicating when the file was added. Inherited when null and status is 1 (added).
+    #[inline]
+    pub fn file_sequence_number(&self) -> Option<i64> {
+        self.file_sequence_number
+    }
 }
 
 /// Used to track additions and deletions in ManifestEntry.
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 0278f2abe5..61ee08d820 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -25,11 +25,22 @@ use crate::error::Result;
 use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
 use crate::table::Table;
 use crate::transaction::snapshot::{
-    generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+    generate_unique_snapshot_id, DefaultManifestProcess, MergeManifestProcess,
+    SnapshotProduceOperation, SnapshotProducer,
 };
 use crate::transaction::{ActionCommit, TransactionAction};
 use crate::{Error, ErrorKind};
 
+/// Target size of a manifest file when merging manifests.
+pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
+const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
+/// Minimum number of manifests to accumulate before merging them.
+pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
+const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
+/// Whether merging manifests is enabled.
+pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
+const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
+
 /// FastAppendAction is a transaction action for fast append data files to the table.
 pub struct FastAppendAction {
     check_duplicate: bool,
@@ -189,6 +200,141 @@ impl SnapshotProduceOperation for FastAppendOperation {
     }
 }
 
+/// MergeAppendAction is a transaction action similar to fast append, except that it merges manifests
+/// based on the target size.
+pub struct MergeAppendAction {
+    // snapshot_produce_action: SnapshotProducer<'_>,
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+    merge_enabled: bool,
+
+    check_duplicate: bool,
+    // below are properties used to create SnapshotProducer when committing
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+    snapshot_id: Option<i64>,
+}
+
+impl MergeAppendAction {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new() -> Self {
+        Self {
+            target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
+            min_count_to_merge: MANIFEST_MIN_MERGE_COUNT_DEFAULT,
+            merge_enabled: MANIFEST_MERGE_ENABLED_DEFAULT,
+            check_duplicate: true,
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: HashMap::default(),
+            added_data_files: vec![],
+            added_delete_files: vec![],
+            snapshot_id: None,
+        }
+    }
+
+    /// Sets the target manifest size in bytes used when merging manifests.
+    pub fn set_target_size_bytes(mut self, v: u32) -> Self {
+        self.target_size_bytes = v;
+        self
+    }
+
+    /// Sets the minimum number of manifests required before a bin is merged.
+    pub fn set_min_count_to_merge(mut self, v: u32) -> Self {
+        self.min_count_to_merge = v;
+        self
+    }
+
+    /// Sets whether manifest merging is enabled.
+    pub fn set_merge_enabled(mut self, v: bool) -> Self {
+        self.merge_enabled = v;
+        self
+    }
+
+    /// Sets the snapshot properties and derives the merge settings from them.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        let target_size_bytes: u32 = snapshot_properties
+            .get(MANIFEST_TARGET_SIZE_BYTES)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+        let min_count_to_merge: u32 = snapshot_properties
+            .get(MANIFEST_MIN_MERGE_COUNT)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
+        let merge_enabled = snapshot_properties
+            .get(MANIFEST_MERGE_ENABLED)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
+
+        self.snapshot_properties = snapshot_properties;
+        self.target_size_bytes = target_size_bytes;
+        self.min_count_to_merge = min_count_to_merge;
+        self.merge_enabled = merge_enabled;
+
+        self
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for MergeAppendAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
+            if table
+                .metadata()
+                .snapshots()
+                .any(|s| s.snapshot_id() == snapshot_id)
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Snapshot id {} already exists", snapshot_id),
+                ));
+            }
+            snapshot_id
+        } else {
+            generate_unique_snapshot_id(table)
+        };
+
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+            self.added_delete_files.clone(),
+            snapshot_id,
+        );
+
+        // Validate added files.
+        snapshot_producer.validate_added_data_files(&self.added_data_files)?;
+        snapshot_producer.validate_added_data_files(&self.added_delete_files)?;
+
+        // Check for duplicate files.
+        if self.check_duplicate {
+            snapshot_producer
+                .validate_duplicate_files(&self.added_data_files)
+                .await?;
+
+            snapshot_producer
+                .validate_duplicate_files(&self.added_delete_files)
+                .await?;
+        }
+
+        if self.merge_enabled {
+            let process =
+                MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);
+            snapshot_producer.commit(FastAppendOperation, process).await
+        } else {
+            snapshot_producer
+                .commit(FastAppendOperation, DefaultManifestProcess)
+                .await
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 07d864f81b..6f57326319 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -56,7 +56,7 @@ use std::collections::HashMap;
 pub use action::*;
 
 mod append;
-pub mod remove_snapshots;
+mod remove_snapshots;
 mod snapshot;
 mod sort_order;
 mod update_location;
@@ -67,7 +67,9 @@ mod upgrade_format_version;
 use std::sync::Arc;
 use std::time::Duration;
 
+pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
 use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
+use remove_snapshots::RemoveSnapshotAction;
 
 use crate::error::Result;
 use crate::spec::{
@@ -77,9 +79,7 @@ use crate::spec::{
     PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
 };
 use crate::table::Table;
-use crate::transaction::action::BoxedTransactionAction;
-use crate::transaction::append::FastAppendAction;
-use crate::transaction::remove_snapshots::RemoveSnapshotAction;
+use crate::transaction::append::{FastAppendAction, MergeAppendAction};
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -150,6 +150,11 @@ impl Transaction {
         FastAppendAction::new()
     }
 
+    /// Creates a merge append action.
+    pub fn merge_append(&self) -> MergeAppendAction {
+        MergeAppendAction::new()
+    }
+
     /// Creates replace sort order action.
     pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
         ReplaceSortOrderAction::new()
     }
@@ -169,7 +174,7 @@
     pub fn update_statistics(&self) -> UpdateStatisticsAction {
         UpdateStatisticsAction::new()
     }
-    
+
     /// Commit transaction.
     pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         if self.actions.is_empty() {
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 15a21682bd..b02bc70c32 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -15,22 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::future::Future;
 use std::ops::RangeFrom;
+use std::pin::Pin;
 
+use async_trait::async_trait;
 use uuid::Uuid;
 
 use crate::error::Result;
+use crate::io::FileIO;
 use crate::spec::{
     update_snapshot_summaries, DataContentType, DataFile, DataFileFormat, FormatVersion,
-    ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter,
-    ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
-    SnapshotSummaryCollector, Struct, StructType, Summary, MAIN_BRANCH,
+    ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestStatus,
+    ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, SnapshotReference,
+    SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, MAIN_BRANCH,
     PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
 };
 use crate::table::Table;
 use crate::transaction::ActionCommit;
+use crate::utils::bin::ListPacker;
 use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
@@ -50,22 +54,24 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
 
 pub(crate) struct DefaultManifestProcess;
 
+#[async_trait]
 impl ManifestProcess for DefaultManifestProcess {
-    fn process_manifests(
+    async fn process_manifests(
         &self,
-        _snapshot_produce: &SnapshotProducer<'_>,
+        _snapshot_produce: &mut SnapshotProducer<'_>,
         manifests: Vec<ManifestFile>,
-    ) -> Vec<ManifestFile> {
-        manifests
+    ) -> Result<Vec<ManifestFile>> {
+        Ok(manifests)
     }
 }
 
+#[async_trait]
 pub(crate) trait ManifestProcess: Send + Sync {
-    fn process_manifests(
+    async fn process_manifests(
         &self,
-        snapshot_produce: &SnapshotProducer<'_>,
+        snapshot_produce: &mut SnapshotProducer<'_>,
         manifests: Vec<ManifestFile>,
-    ) -> Vec<ManifestFile>;
+    ) -> Result<Vec<ManifestFile>>;
 }
 
 pub(crate) struct SnapshotProducer<'a> {
@@ -312,8 +318,9 @@ impl<'a> SnapshotProducer<'a> {
             manifest_files.push(added_manifest);
         }
 
-        let manifest_files = manifest_process.process_manifests(self, manifest_files);
-        Ok(manifest_files)
+        manifest_process
+            .process_manifests(self, manifest_files)
+            .await
     }
 
     // Returns a `Summary` of the current snapshot
@@ -475,3 +482,191 @@ pub fn generate_unique_snapshot_id(table: &Table) -> i64 {
     }
     snapshot_id
 }
+
+pub(crate) struct MergeManifestProcess {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+}
+
+impl MergeManifestProcess {
+    pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self {
+        Self {
+            target_size_bytes,
+            min_count_to_merge,
+        }
+    }
+}
+
+#[async_trait]
+impl ManifestProcess for MergeManifestProcess {
+    async fn process_manifests(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        let (unmerge_data_manifest, unmerge_delete_manifest): (Vec<_>, Vec<_>) = manifests
+            .into_iter()
+            .partition(|manifest| matches!(manifest.content, ManifestContentType::Data));
+        let mut data_manifest = {
+            let manifest_merge_manager = MergeManifestManager::new(
+                self.target_size_bytes,
+                self.min_count_to_merge,
+                ManifestContentType::Data,
+            );
+            manifest_merge_manager
+                .merge_manifest(snapshot_produce, unmerge_data_manifest)
+                .await?
+        };
+        data_manifest.extend(unmerge_delete_manifest);
+        Ok(data_manifest)
+    }
+}
+
+struct MergeManifestManager {
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+    content: ManifestContentType,
+}
+
+impl MergeManifestManager {
+    pub fn new(
+        target_size_bytes: u32,
+        min_count_to_merge: u32,
+        content: ManifestContentType,
+    ) -> Self {
+        Self {
+            target_size_bytes,
+            min_count_to_merge,
+            content,
+        }
+    }
+
+    fn group_by_spec(&self, manifests: Vec<ManifestFile>) -> BTreeMap<i32, Vec<ManifestFile>> {
+        let mut grouped_manifests = BTreeMap::new();
+        for manifest in manifests {
+            grouped_manifests
+                .entry(manifest.partition_spec_id)
+                .or_insert_with(Vec::new)
+                .push(manifest);
+        }
+        grouped_manifests
+    }
+
+    async fn merge_bin(
+        &self,
+        snapshot_id: i64,
+        file_io: FileIO,
+        manifest_bin: Vec<ManifestFile>,
+        mut writer: ManifestWriter,
+    ) -> Result<ManifestFile> {
+        for manifest_file in manifest_bin {
+            let manifest_file = manifest_file.load_manifest(&file_io).await?;
+            for manifest_entry in manifest_file.entries() {
+                if manifest_entry.status() == ManifestStatus::Deleted
+                    && manifest_entry
+                        .snapshot_id()
+                        .is_some_and(|id| id == snapshot_id)
+                {
+                    // Only files deleted by this snapshot should be added to the new manifest.
+                    writer.add_delete_entry(manifest_entry.as_ref().clone())?;
+                } else if manifest_entry.status() == ManifestStatus::Added
+                    && manifest_entry
+                        .snapshot_id()
+                        .is_some_and(|id| id == snapshot_id)
+                {
+                    // Entries added by this snapshot stay added; otherwise they should be existing.
+                    writer.add_entry(manifest_entry.as_ref().clone())?;
+                } else if manifest_entry.status() != ManifestStatus::Deleted {
+                    // Add all non-deleted files from the old manifest as existing files.
+                    writer.add_existing_entry(manifest_entry.as_ref().clone())?;
+                }
+            }
+        }
+
+        writer.write_manifest_file().await
+    }
+
+    async fn merge_group(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+        first_manifest: &ManifestFile,
+        group_manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        let packer: ListPacker<ManifestFile> = ListPacker::new(self.target_size_bytes);
+        let manifest_bins =
+            packer.pack(group_manifests, |manifest| manifest.manifest_length as u32);
+
+        let manifest_merge_futures = manifest_bins
+            .into_iter()
+            .map(|manifest_bin| {
+                if manifest_bin.len() == 1 {
+                    Ok(Box::pin(async { Ok(manifest_bin) })
+                        as Pin<
+                            Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
+                        >)
+                }
+                // If the bin has the first manifest (the new data files or an appended manifest file), then only
+                // merge it if the number of manifests is above the minimum count. This is applied only to bins
+                // with an in-memory manifest so that large manifests don't prevent merging older groups.
+                else if manifest_bin
+                    .iter()
+                    .any(|manifest| manifest == first_manifest)
+                    && manifest_bin.len() < self.min_count_to_merge as usize
+                {
+                    Ok(Box::pin(async { Ok(manifest_bin) })
+                        as Pin<
+                            Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>,
+                        >)
+                } else {
+                    let writer = snapshot_produce.new_manifest_writer(self.content)?;
+                    let snapshot_id = snapshot_produce.snapshot_id;
+                    let file_io = snapshot_produce.table.file_io().clone();
+                    Ok((Box::pin(async move {
+                        Ok(vec![
+                            self.merge_bin(
+                                snapshot_id,
+                                file_io,
+                                manifest_bin,
+                                writer,
+                            )
+                            .await?,
+                        ])
+                    }))
+                        as Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>)
+                }
+            })
+            .collect::<Result<Vec<Pin<Box<dyn Future<Output = Result<Vec<ManifestFile>>> + Send>>>>>()?;
+
+        let merged_bins: Vec<Vec<ManifestFile>> =
+            futures::future::join_all(manifest_merge_futures.into_iter())
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>>>()?;
+
+        Ok(merged_bins.into_iter().flatten().collect())
+    }
+
+    pub(crate) async fn merge_manifest(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> {
+        if manifests.is_empty() {
+            return Ok(manifests);
+        }
+
+        let first_manifest = manifests[0].clone();
+
+        let group_manifests = self.group_by_spec(manifests);
+
+        let mut merge_manifests = vec![];
+        for (_spec_id, manifests) in group_manifests.into_iter().rev() {
+            merge_manifests.extend(
+                self.merge_group(snapshot_produce, &first_manifest, manifests)
+                    .await?,
+            );
+        }
+
+        Ok(merge_manifests)
+    }
+}
diff --git a/crates/iceberg/src/utils.rs b/crates/iceberg/src/utils.rs
index 00d3e69bd3..1a60c82a77 100644
--- a/crates/iceberg/src/utils.rs
+++ b/crates/iceberg/src/utils.rs
@@ -40,3 +40,148 @@ pub(crate) fn available_parallelism() -> NonZeroUsize {
         NonZeroUsize::new(DEFAULT_PARALLELISM).unwrap()
     })
 }
+
+pub mod bin {
+    use std::iter::Iterator;
+    use std::marker::PhantomData;
+
+    use itertools::Itertools;
+
+    struct Bin<T> {
+        bin_weight: u32,
+        target_weight: u32,
+        items: Vec<T>,
+    }
+
+    impl<T> Bin<T> {
+        pub fn new(target_weight: u32) -> Self {
+            Bin {
+                bin_weight: 0,
+                target_weight,
+                items: Vec::new(),
+            }
+        }
+
+        pub fn can_add(&self, weight: u32) -> bool {
+            self.bin_weight + weight <= self.target_weight
+        }
+
+        pub fn add(&mut self, item: T, weight: u32) {
+            self.bin_weight += weight;
+            self.items.push(item);
+        }
+
+        pub fn into_vec(self) -> Vec<T> {
+            self.items
+        }
+    }
+
+    /// `ListPacker` packs items into bins, keeping each bin's total weight close to
+    /// `target_weight`.
+    pub(crate) struct ListPacker<T> {
+        target_weight: u32,
+        _marker: PhantomData<T>,
+    }
+
+    impl<T> ListPacker<T> {
+        pub fn new(target_weight: u32) -> Self {
+            ListPacker {
+                target_weight,
+                _marker: PhantomData,
+            }
+        }
+
+        pub fn pack<F>(&self, items: Vec<T>, weight_func: F) -> Vec<Vec<T>>
+        where F: Fn(&T) -> u32 {
+            let mut bins: Vec<Bin<T>> = vec![];
+            for item in items {
+                let cur_weight = weight_func(&item);
+                let addable_bin =
+                    if let Some(bin) = bins.iter_mut().find(|bin| bin.can_add(cur_weight)) {
+                        bin
+                    } else {
+                        bins.push(Bin::new(self.target_weight));
+                        bins.last_mut().unwrap()
+                    };
+                addable_bin.add(item, cur_weight);
+            }
+
+            bins.into_iter().map(|bin| bin.into_vec()).collect_vec()
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::*;
+
+        #[test]
+        fn test_list_packer_basic_packing() {
+            let packer = ListPacker::new(10);
+            let items = vec![3, 4, 5, 6, 2, 1];
+
+            let packed = packer.pack(items, |&x| x);
+
+            assert_eq!(packed.len(), 3);
+            assert!(packed[0].iter().sum::<u32>() == 10);
+            assert!(packed[1].iter().sum::<u32>() == 5);
+            assert!(packed[2].iter().sum::<u32>() == 6);
+        }
+
+        #[test]
+        fn test_list_packer_with_complex_items() {
+            #[derive(Debug, PartialEq)]
+            struct Item {
+                name: String,
+                size: u32,
+            }
+
+            let packer = ListPacker::new(15);
+            let items = vec![
+                Item {
+                    name: "A".to_string(),
+                    size: 7,
+                },
+                Item {
+                    name: "B".to_string(),
+                    size: 8,
+                },
+                Item {
+                    name: "C".to_string(),
+                    size: 5,
+                },
+                Item {
+                    name: "D".to_string(),
+                    size: 6,
+                },
+            ];
+
+            let packed = packer.pack(items, |item| item.size);
+
+            assert_eq!(packed.len(), 2);
+            assert!(packed[0].iter().map(|x| x.size).sum::<u32>() <= 15);
+            assert!(packed[1].iter().map(|x| x.size).sum::<u32>() <= 15);
+        }
+
+        #[test]
+        fn test_list_packer_single_large_item() {
+            let packer = ListPacker::new(10);
+            let items = vec![15, 5, 3];
+
+            let packed = packer.pack(items, |&x| x);
+
+            assert_eq!(packed.len(), 2);
+            assert!(packed[0].contains(&15));
+            assert!(packed[1].iter().sum::<u32>() <= 10);
+        }
+
+        #[test]
+        fn test_list_packer_empty_input() {
+            let packer = ListPacker::new(10);
+            let items: Vec<u32> = vec![];
+
+            let packed = packer.pack(items, |&x| x);
+
+            assert_eq!(packed.len(), 0);
+        }
+    }
+}
diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml
index e2f9b7da36..e7d0d15dac 100644
--- a/crates/integration_tests/testdata/docker-compose.yaml
+++ b/crates/integration_tests/testdata/docker-compose.yaml
@@ -22,6 +22,12 @@ services:
   rest:
     image: apache/iceberg-rest-fixture:1.9.2
     environment:
+      - http_proxy=
+      - https_proxy=
+      - HTTP_PROXY=
+      - HTTPS_PROXY=
+      - NO_PROXY=localhost,127.0.0.1,minio
+      - no_proxy=localhost,127.0.0.1,minio
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
@@ -42,6 +48,12 @@
   minio:
     image: minio/minio:RELEASE.2025-05-24T17-08-30Z
     environment:
+      - http_proxy=
+      - https_proxy=
+      - HTTP_PROXY=
+      - HTTPS_PROXY=
+      - NO_PROXY=localhost,127.0.0.1,minio
+      - no_proxy=localhost,127.0.0.1,minio
       - MINIO_ROOT_USER=admin
       - MINIO_ROOT_PASSWORD=password
       - MINIO_DOMAIN=minio
@@ -61,6 +73,12 @@
       - minio
     image: minio/mc:RELEASE.2025-05-21T01-59-54Z
    environment:
+      - http_proxy=
+      - https_proxy=
+      - HTTP_PROXY=
+      - HTTPS_PROXY=
+      - NO_PROXY=localhost,127.0.0.1,minio
+      - no_proxy=localhost,127.0.0.1,minio
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
@@ -77,6 +95,12 @@
      - rest
      - minio
    environment:
+      - http_proxy=
+      - https_proxy=
+      - HTTP_PROXY=
+      - HTTPS_PROXY=
+      - NO_PROXY=localhost,127.0.0.1,minio
+      - no_proxy=localhost,127.0.0.1,minio
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
diff --git a/crates/integration_tests/tests/shared_tests/merge_append_test.rs b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
new file mode 100644
index 0000000000..2141f74d9b
--- /dev/null
+++ b/crates/integration_tests/tests/shared_tests/merge_append_test.rs
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for merge append against the rest catalog.
+
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use iceberg::spec::{DataFile, NestedField, PrimitiveType, Schema, Type};
+use iceberg::table::Table;
+use iceberg::transaction::{
+    ApplyTransactionAction, Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT,
+    MANIFEST_TARGET_SIZE_BYTES,
+};
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, TableCreation};
+use iceberg_catalog_rest::RestCatalog;
+use parquet::file::properties::WriterProperties;
+
+use crate::get_shared_containers;
+
+static FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
+use crate::shared_tests::random_ns;
+
+async fn write_new_data_file(table: &Table) -> Vec<DataFile> {
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    // Use a short unique prefix each time so repeated calls don't produce identical paths,
+    // while keeping file name length small to mimic the original manifest sizing for the merge logic.
+    let unique_prefix = format!("test{:04}", FILE_COUNTER.fetch_add(1, Ordering::Relaxed));
+    let file_name_generator =
+        DefaultFileNameGenerator::new(unique_prefix, None, iceberg::spec::DataFileFormat::Parquet);
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+    let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+    let col1 = StringArray::from(vec![Some("foo"); 100]);
+    let col2 = Int32Array::from(vec![Some(1); 100]);
+    let col3 = BooleanArray::from(vec![Some(true); 100]);
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    data_file_writer.close().await.unwrap()
+}
+
+// Helper: create a table with 3 single-entry manifests produced by fast append.
+async fn create_table_with_three_manifests() -> (RestCatalog, Table) {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+    let ns = random_ns().await;
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+    let mut table = rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+    for _ in 0..3 {
+        let data_file = write_new_data_file(&table).await;
+        let tx = Transaction::new(&table);
+        let append_action = tx.fast_append().add_data_files(data_file.clone());
+        let tx = append_action.apply(tx).unwrap();
+        table = tx.commit(&rest_catalog).await.unwrap();
+    }
+    (rest_catalog, table)
+}
+
+// Scenario 1: Every manifest is packed into its own bin so merge is skipped (covers the
+// `manifest_bin.len() == 1` branch).
+#[tokio::test]
+async fn test_merge_append_no_merge_each_single_bin() {
+    let (rest_catalog, mut table) = create_table_with_three_manifests().await;
+    let manifest_list_before = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list_before.entries().len(), 3);
+
+    let snapshot_properties = HashMap::from([
+        (MANIFEST_MERGE_ENABLED.to_string(), "true".to_string()),
+        (MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string()), // original setting
+        (MANIFEST_TARGET_SIZE_BYTES.to_string(), "7000".to_string()), // small target -> each manifest isolated
+    ]);
+
+    let data_file = write_new_data_file(&table).await;
+    let tx = Transaction::new(&table);
+    let merge_action = tx
+        .merge_append()
+        .set_snapshot_properties(snapshot_properties)
+        .add_data_files(data_file);
+    let tx = merge_action.apply(tx).unwrap();
+    table = tx.commit(&rest_catalog).await.unwrap();
+
+    let manifest_list_after = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    // No merge => 3 (existing) + 1 (new) = 4
+    assert_eq!(manifest_list_after.entries().len(), 4);
+    for entry in manifest_list_after.entries() {
+        let m = entry.load_manifest(table.file_io()).await.unwrap();
+        assert_eq!(
+            m.entries().len(),
+            1,
+            "each manifest should still have exactly one data file"
+        );
+    }
+}
+
+// Scenario 2: All manifests fit into one bin, but merge is skipped because the bin contains the
+// first manifest and count < min_count_to_merge.
+#[tokio::test]
+async fn test_merge_append_no_merge_min_count_protects() {
+    let (rest_catalog, mut table) = create_table_with_three_manifests().await;
+    let manifest_list_before = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list_before.entries().len(), 3);
+
+    let snapshot_properties = HashMap::from([
+        (MANIFEST_MERGE_ENABLED.to_string(), "true".to_string()),
+        (MANIFEST_MIN_MERGE_COUNT.to_string(), "10".to_string()), // larger than manifest count (4)
+        (MANIFEST_TARGET_SIZE_BYTES.to_string(), "500000".to_string()), // large target -> all in one bin
+    ]);
+
+    let data_file = write_new_data_file(&table).await;
+    let tx = Transaction::new(&table);
+    let merge_action = tx
+        .merge_append()
+        .set_snapshot_properties(snapshot_properties)
+        .add_data_files(data_file);
+    let tx = merge_action.apply(tx).unwrap();
+    table = tx.commit(&rest_catalog).await.unwrap();
+
+    let manifest_list_after = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    // len(4) < min_count(10) and the bin contains the first manifest => skip merge
+    assert_eq!(manifest_list_after.entries().len(), 4);
+}
+
+// Scenario 3: Merge all manifests into a single manifest.
+#[tokio::test]
+async fn test_merge_append_full_merge() {
+    let (rest_catalog, mut table) = create_table_with_three_manifests().await;
+    let manifest_list_before = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+    assert_eq!(manifest_list_before.entries().len(), 3);
+
+    let snapshot_properties = HashMap::from([
+        (MANIFEST_MERGE_ENABLED.to_string(), "true".to_string()),
+        (MANIFEST_MIN_MERGE_COUNT.to_string(), "2".to_string()), // low threshold allows merge
+        (MANIFEST_TARGET_SIZE_BYTES.to_string(), "500000".to_string()), // large -> all in one bin
+    ]);
+
+    let data_file = write_new_data_file(&table).await;
+    let tx = Transaction::new(&table);
+    let merge_action = tx
+        .merge_append()
+        .set_snapshot_properties(snapshot_properties)
+        .add_data_files(data_file);
+    let tx = merge_action.apply(tx).unwrap();
+    table = tx.commit(&rest_catalog).await.unwrap();
+
+    let manifest_list_after = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .load_manifest_list(table.file_io(), table.metadata())
+        .await
+        .unwrap();
+
+    // Expect all 4 single-file manifests merged into 1 manifest.
+    assert_eq!(
+        manifest_list_after.entries().len(),
+        1,
+        "should merge into a single manifest file"
+    );
+    let merged_manifest = manifest_list_after.entries()[0]
+        .load_manifest(table.file_io())
+        .await
+        .unwrap();
+    assert_eq!(
+        merged_manifest.entries().len(),
+        4,
+        "merged manifest should contain 4 data file entries"
+    );
+}
diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs
index 4bab534ee6..af47517606 100644
--- a/crates/integration_tests/tests/shared_tests/mod.rs
+++ b/crates/integration_tests/tests/shared_tests/mod.rs
@@ -27,6 +27,7 @@ mod append_data_file_test;
 mod append_partition_data_file_test;
 mod conflict_commit_test;
 mod datafusion;
+mod merge_append_test;
 mod read_evolved_schema;
 mod read_positional_deletes;
 mod remove_snapshots_test;
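
Reviewer note (not part of the patch): a minimal end-to-end usage sketch of the new API, based on how the integration tests above drive it. The `table`, `catalog`, and `data_files` values are assumed to already exist, and the property values are illustrative.

```rust
use std::collections::HashMap;

use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::{
    ApplyTransactionAction, Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT,
    MANIFEST_TARGET_SIZE_BYTES,
};
use iceberg::{Catalog, Result};

async fn append_with_manifest_merging(
    table: &Table,
    catalog: &dyn Catalog,
    data_files: Vec<DataFile>,
) -> Result<Table> {
    // Merge settings are read from the snapshot properties; anything missing
    // falls back to the defaults defined in append.rs.
    let props = HashMap::from([
        (MANIFEST_MERGE_ENABLED.to_string(), "true".to_string()),
        (MANIFEST_MIN_MERGE_COUNT.to_string(), "100".to_string()),
        (MANIFEST_TARGET_SIZE_BYTES.to_string(), (8 * 1024 * 1024).to_string()),
    ]);

    let tx = Transaction::new(table);
    let action = tx
        .merge_append()
        .set_snapshot_properties(props)
        .add_data_files(data_files);
    // The action is applied to the transaction and committed through the catalog,
    // exactly as the fast-append tests do.
    let tx = action.apply(tx)?;
    tx.commit(catalog).await
}
```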
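Also for reviewers: the merge pass groups manifests with a simple first-fit packing by `manifest_length`. The standalone sketch below mirrors the behavior of the crate-internal `ListPacker` (it is not that type, which is `pub(crate)`), using the same weights as `test_list_packer_basic_packing`.

```rust
// First-fit packing: each item goes into the first bin that still has room;
// if none does, a new bin is opened (so an oversized item gets its own bin).
fn first_fit(weights: &[u32], target: u32) -> Vec<Vec<u32>> {
    let mut bins: Vec<(u32, Vec<u32>)> = Vec::new();
    for &w in weights {
        match bins.iter_mut().find(|(total, _)| *total + w <= target) {
            Some((total, items)) => {
                *total += w;
                items.push(w);
            }
            None => bins.push((w, vec![w])),
        }
    }
    bins.into_iter().map(|(_, items)| items).collect()
}

fn main() {
    // With a target weight of 10, the weights 3, 4, 5, 6, 2, 1 pack into
    // [3, 4, 2, 1] (sum 10), [5] and [6] -- the same grouping the unit test asserts.
    let packed = first_fit(&[3, 4, 5, 6, 2, 1], 10);
    assert_eq!(packed, vec![vec![3, 4, 2, 1], vec![5], vec![6]]);
    println!("{packed:?}");
}
```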