Merged
113 changes: 113 additions & 0 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -788,4 +788,117 @@ mod tests {
assert_eq!(data_col.value(1), "d");
assert_eq!(data_col.value(2), "g");
}

/// Test loading a FileScanTask with BOTH positional and equality deletes.
/// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
#[tokio::test]
async fn test_load_deletes_with_mixed_types() {
use crate::scan::FileScanTask;
use crate::spec::{DataFileFormat, Schema};

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Create the data file schema
let data_file_schema = Arc::new(
Schema::builder()
.with_fields(vec![
crate::spec::NestedField::optional(
2,
"y",
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
)
.into(),
crate::spec::NestedField::optional(
3,
"z",
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
)
.into(),
])
.build()
.unwrap(),
);

// Write positional delete file
let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
let file_path_values =
vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));

let positional_deletes_to_write =
RecordBatch::try_new(positional_delete_schema.clone(), vec![
file_path_col,
pos_col,
])
.unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap());
let file = File::create(&pos_del_path).unwrap();
let mut writer = ArrowWriter::try_new(
file,
positional_deletes_to_write.schema(),
Some(props.clone()),
)
.unwrap();
writer.write(&positional_deletes_to_write).unwrap();
writer.close().unwrap();

// Write equality delete file
let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());

// Create FileScanTask with BOTH positional and equality deletes
let pos_del = FileScanTaskDeleteFile {
file_path: pos_del_path,
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: None,
};

let eq_del = FileScanTaskDeleteFile {
file_path: eq_delete_path.clone(),
file_type: DataContentType::EqualityDeletes,
partition_spec_id: 0,
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
};

let file_scan_task = FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
};

// Load the deletes - should handle both types without error
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
let delete_filter = delete_file_loader
.load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
.await
.unwrap()
.unwrap();

// Verify both delete types can be processed together
let result = delete_filter
.build_equality_delete_predicate(&file_scan_task)
.await;
assert!(
result.is_ok(),
"Failed to build equality delete predicate: {:?}",
result.err()
);
}
}
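The "inverted condition" mentioned in the test's doc comment refers to the guard in DeleteFilter::try_start_eq_del_load, corrected in the next file. As a minimal, self-contained sketch of the failure mode (the function name and HashMap-based state below are illustrative assumptions, not the crate's implementation): with the old condition, the very first request for a brand-new equality delete file returned None, so no task ever claimed the load and build_equality_delete_predicate later reported a missing predicate.

```rust
// Sketch of the pre-fix guard only -- not the real DeleteFilter state.
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Notify;

/// Pre-fix logic: a path that has NOT been registered yet returns None,
/// so nobody claims the load and its equality-delete predicate is never built.
fn try_claim_inverted(
    seen: &mut HashMap<String, Arc<Notify>>,
    path: &str,
) -> Option<Arc<Notify>> {
    if !seen.contains_key(path) {
        return None;
    }
    let notifier = Arc::new(Notify::new());
    seen.insert(path.to_string(), notifier.clone());
    Some(notifier)
}

fn main() {
    // A brand-new equality delete file, as in the mixed-deletes test above:
    let mut seen = HashMap::new();
    assert!(try_claim_inverted(&mut seen, "eq-del.parquet").is_none()); // load is never started
    assert!(seen.is_empty()); // nothing is registered, so the predicate stays missing
}
```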
4 changes: 3 additions & 1 deletion crates/iceberg/src/arrow/delete_filter.rs
@@ -68,10 +68,12 @@ impl DeleteFilter {
pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
let mut state = self.state.write().unwrap();

-        if !state.equality_deletes.contains_key(file_path) {
+        // Skip if already loaded/loading - another task owns it
+        if state.equality_deletes.contains_key(file_path) {
return None;
}

+        // Mark as loading to prevent duplicate work
let notifier = Arc::new(Notify::new());
state
.equality_deletes
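For context on the hunk above: the corrected guard implements a claim-then-notify handshake in which the first task to request a path inserts a notifier and performs the load, while later tasks receive None and rely on that notifier. The sketch below is a hedged stand-in for that pattern; EqDelRegistry, finish_load, and is_loaded are illustrative names rather than iceberg-rust API, and only the Option<Arc<Notify>> contract mirrors try_start_eq_del_load.

```rust
// Minimal claim-then-notify registry, illustrating the fixed guard's contract.
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};

use tokio::sync::Notify;

#[derive(Default)]
struct EqDelRegistry {
    // path -> notifier for tasks interested in that equality delete file
    loading: RwLock<HashMap<String, Arc<Notify>>>,
    // paths whose equality-delete predicate has been built
    loaded: RwLock<HashSet<String>>,
}

impl EqDelRegistry {
    /// Mirrors the fixed guard: only the first caller per path claims the load.
    fn try_start_load(&self, path: &str) -> Option<Arc<Notify>> {
        let mut loading = self.loading.write().unwrap();
        if loading.contains_key(path) {
            return None; // another task owns (or already finished) this load
        }
        let notifier = Arc::new(Notify::new());
        loading.insert(path.to_string(), notifier.clone());
        Some(notifier)
    }

    /// The owning task records the result and wakes any waiting tasks.
    fn finish_load(&self, path: &str, notifier: &Notify) {
        self.loaded.write().unwrap().insert(path.to_string());
        notifier.notify_waiters();
    }

    fn is_loaded(&self, path: &str) -> bool {
        self.loaded.read().unwrap().contains(path)
    }
}

fn main() {
    let registry = EqDelRegistry::default();

    let owner = registry
        .try_start_load("eq-del-1.parquet")
        .expect("first caller claims the load");
    assert!(registry.try_start_load("eq-del-1.parquet").is_none()); // duplicate request is skipped

    // ... parse the equality delete file and build its predicate here ...
    registry.finish_load("eq-del-1.parquet", &owner);
    assert!(registry.is_loaded("eq-del-1.parquet"));
}
```

One caveat worth noting: tokio's Notify::notify_waiters only wakes tasks that are already waiting, so any real waiter must re-check the shared state around notified().await to avoid missing a notification sent before it started waiting.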