diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 77f29b7f1..078635c9b 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -224,16 +224,22 @@ impl CachingDeleteFileLoader {
                 let (sender, receiver) = channel();
                 del_filter.insert_equality_delete(&task.file_path, receiver);
 
+                // Per the Iceberg spec, evolve schema for equality deletes but only for the
+                // equality_ids columns, not all table columns.
+                let equality_ids_vec = task.equality_ids.clone().unwrap();
+                let evolved_stream = BasicDeleteFileLoader::evolve_schema(
+                    basic_delete_file_loader
+                        .parquet_to_batch_stream(&task.file_path)
+                        .await?,
+                    schema,
+                    &equality_ids_vec,
+                )
+                .await?;
+
                 Ok(DeleteFileContext::FreshEqDel {
-                    batch_stream: BasicDeleteFileLoader::evolve_schema(
-                        basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
-                            .await?,
-                        schema,
-                    )
-                    .await?,
+                    batch_stream: evolved_stream,
                     sender,
-                    equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()),
+                    equality_ids: HashSet::from_iter(equality_ids_vec),
                 })
             }
 
@@ -536,6 +542,7 @@ mod tests {
     use std::fs::File;
     use std::sync::Arc;
 
+    use arrow_array::cast::AsArray;
     use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
     use arrow_schema::{DataType, Field, Fields};
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -686,4 +693,99 @@ mod tests {
         let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
         assert!(result.is_none()); // no pos dels for file 3
     }
+
+    /// Verifies that evolve_schema on partial-schema equality deletes works correctly
+    /// when only equality_ids columns are evolved, not all table columns.
+    ///
+    /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
+    /// equality delete files can contain only a subset of columns.
+    #[tokio::test]
+    async fn test_partial_schema_equality_deletes_evolve_succeeds() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+
+        // Create table schema with REQUIRED fields
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    crate::spec::NestedField::required(
+                        1,
+                        "id",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
+                    )
+                    .into(),
+                    crate::spec::NestedField::required(
+                        2,
+                        "data",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write equality delete file with PARTIAL schema (only 'data' column)
+        let delete_file_path = {
+            let data_vals = vec!["a", "d", "g"];
+            let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
+
+            let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field(
+                "data",
+                DataType::Utf8,
+                false,
+                "2", // field ID
+            )]));
+
+            let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap();
+
+            let path = format!("{}/partial-eq-deletes.parquet", &table_location);
+            let file = File::create(&path).unwrap();
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+            let mut writer =
+                ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap();
+            writer.write(&delete_batch).expect("Writing batch");
+            writer.close().unwrap();
+            path
+        };
+
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+
+        let batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&delete_file_path)
+            .await
+            .unwrap();
+
+        // Only evolve the equality_ids columns (field 2), not all table columns
+        let equality_ids = vec![2];
+        let evolved_stream =
+            BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, &equality_ids)
+                .await
+                .unwrap();
+
+        let result = evolved_stream.try_collect::<Vec<_>>().await;
+
+        assert!(
+            result.is_ok(),
+            "Expected success when evolving only equality_ids columns, got error: {:?}",
+            result.err()
+        );
+
+        let batches = result.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 1); // Only 'data' column
+
+        // Verify the actual values are preserved after schema evolution
+        let data_col = batch.column(0).as_string::<i32>();
+        assert_eq!(data_col.value(0), "a");
+        assert_eq!(data_col.value(1), "d");
+        assert_eq!(data_col.value(2), "g");
+    }
 }
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index f2c63cc59..c0b1392dc 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -72,20 +72,17 @@ impl BasicDeleteFileLoader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
-    /// Evolves the schema of the RecordBatches from an equality delete file
+    /// Evolves the schema of the RecordBatches from an equality delete file.
+    ///
+    /// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
+    /// only evolves the specified `equality_ids` columns, not all table columns.
     pub(crate) async fn evolve_schema(
         record_batch_stream: ArrowRecordBatchStream,
         target_schema: Arc<Schema>,
+        equality_ids: &[i32],
     ) -> Result<ArrowRecordBatchStream> {
-        let eq_ids = target_schema
-            .as_ref()
-            .field_id_to_name_map()
-            .keys()
-            .cloned()
-            .collect::<Vec<_>>();
-
         let mut record_batch_transformer =
-            RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
+            RecordBatchTransformer::build(target_schema.clone(), equality_ids);
 
         let record_batch_stream = record_batch_stream.map(move |record_batch| {
             record_batch.and_then(|record_batch| {
@@ -106,7 +103,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
     ) -> Result<ArrowRecordBatchStream> {
         let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;
 
-        Self::evolve_schema(raw_batch_stream, schema).await
+        // For equality deletes, only evolve the equality_ids columns.
+        // For positional deletes (equality_ids is None), use all field IDs.
+        let field_ids = match &task.equality_ids {
+            Some(ids) => ids.clone(),
+            None => schema.field_id_to_name_map().keys().cloned().collect(),
+        };
+
+        Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
     }
 }
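The `None` arm above restores the old behavior (evolve every table column) for positional deletes, while equality deletes now evolve only their declared columns. As a standalone illustration of that dispatch rule, here is a minimal sketch; `field_ids_to_evolve` and its arguments are hypothetical names invented for illustration, not the crate's API:

```rust
/// Hypothetical helper mirroring the match in `read_delete_file`:
/// equality deletes carry an explicit equality_ids list; positional
/// deletes (None) fall back to every field ID in the table schema.
fn field_ids_to_evolve(equality_ids: Option<&[i32]>, all_schema_field_ids: &[i32]) -> Vec<i32> {
    match equality_ids {
        Some(ids) => ids.to_vec(),
        None => all_schema_field_ids.to_vec(),
    }
}

fn main() {
    // Equality delete on field 2 only: evolve just that column.
    assert_eq!(field_ids_to_evolve(Some(&[2]), &[1, 2]), vec![2]);
    // Positional delete: evolve the full table schema.
    assert_eq!(field_ids_to_evolve(None, &[1, 2]), vec![1, 2]);
}
```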