From 451d3b6fef87b9accf4135f327fa3c51887cf93e Mon Sep 17 00:00:00 2001
From: ZENOTME <43447882+ZENOTME@users.noreply.github.com>
Date: Tue, 18 Mar 2025 18:14:41 +0800
Subject: [PATCH] feat: add process delete entry in snapshot produce (#33)

* support spec id in data file

* support process delete entry

* fulfill partition spec id

* fix

* fix spelling mistake

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: ZENOTME
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: xxchan
---
 .../arrow/record_batch_partition_spliter.rs   |   6 +-
 crates/iceberg/src/spec/manifest/data_file.rs |   4 +
 crates/iceberg/src/transaction/append.rs      |   2 +
 crates/iceberg/src/transaction/snapshot.rs    |  84 ++++++++++++-
 .../base_writer/equality_delete_writer.rs     | 115 ++++++++----------
 .../sort_position_delete_writer.rs            |  14 ++-
 .../src/writer/file_writer/parquet_writer.rs  |   3 +-
 .../function_writer/equality_delta_writer.rs  |   6 +-
 .../fanout_partition_writer.rs                |   1 +
 .../shared_tests/remove_snapshots_test.rs     |  62 ++++++++--
 10 files changed, 209 insertions(+), 88 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
index e9816d89a2..cd91cd21f6 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
@@ -27,7 +27,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use super::record_batch_projector::RecordBatchProjector;
 use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
-use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
+use crate::spec::{Literal, PartitionSpec, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
 use crate::transform::{create_transform_function, BoxedTransformFunction};
 use crate::{Error, ErrorKind, Result};
 
@@ -186,6 +186,10 @@ impl RecordBatchPartitionSpliter {
         })
     }
 
+    pub(crate) fn partition_spec(&self) -> &PartitionSpec {
+        self.partition_spec.as_ref()
+    }
+
     /// Split the record batch into multiple record batches by the partition spec.
     pub(crate) fn split(&self, batch: &RecordBatch) -> Result> {
         // get array using partition spec
diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs
index d69f5f1fc1..b9f960f56e 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -284,6 +284,10 @@ impl DataFile {
     pub(crate) fn rewrite_partition(&mut self, partition: Struct) {
         self.partition = partition;
     }
+
+    pub(crate) fn rewrite_partition_id(&mut self, partition_spec_id: i32) {
+        self.partition_spec_id = partition_spec_id;
+    }
+}
 
 /// Convert data files to avro bytes and write to writer.
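The new `partition_spec_id` field above means every `DataFile` now records the partition spec it was written under. A minimal, test-style sketch of constructing such a file, mirroring the builder calls in the append.rs test changes below; the helper name `example_data_file` and the record-count and file-size values are illustrative assumptions, not taken from this patch:

use crate::spec::{DataContentType, DataFile, DataFileBuilder, DataFileFormat, Struct};

// Sketch only: the data file carries the id of the partition spec it was
// written under (`partition_spec_id(0)` points at the default spec).
// Record count and file size are placeholder values.
fn example_data_file() -> DataFile {
    DataFileBuilder::default()
        .partition_spec_id(0)
        .content(DataContentType::Data)
        .file_path("test/3.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .partition(Struct::empty())
        .record_count(1)
        .file_size_in_bytes(100)
        .build()
        .expect("all required builder fields were set")
}
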
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 61ee08d820..99935c0b8b 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -435,6 +435,7 @@ mod tests { // check add data file with incompatible partition value let data_file = DataFileBuilder::default() + .partition_spec_id(0) .content(DataContentType::Data) .file_path("test/3.parquet".to_string()) .file_format(DataFileFormat::Parquet) @@ -457,6 +458,7 @@ mod tests { let action = tx.fast_append(); let data_file = DataFileBuilder::default() + .partition_spec_id(0) .content(DataContentType::Data) .file_path("test/3.parquet".to_string()) .file_format(DataFileFormat::Parquet) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index b02bc70c32..cb42b1983f 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -41,7 +41,6 @@ const META_ROOT_PATH: &str = "metadata"; pub(crate) trait SnapshotProduceOperation: Send + Sync { fn operation(&self) -> Operation; - #[allow(unused)] fn delete_entries( &self, snapshot_produce: &SnapshotProducer, @@ -168,7 +167,11 @@ impl<'a> SnapshotProducer<'a> { Ok(()) } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + fn new_manifest_writer( + &mut self, + content: ManifestContentType, + partition_spec_id: i32, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -185,7 +188,14 @@ impl<'a> SnapshotProducer<'a> { self.table.metadata().current_schema().clone(), self.table .metadata() - .default_partition_spec() + .partition_spec_by_id(partition_spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Invalid partition spec id for new manifest writer", + ) + .with_context("partition spec id", partition_spec_id.to_string()) + })? 
.as_ref() .clone(), ); @@ -278,13 +288,72 @@ impl<'a> SnapshotProducer<'a> { builder.build() } }); - let mut writer = self.new_manifest_writer(manifest_content_type)?; + + let mut writer = self.new_manifest_writer( + manifest_content_type, + self.table.metadata().default_partition_spec_id(), + )?; for entry in manifest_entries { writer.add_entry(entry)?; } writer.write_manifest_file().await } + async fn write_delete_manifest( + &mut self, + deleted_entries: Vec, + ) -> Result> { + if deleted_entries.is_empty() { + return Ok(vec![]); + } + + // Group deleted entries by spec_id + let mut partition_groups = HashMap::new(); + for entry in deleted_entries { + partition_groups + .entry(entry.data_file().partition_spec_id) + .or_insert_with(Vec::new) + .push(entry); + } + + // Write a delete manifest per spec_id group + let mut deleted_manifests = Vec::new(); + for (spec_id, entries) in partition_groups { + let mut data_file_writer: Option = None; + let mut delete_file_writer: Option = None; + for entry in entries { + match entry.content_type() { + DataContentType::Data => { + if data_file_writer.is_none() { + data_file_writer = + Some(self.new_manifest_writer(ManifestContentType::Data, spec_id)?); + } + data_file_writer.as_mut().unwrap().add_delete_entry(entry)?; + } + DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { + if delete_file_writer.is_none() { + delete_file_writer = Some( + self.new_manifest_writer(ManifestContentType::Deletes, spec_id)?, + ); + } + delete_file_writer + .as_mut() + .unwrap() + .add_delete_entry(entry)?; + } + } + } + if let Some(writer) = data_file_writer { + deleted_manifests.push(writer.write_manifest_file().await?); + } + if let Some(writer) = delete_file_writer { + deleted_manifests.push(writer.write_manifest_file().await?); + } + } + + Ok(deleted_manifests) + } + async fn manifest_file( &mut self, snapshot_produce_operation: &OP, @@ -318,6 +387,11 @@ impl<'a> SnapshotProducer<'a> { manifest_files.push(added_manifest); } + let delete_manifests = self + .write_delete_manifest(snapshot_produce_operation.delete_entries(self).await?) 
+ .await?; + manifest_files.extend(delete_manifests); + manifest_process .process_manifests(self, manifest_files) .await @@ -618,7 +692,7 @@ impl MergeManifestManager { Box>> + Send>, >) } else { - let writer = snapshot_produce.new_manifest_writer(self.content)?; + let writer = snapshot_produce.new_manifest_writer(self.content, snapshot_produce.table.metadata().default_partition_spec_id())?; let snapshot_id = snapshot_produce.snapshot_id; let file_io = snapshot_produce.table.file_io().clone(); Ok((Box::pin(async move { diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 3a200cf056..922e803dad 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -182,8 +182,8 @@ mod test { use arrow_schema::{DataType, Field, Fields}; use arrow_select::concat::concat_batches; use itertools::Itertools; - use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use parquet::file::properties::WriterProperties; use tempfile::TempDir; use uuid::Uuid; @@ -197,9 +197,9 @@ mod test { use crate::writer::base_writer::equality_delete_writer::{ EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, }; - use crate::writer::file_writer::ParquetWriterBuilder; - use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; async fn check_parquet_data_file_with_equality_delete_write( @@ -296,10 +296,12 @@ mod test { NestedField::required( 1, "col1", - Type::Struct(StructType::new(vec![ - NestedField::required(5, "sub_col", Type::Primitive(PrimitiveType::Int)) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::required( + 5, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), ) .into(), NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(), @@ -315,21 +317,17 @@ mod test { NestedField::required( 4, "col4", - Type::Struct(StructType::new(vec![ - NestedField::required( - 7, - "sub_col", - Type::Struct(StructType::new(vec![ - NestedField::required( - 8, - "sub_sub_col", - Type::Primitive(PrimitiveType::Int), - ) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::required( + 7, + "sub_col", + Type::Struct(StructType::new(vec![NestedField::required( + 8, + "sub_sub_col", + Type::Primitive(PrimitiveType::Int), ) - .into(), - ])), + .into()])), + ) + .into()])), ) .into(), ]) @@ -441,27 +439,23 @@ mod test { NestedField::required( 3, "col3", - Type::Struct(StructType::new(vec![ - NestedField::required( - 4, - "sub_col", - Type::Primitive(PrimitiveType::Int), - ) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::required( + 4, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), ) .into(), NestedField::optional( 5, "col4", - Type::Struct(StructType::new(vec![ - NestedField::required( - 6, - "sub_col2", - Type::Primitive(PrimitiveType::Int), - ) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::required( + 6, + "sub_col2", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), ) .into(), NestedField::required( @@ -680,30 
+674,28 @@ mod test { NestedField::optional( 1, "col1", - Type::Struct(StructType::new(vec![ - NestedField::optional(2, "sub_col", Type::Primitive(PrimitiveType::Int)) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::optional( + 2, + "sub_col", + Type::Primitive(PrimitiveType::Int), + ) + .into()])), ) .into(), NestedField::optional( 3, "col2", - Type::Struct(StructType::new(vec![ - NestedField::optional( - 4, - "sub_struct_col", - Type::Struct(StructType::new(vec![ - NestedField::optional( - 5, - "sub_sub_col", - Type::Primitive(PrimitiveType::Int), - ) - .into(), - ])), + Type::Struct(StructType::new(vec![NestedField::optional( + 4, + "sub_struct_col", + Type::Struct(StructType::new(vec![NestedField::optional( + 5, + "sub_sub_col", + Type::Primitive(PrimitiveType::Int), ) - .into(), - ])), + .into()])), + ) + .into()])), ) .into(), ]) @@ -730,14 +722,11 @@ mod test { let inner_col = { let nulls = NullBuffer::from(vec![true, false, true]); Arc::new(StructArray::new( - Fields::from(vec![ - Field::new("sub_sub_col", DataType::Int32, true).with_metadata( - HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "5".to_string(), - )]), - ), - ]), + Fields::from(vec![Field::new("sub_sub_col", DataType::Int32, true) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )]))]), vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), None]))], Some(nulls), )) diff --git a/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs index ecdb617bdd..12c8f09a97 100644 --- a/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs @@ -35,15 +35,22 @@ pub struct SortPositionDeleteWriterBuilder { inner: B, cache_num: usize, partition_value: Option, + partition_spec_id: Option, } impl SortPositionDeleteWriterBuilder { /// Create a new `SortPositionDeleteWriterBuilder` using a `FileWriterBuilder`. 
- pub fn new(inner: B, cache_num: usize, partition_value: Option) -> Self { + pub fn new( + inner: B, + cache_num: usize, + partition_value: Option, + partition_spec_id: Option, + ) -> Self { Self { inner, cache_num, partition_value, + partition_spec_id, } } } @@ -86,6 +93,7 @@ impl IcebergWriterBuilder { cache: BTreeMap>, data_files: Vec, partition_value: Struct, + partition_spec_id: i32, } impl SortPositionDeleteWriter { @@ -140,6 +149,7 @@ impl SortPositionDeleteWriter { .extend(writer.close().await?.into_iter().map(|mut res| { res.content(crate::spec::DataContentType::PositionDeletes); res.partition(self.partition_value.clone()); + res.partition_spec_id(self.partition_spec_id); res.build().expect("Guaranteed to be valid") })); Ok(()) @@ -204,7 +214,7 @@ mod test { location_gen, file_name_gen, ); - let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None) + let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None, None) .build() .await?; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 599fc887d4..f04261ed7a 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -439,6 +439,7 @@ impl ParquetWriter { .file_path(file_path) .file_format(DataFileFormat::Parquet) .partition(Struct::empty()) + .partition_spec_id(0) .record_count(metadata.file_metadata().num_rows() as u64) .file_size_in_bytes(written_size as u64) .column_sizes(column_sizes) @@ -1591,7 +1592,7 @@ mod tests { // .next() // .unwrap() // .content(crate::spec::DataContentType::Data) - // .partition(Struct::empty()) + // .partition(Struct::empty()).partition_spec_id(0) // .build() // .unwrap(); // assert_eq!( diff --git a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs index c988e3da62..f28dd44ce9 100644 --- a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs +++ b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs @@ -318,7 +318,7 @@ mod test { location_gen.clone(), file_name_gen.clone(), ); - DataFileWriterBuilder::new(pw.clone(), None) + DataFileWriterBuilder::new(pw.clone(), None, None) }; let position_delete_writer_builder = { let pw = ParquetWriterBuilder::new( @@ -328,10 +328,10 @@ mod test { location_gen.clone(), file_name_gen.clone(), ); - SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None) + SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None, None) }; let equality_delete_writer_builder = { - let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None)?; + let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None, None)?; let pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), arrow_schema_to_schema(config.projected_arrow_schema_ref()) diff --git a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs index d3b3d63215..48e37a2c7f 100644 --- a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs +++ b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs @@ -134,6 +134,7 @@ impl IcebergWriter for FanoutPartitionWriter { let mut data_files = writer.close().await?; for data_file in data_files.iter_mut() { data_file.rewrite_partition(partition_value.clone()); + 
data_file.rewrite_partition_id(self.partition_splitter.partition_spec().spec_id()); } result.append(&mut data_files); } diff --git a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs index fddb2dc37e..fe5110c9e3 100644 --- a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs +++ b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs @@ -29,6 +29,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, TableCreation}; use iceberg_catalog_rest::RestCatalog; +use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::file::properties::WriterProperties; use crate::get_shared_containers; @@ -74,38 +75,73 @@ async fn test_expire_snapshots_by_count() { file_name_generator.clone(), ); let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // check parquet file schema + let content = table + .file_io() + .new_input(data_file[0].file_path()) + .unwrap() + .read() + .await + .unwrap(); + let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load( + &content, + ArrowReaderOptions::default(), + ) + .unwrap(); + let field_ids: Vec = parquet_reader + .parquet_schema() + .columns() + .iter() + .map(|col| col.self_type().get_basic_info().id()) + .collect(); + assert_eq!(field_ids, vec![1, 2, 3]); // commit result for i in 0..10 { // Create a new data file writer for each iteration let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap(); - + // Create different data for each iteration let col1 = StringArray::from(vec![ - Some(format!("foo_{}", i)), - Some(format!("bar_{}", i)), - None, - Some(format!("baz_{}", i)) + Some(format!("foo_{}", i)), + Some(format!("bar_{}", i)), + None, + Some(format!("baz_{}", i)), + ]); + let col2 = Int32Array::from(vec![Some(i), Some(i + 1), Some(i + 2), Some(i + 3)]); + let col3 = BooleanArray::from(vec![ + Some(i % 2 == 0), + Some(i % 2 == 1), + None, + Some(i % 3 == 0), ]); - let col2 = Int32Array::from(vec![Some(i), Some(i+1), Some(i+2), Some(i+3)]); - let col3 = BooleanArray::from(vec![Some(i % 2 == 0), Some(i % 2 == 1), None, Some(i % 3 == 0)]); let batch = RecordBatch::try_new(schema.clone(), vec![ Arc::new(col1) as ArrayRef, Arc::new(col2) as ArrayRef, Arc::new(col3) as ArrayRef, ]) .unwrap(); - + // Write the unique data and get the data file data_file_writer.write(batch.clone()).await.unwrap(); let data_file = data_file_writer.close().await.unwrap(); - + let tx = Transaction::new(&table); let append_action = tx.fast_append(); - let tx = append_action - .add_data_files(data_file) - .apply(tx) - .unwrap(); + let tx = append_action.add_data_files(data_file).apply(tx).unwrap(); table = tx.commit(&rest_catalog).await.unwrap(); }
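
Taken together, the core of the new `write_delete_manifest` step is a group-by on each entry's `partition_spec_id`, so every manifest that gets written only contains entries from a single partition spec. A minimal, self-contained sketch of that grouping pattern, using a simplified stand-in `Entry` type rather than the crate's real `ManifestEntry`:

use std::collections::HashMap;

// Simplified stand-in for a manifest entry; the real type carries a DataFile.
#[derive(Debug)]
struct Entry {
    partition_spec_id: i32,
    is_delete: bool, // stand-in for DataContentType::{Equality,Position}Deletes
}

// Group entries by the partition spec id recorded on each file, mirroring the
// shape of write_delete_manifest: one bucket (and later at most one data
// manifest plus one delete manifest) per spec id.
fn group_by_spec_id(entries: Vec<Entry>) -> HashMap<i32, Vec<Entry>> {
    let mut groups: HashMap<i32, Vec<Entry>> = HashMap::new();
    for entry in entries {
        groups.entry(entry.partition_spec_id).or_default().push(entry);
    }
    groups
}

fn main() {
    let groups = group_by_spec_id(vec![
        Entry { partition_spec_id: 0, is_delete: true },
        Entry { partition_spec_id: 1, is_delete: false },
        Entry { partition_spec_id: 0, is_delete: true },
    ]);
    let deletes_under_spec_0 = groups[&0].iter().filter(|e| e.is_delete).count();
    assert_eq!(groups.len(), 2);
    assert_eq!(deletes_under_spec_0, 2);
    println!("{} spec-id groups, {} delete entries under spec 0", groups.len(), deletes_under_spec_0);
}

Keeping one bucket per spec id is what lets `new_manifest_writer` look up the matching partition spec for each group instead of always falling back to the table's default spec.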