210 changes: 203 additions & 7 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -224,14 +224,12 @@ impl CachingDeleteFileLoader {
         let (sender, receiver) = channel();
         del_filter.insert_equality_delete(&task.file_path, receiver);
 
+        // Equality deletes intentionally have partial schemas. Schema evolution would add
+        // NULL values for missing REQUIRED columns, causing Arrow validation to fail.
         Ok(DeleteFileContext::FreshEqDel {
-            batch_stream: BasicDeleteFileLoader::evolve_schema(
-                basic_delete_file_loader
-                    .parquet_to_batch_stream(&task.file_path)
-                    .await?,
-                schema,
-            )
-            .await?,
+            batch_stream: basic_delete_file_loader
+                .parquet_to_batch_stream(&task.file_path)
+                .await?,
             sender,
             equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()),
         })
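
A note on the change above: Arrow rejects a RecordBatch whose schema declares a column non-nullable while the backing array contains nulls, which is exactly what naive schema evolution produces when it backfills a REQUIRED column that a partial-schema equality delete file omits. A minimal standalone sketch of that failure mode, written against arrow-rs directly (illustrative only, not code from this PR):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // The table schema declares `id` as REQUIRED (non-nullable).
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    // Naive evolution backfills the column missing from the delete file with NULLs.
    let nulls: ArrayRef = Arc::new(Int32Array::from(vec![None::<i32>, None]));

    // Batch construction fails validation with an error along the lines of
    // "Column 'id' is declared as non-nullable but contains null values".
    let result = RecordBatch::try_new(schema, vec![nulls]);
    assert!(result.is_err());
}

The test added below exercises the same failure through BasicDeleteFileLoader::evolve_schema.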
@@ -686,4 +684,202 @@ mod tests {
let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
assert!(result.is_none()); // no pos dels for file 3
}

/// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow
/// validation errors when missing REQUIRED columns are filled with NULLs.
///
/// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java.
#[tokio::test]
async fn test_partial_schema_equality_deletes_evolve_fails() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().as_os_str().to_str().unwrap();

// Create table schema with REQUIRED fields
let table_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
crate::spec::NestedField::required(
1,
"id",
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
)
.into(),
crate::spec::NestedField::required(
2,
"data",
crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
)
.into(),
])
.build()
.unwrap(),
);

// Write equality delete file with PARTIAL schema (only 'data' column)
let delete_file_path = {
let data_vals = vec!["a", "d", "g"];
let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;

let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field(
"data",
DataType::Utf8,
false,
"2", // field ID
)]));

let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap();

let path = format!("{}/partial-eq-deletes.parquet", &table_location);
let file = File::create(&path).unwrap();
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let mut writer =
ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap();
writer.write(&delete_batch).expect("Writing batch");
writer.close().unwrap();
path
};

let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());

let batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(&delete_file_path)
.await
.unwrap();

let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema)
.await
.unwrap();

let result = evolved_stream.next().await.unwrap();

assert!(
result.is_err(),
"Expected error from evolve_schema adding NULL to non-nullable column"
);

let err = result.unwrap_err();
let err_msg = err.to_string();
assert!(
err_msg.contains("non-nullable") || err_msg.contains("null values"),
"Expected null value error, got: {}",
err_msg
);
}

/// Test loading a FileScanTask with BOTH positional and equality deletes.
/// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
#[tokio::test]
async fn test_load_deletes_with_mixed_types() {
use crate::scan::FileScanTask;
use crate::spec::{DataFileFormat, Schema};

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Create the data file schema
let data_file_schema = Arc::new(
Schema::builder()
.with_fields(vec![
crate::spec::NestedField::optional(
2,
"y",
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
)
.into(),
crate::spec::NestedField::optional(
3,
"z",
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
)
.into(),
])
.build()
.unwrap(),
);

// Write positional delete file
let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
let file_path_values =
vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));

let positional_deletes_to_write =
RecordBatch::try_new(positional_delete_schema.clone(), vec![
file_path_col,
pos_col,
])
.unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap());
let file = File::create(&pos_del_path).unwrap();
let mut writer = ArrowWriter::try_new(
file,
positional_deletes_to_write.schema(),
Some(props.clone()),
)
.unwrap();
writer.write(&positional_deletes_to_write).unwrap();
writer.close().unwrap();

// Write equality delete file
let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());

// Create FileScanTask with BOTH positional and equality deletes
let pos_del = FileScanTaskDeleteFile {
file_path: pos_del_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
};

let eq_del = FileScanTaskDeleteFile {
file_path: eq_delete_path.clone(),
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
};

let file_scan_task = FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
};

// Load the deletes - should handle both types without error
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
let delete_filter = delete_file_loader
.load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
.await
.unwrap()
.unwrap();

// Verify both delete types can be processed together
let result = delete_filter
.build_equality_delete_predicate(&file_scan_task)
.await;
assert!(
result.is_ok(),
"Failed to build equality delete predicate: {:?}",
result.err()
);
}
}
4 changes: 3 additions & 1 deletion crates/iceberg/src/arrow/delete_filter.rs
@@ -68,10 +68,12 @@ impl DeleteFilter {
     pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
         let mut state = self.state.write().unwrap();
 
-        if !state.equality_deletes.contains_key(file_path) {
+        // Skip if already loaded/loading - another task owns it
+        if state.equality_deletes.contains_key(file_path) {
             return None;
         }
 
+        // Mark as loading to prevent duplicate work
         let notifier = Arc::new(Notify::new());
         state
             .equality_deletes
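
The corrected guard is a claim-or-skip pattern: the first caller for a given path registers a Notify and owns the load; later callers get None and can await the notifier instead of loading the file again. A self-contained sketch of the same idea (the names EqDelRegistry and try_start_load are illustrative, not this crate's API):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use tokio::sync::Notify;

// Hypothetical registry keyed by delete-file path.
struct EqDelRegistry {
    loading: Mutex<HashMap<String, Arc<Notify>>>,
}

impl EqDelRegistry {
    fn try_start_load(&self, file_path: &str) -> Option<Arc<Notify>> {
        let mut map = self.loading.lock().unwrap();
        // Skip if another task already owns this file's load.
        if map.contains_key(file_path) {
            return None;
        }
        // Claim the load; waiters can subscribe to the notifier.
        let notifier = Arc::new(Notify::new());
        map.insert(file_path.to_string(), notifier.clone());
        Some(notifier)
    }
}

fn main() {
    let registry = EqDelRegistry {
        loading: Mutex::new(HashMap::new()),
    };
    assert!(registry.try_start_load("eq-del-1.parquet").is_some()); // first caller claims it
    assert!(registry.try_start_load("eq-del-1.parquet").is_none()); // duplicate is skipped
}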