Skip to content

Commit 835f528

Browse files
authored
fix(reader): Equality delete files with partial schemas (containing only equality columns) (#1782)
## What issue does this PR close? Partially address #1749. ## Rationale for this change Equality delete files with partial schemas (containing only equality columns) were hitting Arrow validation errors: "Column 'id' is declared as non-nullable but contains null values” in Iceberg Java’s TestSparkReaderDeletes suite. The bug occurs because `evolve_schema()` adds missing columns with NULL values, which fails when those columns are declared REQUIRED in the table schema. ## What changes are included in this PR? Change the `evolve_schema()` call to take `equality_ids` because per the Iceberg spec, evolve schema for equality deletes but only for the `equality_ids` columns, not all table columns. ## Are these changes tested? `test_partial_schema_equality_deletes_evolve_succeeds`
1 parent 05ba2d3 commit 835f528

File tree

2 files changed

+124
-18
lines changed

2 files changed

+124
-18
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 110 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -224,16 +224,22 @@ impl CachingDeleteFileLoader {
224224
let (sender, receiver) = channel();
225225
del_filter.insert_equality_delete(&task.file_path, receiver);
226226

227+
// Per the Iceberg spec, evolve schema for equality deletes but only for the
228+
// equality_ids columns, not all table columns.
229+
let equality_ids_vec = task.equality_ids.clone().unwrap();
230+
let evolved_stream = BasicDeleteFileLoader::evolve_schema(
231+
basic_delete_file_loader
232+
.parquet_to_batch_stream(&task.file_path)
233+
.await?,
234+
schema,
235+
&equality_ids_vec,
236+
)
237+
.await?;
238+
227239
Ok(DeleteFileContext::FreshEqDel {
228-
batch_stream: BasicDeleteFileLoader::evolve_schema(
229-
basic_delete_file_loader
230-
.parquet_to_batch_stream(&task.file_path)
231-
.await?,
232-
schema,
233-
)
234-
.await?,
240+
batch_stream: evolved_stream,
235241
sender,
236-
equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()),
242+
equality_ids: HashSet::from_iter(equality_ids_vec),
237243
})
238244
}
239245

@@ -536,6 +542,7 @@ mod tests {
536542
use std::fs::File;
537543
use std::sync::Arc;
538544

545+
use arrow_array::cast::AsArray;
539546
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
540547
use arrow_schema::{DataType, Field, Fields};
541548
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -686,4 +693,99 @@ mod tests {
686693
let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
687694
assert!(result.is_none()); // no pos dels for file 3
688695
}
696+
697+
/// Verifies that evolve_schema on partial-schema equality deletes works correctly
698+
/// when only equality_ids columns are evolved, not all table columns.
699+
///
700+
/// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
701+
/// equality delete files can contain only a subset of columns.
702+
#[tokio::test]
703+
async fn test_partial_schema_equality_deletes_evolve_succeeds() {
704+
let tmp_dir = TempDir::new().unwrap();
705+
let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
706+
707+
// Create table schema with REQUIRED fields
708+
let table_schema = Arc::new(
709+
Schema::builder()
710+
.with_schema_id(1)
711+
.with_fields(vec![
712+
crate::spec::NestedField::required(
713+
1,
714+
"id",
715+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
716+
)
717+
.into(),
718+
crate::spec::NestedField::required(
719+
2,
720+
"data",
721+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
722+
)
723+
.into(),
724+
])
725+
.build()
726+
.unwrap(),
727+
);
728+
729+
// Write equality delete file with PARTIAL schema (only 'data' column)
730+
let delete_file_path = {
731+
let data_vals = vec!["a", "d", "g"];
732+
let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
733+
734+
let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field(
735+
"data",
736+
DataType::Utf8,
737+
false,
738+
"2", // field ID
739+
)]));
740+
741+
let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap();
742+
743+
let path = format!("{}/partial-eq-deletes.parquet", &table_location);
744+
let file = File::create(&path).unwrap();
745+
let props = WriterProperties::builder()
746+
.set_compression(Compression::SNAPPY)
747+
.build();
748+
let mut writer =
749+
ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap();
750+
writer.write(&delete_batch).expect("Writing batch");
751+
writer.close().unwrap();
752+
path
753+
};
754+
755+
let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
756+
let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
757+
758+
let batch_stream = basic_delete_file_loader
759+
.parquet_to_batch_stream(&delete_file_path)
760+
.await
761+
.unwrap();
762+
763+
// Only evolve the equality_ids columns (field 2), not all table columns
764+
let equality_ids = vec![2];
765+
let evolved_stream =
766+
BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, &equality_ids)
767+
.await
768+
.unwrap();
769+
770+
let result = evolved_stream.try_collect::<Vec<_>>().await;
771+
772+
assert!(
773+
result.is_ok(),
774+
"Expected success when evolving only equality_ids columns, got error: {:?}",
775+
result.err()
776+
);
777+
778+
let batches = result.unwrap();
779+
assert_eq!(batches.len(), 1);
780+
781+
let batch = &batches[0];
782+
assert_eq!(batch.num_rows(), 3);
783+
assert_eq!(batch.num_columns(), 1); // Only 'data' column
784+
785+
// Verify the actual values are preserved after schema evolution
786+
let data_col = batch.column(0).as_string::<i32>();
787+
assert_eq!(data_col.value(0), "a");
788+
assert_eq!(data_col.value(1), "d");
789+
assert_eq!(data_col.value(2), "g");
790+
}
689791
}

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,17 @@ impl BasicDeleteFileLoader {
7272
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
7373
}
7474

75-
/// Evolves the schema of the RecordBatches from an equality delete file
75+
/// Evolves the schema of the RecordBatches from an equality delete file.
76+
///
77+
/// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
78+
/// only evolves the specified `equality_ids` columns, not all table columns.
7679
pub(crate) async fn evolve_schema(
7780
record_batch_stream: ArrowRecordBatchStream,
7881
target_schema: Arc<Schema>,
82+
equality_ids: &[i32],
7983
) -> Result<ArrowRecordBatchStream> {
80-
let eq_ids = target_schema
81-
.as_ref()
82-
.field_id_to_name_map()
83-
.keys()
84-
.cloned()
85-
.collect::<Vec<_>>();
86-
8784
let mut record_batch_transformer =
88-
RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
85+
RecordBatchTransformer::build(target_schema.clone(), equality_ids);
8986

9087
let record_batch_stream = record_batch_stream.map(move |record_batch| {
9188
record_batch.and_then(|record_batch| {
@@ -106,7 +103,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
106103
) -> Result<ArrowRecordBatchStream> {
107104
let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;
108105

109-
Self::evolve_schema(raw_batch_stream, schema).await
106+
// For equality deletes, only evolve the equality_ids columns.
107+
// For positional deletes (equality_ids is None), use all field IDs.
108+
let field_ids = match &task.equality_ids {
109+
Some(ids) => ids.clone(),
110+
None => schema.field_id_to_name_map().keys().cloned().collect(),
111+
};
112+
113+
Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
110114
}
111115
}
112116

0 commit comments

Comments (0)