diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 078635c9b..8a3ab3a95 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -788,4 +788,101 @@ mod tests {
         assert_eq!(data_col.value(1), "d");
         assert_eq!(data_col.value(2), "g");
     }
+
+    /// Loads the deletes of a `FileScanTask` that carries BOTH a positional and an
+    /// equality delete file.
+    ///
+    /// Regression test for the inverted condition in
+    /// `DeleteFilter::try_start_eq_del_load`, which caused
+    /// "Missing predicate for equality delete file" errors.
+    #[tokio::test]
+    async fn test_load_deletes_with_mixed_types() {
+        use crate::scan::FileScanTask;
+        use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, Type};
+
+        let tmp_dir = TempDir::new().unwrap();
+        // Resolve the table location once; every path below is derived from it.
+        let table_location = tmp_dir.path().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+        let data_file_path = format!("{}/data-1.parquet", table_location);
+
+        // Schema of the data file the deletes apply to.
+        let data_file_schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    NestedField::optional(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
+                    NestedField::optional(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write the positional delete file: positions 0..=3 of the data file.
+        let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
+        let file_path_values = vec![data_file_path.clone(); 4];
+        let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
+        let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));
+
+        let positional_deletes_to_write =
+            RecordBatch::try_new(positional_delete_schema, vec![file_path_col, pos_col]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location);
+        let file = File::create(&pos_del_path).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, positional_deletes_to_write.schema(), Some(props)).unwrap();
+        writer.write(&positional_deletes_to_write).unwrap();
+        writer.close().unwrap();
+
+        // Write the equality delete file.
+        let eq_delete_path = setup_write_equality_delete_file_1(table_location);
+
+        // Attach BOTH delete files to a single scan task.
+        let pos_del = FileScanTaskDeleteFile {
+            file_path: pos_del_path,
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+            equality_ids: None,
+        };
+
+        let eq_del = FileScanTaskDeleteFile {
+            file_path: eq_delete_path,
+            file_type: DataContentType::EqualityDeletes,
+            partition_spec_id: 0,
+            equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
+        };
+
+        let file_scan_task = FileScanTask {
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path,
+            data_file_format: DataFileFormat::Parquet,
+            schema: data_file_schema,
+            project_field_ids: vec![2, 3],
+            predicate: None,
+            deletes: vec![pos_del, eq_del],
+        };
+
+        // Loading must succeed even though the task mixes both delete types.
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io, 10);
+        let delete_filter = delete_file_loader
+            .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
+            .await
+            .unwrap()
+            .unwrap();
+
+        // The equality delete predicate must be buildable once loading has finished.
+        let result = delete_filter
+            .build_equality_delete_predicate(&file_scan_task)
+            .await;
+        assert!(
+            result.is_ok(),
+            "Failed to build equality delete predicate: {:?}",
+            result.err()
+        );
+    }
 }
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index b853baa99..4250974bc 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -68,10 +68,12 @@ impl DeleteFilter { pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option> { let mut state = self.state.write().unwrap(); - if !state.equality_deletes.contains_key(file_path) { + // Skip if already loaded/loading - another task owns it + if state.equality_deletes.contains_key(file_path) { return None; } + // Mark as loading to prevent duplicate work let notifier = Arc::new(Notify::new()); state .equality_deletes