Skip to content

Commit fd08916

Browse files
authored
fix(reader): Support both position and equality delete files on the same FileScanTask (#1778)
## What issue does this PR close?

Partially addresses #1749.

## Rationale for this change

This PR fixes a bug in delete file loading when a `FileScanTask` contains both positional and equality delete files. We hit this when running the Iceberg Java test suite via Comet in apache/datafusion-comet#2528. The tests that failed were:

```
TestSparkExecutorCache > testMergeOnReadUpdate()
TestSparkExecutorCache > testMergeOnReadMerge()
TestSparkExecutorCache > testMergeOnReadDelete()
```

**The Bug:** The condition in `try_start_eq_del_load` (delete_filter.rs:71-73) was inverted. It returned `None` when the equality delete file was not in the cache, causing the loader to skip loading it. When `build_equality_delete_predicate` was later called, it would fail with "Missing predicate for equality delete file".

## What changes are included in this PR?

**The Fix:**

- Inverted the condition so it returns `None` when the file is already in the cache (being loaded or loaded), preventing duplicate work across concurrent tasks
- When the file is not in the cache, mark it as Loading and proceed with loading

**Additional Changes:**

- Added test case `test_load_deletes_with_mixed_types` that reproduces the bug scenario

## Are these changes tested?

Yes, this PR includes a new unit test `test_load_deletes_with_mixed_types` that:

- Creates a `FileScanTask` with both a positional delete file and an equality delete file
- Verifies that `load_deletes` successfully processes both types
- Verifies that `build_equality_delete_predicate` succeeds without the "Missing predicate" error
- We hit this when running the Iceberg Java test suite via Comet in apache/datafusion-comet#2528. I also confirmed that it fixes the tests in Iceberg Java's suite.

The test would fail before this fix and passes after.
1 parent b6dc4cf commit fd08916

File tree

2 files changed

+116
-1
lines changed

2 files changed

+116
-1
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,4 +788,117 @@ mod tests {
788788
assert_eq!(data_col.value(1), "d");
789789
assert_eq!(data_col.value(2), "g");
790790
}
791+
792+
/// Test loading a FileScanTask with BOTH positional and equality deletes.
793+
/// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
794+
#[tokio::test]
795+
async fn test_load_deletes_with_mixed_types() {
796+
use crate::scan::FileScanTask;
797+
use crate::spec::{DataFileFormat, Schema};
798+
799+
let tmp_dir = TempDir::new().unwrap();
800+
let table_location = tmp_dir.path();
801+
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
802+
.unwrap()
803+
.build()
804+
.unwrap();
805+
806+
// Create the data file schema
807+
let data_file_schema = Arc::new(
808+
Schema::builder()
809+
.with_fields(vec![
810+
crate::spec::NestedField::optional(
811+
2,
812+
"y",
813+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
814+
)
815+
.into(),
816+
crate::spec::NestedField::optional(
817+
3,
818+
"z",
819+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
820+
)
821+
.into(),
822+
])
823+
.build()
824+
.unwrap(),
825+
);
826+
827+
// Write positional delete file
828+
let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
829+
let file_path_values =
830+
vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
831+
let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
832+
let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));
833+
834+
let positional_deletes_to_write =
835+
RecordBatch::try_new(positional_delete_schema.clone(), vec![
836+
file_path_col,
837+
pos_col,
838+
])
839+
.unwrap();
840+
841+
let props = WriterProperties::builder()
842+
.set_compression(Compression::SNAPPY)
843+
.build();
844+
845+
let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap());
846+
let file = File::create(&pos_del_path).unwrap();
847+
let mut writer = ArrowWriter::try_new(
848+
file,
849+
positional_deletes_to_write.schema(),
850+
Some(props.clone()),
851+
)
852+
.unwrap();
853+
writer.write(&positional_deletes_to_write).unwrap();
854+
writer.close().unwrap();
855+
856+
// Write equality delete file
857+
let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());
858+
859+
// Create FileScanTask with BOTH positional and equality deletes
860+
let pos_del = FileScanTaskDeleteFile {
861+
file_path: pos_del_path,
862+
file_type: DataContentType::PositionDeletes,
863+
partition_spec_id: 0,
864+
equality_ids: None,
865+
};
866+
867+
let eq_del = FileScanTaskDeleteFile {
868+
file_path: eq_delete_path.clone(),
869+
file_type: DataContentType::EqualityDeletes,
870+
partition_spec_id: 0,
871+
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
872+
};
873+
874+
let file_scan_task = FileScanTask {
875+
start: 0,
876+
length: 0,
877+
record_count: None,
878+
data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
879+
data_file_format: DataFileFormat::Parquet,
880+
schema: data_file_schema.clone(),
881+
project_field_ids: vec![2, 3],
882+
predicate: None,
883+
deletes: vec![pos_del, eq_del],
884+
};
885+
886+
// Load the deletes - should handle both types without error
887+
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
888+
let delete_filter = delete_file_loader
889+
.load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
890+
.await
891+
.unwrap()
892+
.unwrap();
893+
894+
// Verify both delete types can be processed together
895+
let result = delete_filter
896+
.build_equality_delete_predicate(&file_scan_task)
897+
.await;
898+
assert!(
899+
result.is_ok(),
900+
"Failed to build equality delete predicate: {:?}",
901+
result.err()
902+
);
903+
}
791904
}

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@ impl DeleteFilter {
6868
pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
6969
let mut state = self.state.write().unwrap();
7070

71-
if !state.equality_deletes.contains_key(file_path) {
71+
// Skip if already loaded/loading - another task owns it
72+
if state.equality_deletes.contains_key(file_path) {
7273
return None;
7374
}
7475

76+
// Mark as loading to prevent duplicate work
7577
let notifier = Arc::new(Notify::new());
7678
state
7779
.equality_deletes

0 commit comments

Comments
 (0)