From 16ce618e78af304f4d301a51bb101a0f775ec0b8 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 7 May 2025 15:29:38 +0800 Subject: [PATCH] feat(iceberg): rewrite files action (#47) * feat(iceberg): introduce rewrite files action * fix(iceberg): add test * fix test --- crates/catalog/rest/src/catalog.rs | 68 ++-- crates/iceberg/src/spec/snapshot_summary.rs | 1 + crates/iceberg/src/transaction/append.rs | 22 +- crates/iceberg/src/transaction/mod.rs | 21 +- .../iceberg/src/transaction/rewrite_files.rs | 339 ++++++++++++++++++ crates/iceberg/src/transaction/snapshot.rs | 27 +- .../tests/shared_tests/mod.rs | 1 + .../tests/shared_tests/rewrite_files_test.rs | 334 +++++++++++++++++ 8 files changed, 758 insertions(+), 55 deletions(-) create mode 100644 crates/iceberg/src/transaction/rewrite_files.rs create mode 100644 crates/integration_tests/tests/shared_tests/rewrite_files_test.rs diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7d81982f55..b51884eec9 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -37,7 +37,7 @@ use tokio::sync::OnceCell; use typed_builder::TypedBuilder; use crate::client::{ - HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, + deserialize_catalog_response, deserialize_unexpected_catalog_error, HttpClient, }; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, @@ -619,7 +619,7 @@ impl Catalog for RestCatalog { .config .unwrap_or_default() .into_iter() - .chain(self.user_config.props.clone().into_iter()) + .chain(self.user_config.props.clone()) .collect(); let file_io = self @@ -670,7 +670,7 @@ impl Catalog for RestCatalog { .config .unwrap_or_default() .into_iter() - .chain(self.user_config.props.clone().into_iter()) + .chain(self.user_config.props.clone()) .collect(); let file_io = self @@ -1600,12 +1600,10 @@ mod tests { let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()); - assert!( - catalog - .namespace_exists(&NamespaceIdent::new("ns1".to_string())) - .await - .unwrap() - ); + assert!(catalog + .namespace_exists(&NamespaceIdent::new("ns1".to_string())) + .await + .unwrap()); config_mock.assert_async().await; get_ns_mock.assert_async().await; @@ -1922,15 +1920,13 @@ mod tests { let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()); - assert!( - catalog - .table_exists(&TableIdent::new( - NamespaceIdent::new("ns1".to_string()), - "table1".to_string(), - )) - .await - .unwrap() - ); + assert!(catalog + .table_exists(&TableIdent::new( + NamespaceIdent::new("ns1".to_string()), + "table1".to_string(), + )) + .await + .unwrap()); config_mock.assert_async().await; check_table_exists_mock.assert_async().await; @@ -2147,13 +2143,11 @@ mod tests { .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .partition_spec( UnboundPartitionSpec::builder() - .add_partition_fields(vec![ - UnboundPartitionField::builder() - .source_id(1) - .transform(Transform::Truncate(3)) - .name("id".to_string()) - .build(), - ]) + .add_partition_fields(vec![UnboundPartitionField::builder() + .source_id(1) + .transform(Transform::Truncate(3)) + .name("id".to_string()) + .build()]) .unwrap() .build(), ) @@ -2298,13 +2292,11 @@ mod tests { .await; assert!(table_result.is_err()); - assert!( - table_result - .err() - .unwrap() - .message() - .contains("already exists") - ); + assert!(table_result + .err() + .unwrap() + .message() + .contains("already exists")); 
config_mock.assert_async().await; create_table_mock.assert_async().await; @@ -2509,13 +2501,11 @@ mod tests { .await; assert!(table_result.is_err()); - assert!( - table_result - .err() - .unwrap() - .message() - .contains("does not exist") - ); + assert!(table_result + .err() + .unwrap() + .message() + .contains("does not exist")); config_mock.assert_async().await; update_table_mock.assert_async().await; diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index 4bf68adbdc..84ba17b4ab 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries( if summary.operation != Operation::Append && summary.operation != Operation::Overwrite && summary.operation != Operation::Delete + && summary.operation != Operation::Replace { return Err(Error::new( ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 99935c0b8b..c26c7b133d 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -21,6 +21,11 @@ use std::sync::Arc; use async_trait::async_trait; use uuid::Uuid; +use super::{ + MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT, + MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES, + MANIFEST_TARGET_SIZE_BYTES_DEFAULT, +}; use crate::error::Result; use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; use crate::table::Table; @@ -31,16 +36,6 @@ use crate::transaction::snapshot::{ use crate::transaction::{ActionCommit, TransactionAction}; use crate::{Error, ErrorKind}; -/// Target size of manifest file when merging manifests. -pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes"; -const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB -/// Minimum number of manifests to merge. -pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge"; -const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100; -/// Whether allow to merge manifests. -pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled"; -const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false; - /// FastAppendAction is a transaction action for fast append data files to the table. 
pub struct FastAppendAction { check_duplicate: bool, @@ -138,6 +133,8 @@ impl TransactionAction for FastAppendAction { self.snapshot_properties.clone(), self.added_data_files.clone(), self.added_delete_files.clone(), + vec![], + vec![], snapshot_id, ); @@ -178,7 +175,7 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> Result> { let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { return Ok(vec![]); @@ -219,7 +216,6 @@ pub struct MergeAppendAction { } impl MergeAppendAction { - #[allow(clippy::too_many_arguments)] pub(crate) fn new() -> Self { Self { target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT, @@ -305,6 +301,8 @@ impl TransactionAction for MergeAppendAction { self.snapshot_properties.clone(), self.added_data_files.clone(), self.added_delete_files.clone(), + vec![], + vec![], snapshot_id, ); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 6f57326319..206c613e28 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -57,6 +57,7 @@ use std::collections::HashMap; pub use action::*; mod append; mod remove_snapshots; +mod rewrite_files; mod snapshot; mod sort_order; mod update_location; @@ -67,9 +68,9 @@ mod upgrade_format_version; use std::sync::Arc; use std::time::Duration; -pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES}; use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext}; use remove_snapshots::RemoveSnapshotAction; +use rewrite_files::RewriteFilesAction; use crate::error::Result; use crate::spec::{ @@ -87,6 +88,19 @@ use crate::transaction::update_statistics::UpdateStatisticsAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +/// Target size of manifest file when merging manifests. +pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes"; +/// This is the default value for `MANIFEST_TARGET_SIZE_BYTES`. +pub const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB +/// Minimum number of manifests to merge. +pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge"; +/// This is the default value for `MANIFEST_MIN_MERGE_COUNT`. +pub const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100; +/// Whether allow to merge manifests. +pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled"; +/// This is the default value for `MANIFEST_MERGE_ENABLED`. +pub const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false; + /// Table transaction. #[derive(Clone)] pub struct Transaction { @@ -175,6 +189,11 @@ impl Transaction { UpdateStatisticsAction::new() } + /// Creates rewrite files action. + pub fn rewrite_files(self) -> RewriteFilesAction { + RewriteFilesAction::new() + } + /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { if self.actions.is_empty() { diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs new file mode 100644 index 0000000000..4b5e880fc4 --- /dev/null +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::snapshot::{
+    DefaultManifestProcess, MergeManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use super::{
+    MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT,
+    MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES,
+    MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
+};
+use crate::error::Result;
+use crate::spec::{
+    DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestFile, ManifestStatus,
+    Operation,
+};
+use crate::table::Table;
+use crate::transaction::snapshot::generate_unique_snapshot_id;
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind};
+
+/// Transaction action for rewriting files.
+pub struct RewriteFilesAction {
+    // snapshot_produce_action: SnapshotProduceAction<'a>,
+    target_size_bytes: u32,
+    min_count_to_merge: u32,
+    merge_enabled: bool,
+
+    check_duplicate: bool,
+    // Below are the properties used to create the SnapshotProducer at commit time.
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+    removed_data_files: Vec<DataFile>,
+    removed_delete_files: Vec<DataFile>,
+    snapshot_id: Option<i64>,
+}
+
+struct RewriteFilesOperation;
+
+impl RewriteFilesAction {
+    /// Creates a new `RewriteFilesAction` with default manifest-merge settings.
+    pub fn new() -> Self {
+        Self {
+            target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
+            min_count_to_merge: MANIFEST_MIN_MERGE_COUNT_DEFAULT,
+            merge_enabled: MANIFEST_MERGE_ENABLED_DEFAULT,
+            check_duplicate: true,
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: HashMap::new(),
+            added_data_files: Vec::new(),
+            added_delete_files: Vec::new(),
+            removed_data_files: Vec::new(),
+            removed_delete_files: Vec::new(),
+            snapshot_id: None,
+        }
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        for file in data_files {
+            match file.content_type() {
+                DataContentType::Data => self.added_data_files.push(file),
+                DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                    self.added_delete_files.push(file)
+                }
+            }
+        }
+
+        self
+    }
+
+    /// Add files to be removed from the table by this snapshot.
+    pub fn delete_files(mut self, remove_data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        for file in remove_data_files {
+            match file.content_type() {
+                DataContentType::Data => self.removed_data_files.push(file),
+                DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                    self.removed_delete_files.push(file)
+                }
+            }
+        }
+
+        self
+    }
+
+    /// Set snapshot properties; manifest-merge settings are derived from them.
+    pub fn set_snapshot_properties(&mut self, properties: HashMap<String, String>) -> &mut Self {
+        let target_size_bytes: u32 = properties
+            .get(MANIFEST_TARGET_SIZE_BYTES)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+        let min_count_to_merge: u32 = properties
+            .get(MANIFEST_MIN_MERGE_COUNT)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
+        let merge_enabled = properties
+            .get(MANIFEST_MERGE_ENABLED)
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
+
+        self.target_size_bytes = target_size_bytes;
+        self.min_count_to_merge = min_count_to_merge;
+        self.merge_enabled = merge_enabled;
+        self.snapshot_properties = properties;
+        self
+    }
+
+    /// Set commit UUID for the snapshot.
+    pub fn set_commit_uuid(&mut self, commit_uuid: Uuid) -> &mut Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set key metadata for manifest files.
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(key_metadata);
+        self
+    }
+
+    /// Set snapshot id.
+    pub fn set_snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.snapshot_id = Some(snapshot_id);
+        self
+    }
+}
+
+impl SnapshotProduceOperation for RewriteFilesOperation {
+    fn operation(&self) -> Operation {
+        Operation::Replace
+    }
+
+    async fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        // Generate "deleted" manifest entries for the removed files.
+        let snapshot = snapshot_produce.table.metadata().current_snapshot();
+
+        if let Some(snapshot) = snapshot {
+            let gen_manifest_entry = |old_entry: &Arc<ManifestEntry>| {
+                let builder = ManifestEntry::builder()
+                    .status(ManifestStatus::Deleted)
+                    .snapshot_id(old_entry.snapshot_id().unwrap())
+                    .sequence_number(old_entry.sequence_number().unwrap())
+                    .file_sequence_number(old_entry.file_sequence_number().unwrap())
+                    .data_file(old_entry.data_file().clone());
+
+                builder.build()
+            };
+
+            let manifest_list = snapshot
+                .load_manifest_list(
+                    snapshot_produce.table.file_io(),
+                    snapshot_produce.table.metadata(),
+                )
+                .await?;
+
+            let mut deleted_entries = Vec::new();
+
+            for manifest_file in manifest_list.entries() {
+                let manifest = manifest_file
+                    .load_manifest(snapshot_produce.table.file_io())
+                    .await?;
+
+                for entry in manifest.entries() {
+                    if entry.content_type() == DataContentType::Data
+                        && snapshot_produce
+                            .removed_data_file_paths
+                            .contains(entry.data_file().file_path())
+                    {
+                        deleted_entries.push(gen_manifest_entry(entry));
+                    }
+
+                    if (entry.content_type() == DataContentType::PositionDeletes
+                        || entry.content_type() == DataContentType::EqualityDeletes)
+                        && snapshot_produce
+                            .removed_delete_file_paths
+                            .contains(entry.data_file().file_path())
+                    {
+                        deleted_entries.push(gen_manifest_entry(entry));
+                    }
+                }
+            }
+
+            Ok(deleted_entries)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let table_metadata_ref = snapshot_produce.table.metadata();
+        let file_io_ref = snapshot_produce.table.file_io();
+
+        let Some(snapshot) = table_metadata_ref.current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+ .load_manifest_list(file_io_ref, table_metadata_ref) + .await?; + + let mut existing_files = Vec::new(); + + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(file_io_ref).await?; + + let found_deleted_files: HashSet<_> = manifest + .entries() + .iter() + .filter_map(|entry| { + if snapshot_produce + .removed_data_file_paths + .contains(entry.data_file().file_path()) + || snapshot_produce + .removed_delete_file_paths + .contains(entry.data_file().file_path()) + { + Some(entry.data_file().file_path().to_string()) + } else { + None + } + }) + .collect(); + + if found_deleted_files.is_empty() { + existing_files.push(manifest_file.clone()); + } else { + // Rewrite the manifest file without the deleted data files + if manifest + .entries() + .iter() + .any(|entry| !found_deleted_files.contains(entry.data_file().file_path())) + { + let mut manifest_writer = snapshot_produce.new_manifest_writer( + ManifestContentType::Data, + table_metadata_ref.default_partition_spec_id(), + )?; + + for entry in manifest.entries() { + if !found_deleted_files.contains(entry.data_file().file_path()) { + manifest_writer.add_entry((**entry).clone())?; + } + } + + existing_files.push(manifest_writer.write_manifest_file().await?); + } + } + } + + Ok(existing_files) + } +} + +#[async_trait::async_trait] +impl TransactionAction for RewriteFilesAction { + async fn commit(self: Arc, table: &Table) -> Result { + let snapshot_id = if let Some(snapshot_id) = self.snapshot_id { + if table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot id {} already exists", snapshot_id), + )); + } + snapshot_id + } else { + generate_unique_snapshot_id(table) + }; + + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + self.added_delete_files.clone(), + self.removed_data_files.clone(), + self.removed_delete_files.clone(), + snapshot_id, + ); + + // Checks duplicate files + if self.check_duplicate { + snapshot_producer + .validate_duplicate_files(&self.added_data_files) + .await?; + + snapshot_producer + .validate_duplicate_files(&self.added_delete_files) + .await?; + } + + if self.merge_enabled { + let process = + MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge); + snapshot_producer + .commit(RewriteFilesOperation, process) + .await + } else { + snapshot_producer + .commit(RewriteFilesOperation, DefaultManifestProcess) + .await + } + } +} + +impl Default for RewriteFilesAction { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index cb42b1983f..bfc2e665bb 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -45,9 +45,10 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { &self, snapshot_produce: &SnapshotProducer, ) -> impl Future>> + Send; + fn existing_manifest( &self, - snapshot_produce: &SnapshotProducer<'_>, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> impl Future>> + Send; } @@ -80,7 +81,12 @@ pub(crate) struct SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, pub added_data_files: Vec, - added_delete_files: Vec, + pub added_delete_files: Vec, + + // for filtering out files that are removed by action + pub removed_data_file_paths: 
HashSet, + pub removed_delete_file_paths: HashSet, + // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -88,6 +94,7 @@ pub(crate) struct SnapshotProducer<'a> { } impl<'a> SnapshotProducer<'a> { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( table: &'a Table, commit_uuid: Uuid, @@ -95,8 +102,20 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties: HashMap, added_data_files: Vec, added_delete_files: Vec, + removed_data_file_paths: Vec, + removed_delete_file_paths: Vec, snapshot_id: i64, ) -> Self { + let removed_data_file_paths = removed_data_file_paths + .into_iter() + .map(|df| df.file_path) + .collect(); + + let removed_delete_file_paths = removed_delete_file_paths + .into_iter() + .map(|df| df.file_path) + .collect(); + Self { table, snapshot_id, @@ -105,6 +124,8 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, added_delete_files, + removed_data_file_paths, + removed_delete_file_paths, manifest_counter: (0..), } } @@ -167,7 +188,7 @@ impl<'a> SnapshotProducer<'a> { Ok(()) } - fn new_manifest_writer( + pub(crate) fn new_manifest_writer( &mut self, content: ManifestContentType, partition_spec_id: i32, diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs index af47517606..74815ba630 100644 --- a/crates/integration_tests/tests/shared_tests/mod.rs +++ b/crates/integration_tests/tests/shared_tests/mod.rs @@ -31,6 +31,7 @@ mod merge_append_test; mod read_evolved_schema; mod read_positional_deletes; mod remove_snapshots_test; +mod rewrite_files_test; mod scan_all_type; pub async fn random_ns() -> Namespace { diff --git a/crates/integration_tests/tests/shared_tests/rewrite_files_test.rs b/crates/integration_tests/tests/shared_tests/rewrite_files_test.rs new file mode 100644 index 0000000000..7572dfe335 --- /dev/null +++ b/crates/integration_tests/tests/shared_tests/rewrite_files_test.rs @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. 
+ +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, TableCreation}; +use iceberg_catalog_rest::RestCatalog; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::file::properties::WriterProperties; + +use crate::get_shared_containers; +use crate::shared_tests::{random_ns, test_schema}; + +#[tokio::test] +async fn test_rewrite_data_files() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // check parquet file schema + let content = table + .file_io() + .new_input(data_file[0].file_path()) + .unwrap() + .read() + .await + .unwrap(); + let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load( + &content, + ArrowReaderOptions::default(), + ) + .unwrap(); + let field_ids: Vec = parquet_reader + .parquet_schema() + .columns() + .iter() + .map(|col| col.self_type().get_basic_info().id()) + .collect(); + assert_eq!(field_ids, vec![1, 2, 3]); + + // commit result + let tx = Transaction::new(&table); + // First append + let append_action = tx.fast_append().add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); + + // commit result again + // 
Second append of the SAME data file: disable duplicate check to preserve original test logic + let tx = Transaction::new(&table); + let append_action = tx + .fast_append() + .with_check_duplicate(false) + .add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); + + // check result again + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 2); + assert_eq!(batches[0], batch); + assert_eq!(batches[1], batch); + + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file_rewrite = data_file_writer.close().await.unwrap(); + + // commit result again + let tx = Transaction::new(&table); + // Clone tx so we can consume one for building the action (rewrite_files takes self) + let rewrite_action = tx + .clone() + .rewrite_files() + .add_data_files(data_file_rewrite.clone()) + .delete_files(data_file.clone()); + let tx = rewrite_action.apply(tx).unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); +} + + +#[tokio::test] +async fn test_multiple_file_rewrite() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + let table_creation = TableCreation::builder() + .name("t3".to_string()) + .schema(schema.clone()) + .build(); + + let table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file1 = data_file_writer.close().await.unwrap(); + + let mut 
data_file_writer = data_file_writer_builder.clone().build().await.unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file2 = data_file_writer.close().await.unwrap(); + + let tx = Transaction::new(&table); + let rewrite_action = tx + .clone() + .rewrite_files() + .add_data_files(data_file1.clone()) + .add_data_files(data_file2.clone()); + let tx = rewrite_action.apply(tx).unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); + + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 2); + assert_eq!(batches[0], batch); + assert_eq!(batches[1], batch); +} + +#[tokio::test] +async fn test_rewrite_nonexistent_file() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + let table_creation = TableCreation::builder() + .name("t4".to_string()) + .schema(schema.clone()) + .build(); + + let table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Create a valid data file + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let valid_data_file = data_file_writer.close().await.unwrap(); + + // Create a nonexistent data file (simulated by not writing it) + let nonexistent_data_file = valid_data_file.clone(); + + // Build rewrite action deleting a nonexistent file; we only ensure builder compiles and does not panic + let _unused_action = Transaction::new(&table).rewrite_files().delete_files(nonexistent_data_file); + // No commit since removing a nonexistent file would not create a valid snapshot under new semantics +}
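Usage note: a minimal sketch of driving the new rewrite action, distilled from rewrite_files_test.rs above. The writer setup is elided; `compacted_files` and `obsolete_files` are placeholder Vec<DataFile> values returned by a closed data file writer, and `table` / `rest_catalog` are assumed to exist as in the tests:

    // rewrite_files() consumes the transaction, so clone the handle first and
    // apply the built action back onto the original transaction.
    let tx = Transaction::new(&table);
    let rewrite_action = tx
        .clone()
        .rewrite_files()
        .add_data_files(compacted_files.clone())
        .delete_files(obsolete_files.clone());
    let tx = rewrite_action.apply(tx).unwrap();
    let table = tx.commit(&rest_catalog).await.unwrap();

The commit produces an Operation::Replace snapshot; manifest merging is only used when "commit.manifest-merge.enabled" is set to true via set_snapshot_properties, otherwise the default manifest process applies.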