Skip to content

Commit 29dde0e

Browse files
committed
Handle field ID conflicts in add_files with name mapping
1 parent 75cc2bc commit 29dde0e

File tree

4 files changed

+298
-84
lines changed

4 files changed

+298
-84
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,4 +788,120 @@ mod tests {
788788
assert_eq!(data_col.value(1), "d");
789789
assert_eq!(data_col.value(2), "g");
790790
}
791+
792+
/// Test loading a FileScanTask with BOTH positional and equality deletes.
793+
/// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
794+
#[tokio::test]
795+
async fn test_load_deletes_with_mixed_types() {
796+
use crate::scan::FileScanTask;
797+
use crate::spec::{DataFileFormat, Schema};
798+
799+
let tmp_dir = TempDir::new().unwrap();
800+
let table_location = tmp_dir.path();
801+
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
802+
.unwrap()
803+
.build()
804+
.unwrap();
805+
806+
// Create the data file schema
807+
let data_file_schema = Arc::new(
808+
Schema::builder()
809+
.with_fields(vec![
810+
crate::spec::NestedField::optional(
811+
2,
812+
"y",
813+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
814+
)
815+
.into(),
816+
crate::spec::NestedField::optional(
817+
3,
818+
"z",
819+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
820+
)
821+
.into(),
822+
])
823+
.build()
824+
.unwrap(),
825+
);
826+
827+
// Write positional delete file
828+
let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
829+
let file_path_values =
830+
vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
831+
let file_path_col = Arc::new(StringArray::from_iter_values(&file_path_values));
832+
let pos_col = Arc::new(Int64Array::from_iter_values(vec![0i64, 1, 2, 3]));
833+
834+
let positional_deletes_to_write =
835+
RecordBatch::try_new(positional_delete_schema.clone(), vec![
836+
file_path_col,
837+
pos_col,
838+
])
839+
.unwrap();
840+
841+
let props = WriterProperties::builder()
842+
.set_compression(Compression::SNAPPY)
843+
.build();
844+
845+
let pos_del_path = format!("{}/pos-del-mixed.parquet", table_location.to_str().unwrap());
846+
let file = File::create(&pos_del_path).unwrap();
847+
let mut writer = ArrowWriter::try_new(
848+
file,
849+
positional_deletes_to_write.schema(),
850+
Some(props.clone()),
851+
)
852+
.unwrap();
853+
writer.write(&positional_deletes_to_write).unwrap();
854+
writer.close().unwrap();
855+
856+
// Write equality delete file
857+
let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());
858+
859+
// Create FileScanTask with BOTH positional and equality deletes
860+
let pos_del = FileScanTaskDeleteFile {
861+
file_path: pos_del_path,
862+
file_type: DataContentType::PositionDeletes,
863+
partition_spec_id: 0,
864+
equality_ids: None,
865+
};
866+
867+
let eq_del = FileScanTaskDeleteFile {
868+
file_path: eq_delete_path.clone(),
869+
file_type: DataContentType::EqualityDeletes,
870+
partition_spec_id: 0,
871+
equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
872+
};
873+
874+
let file_scan_task = FileScanTask {
875+
start: 0,
876+
length: 0,
877+
record_count: None,
878+
data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()),
879+
data_file_format: DataFileFormat::Parquet,
880+
schema: data_file_schema.clone(),
881+
project_field_ids: vec![2, 3],
882+
predicate: None,
883+
deletes: vec![pos_del, eq_del],
884+
partition: None,
885+
partition_spec: None,
886+
name_mapping: None,
887+
};
888+
889+
// Load the deletes - should handle both types without error
890+
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
891+
let delete_filter = delete_file_loader
892+
.load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref())
893+
.await
894+
.unwrap()
895+
.unwrap();
896+
897+
// Verify both delete types can be processed together
898+
let result = delete_filter
899+
.build_equality_delete_predicate(&file_scan_task)
900+
.await;
901+
assert!(
902+
result.is_ok(),
903+
"Failed to build equality delete predicate: {:?}",
904+
result.err()
905+
);
906+
}
791907
}

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ pub(crate) mod tests {
341341
deletes: vec![pos_del_1, pos_del_2.clone()],
342342
partition: None,
343343
partition_spec: None,
344-
name_mapping: None,
344+
name_mapping: None,
345345
},
346346
FileScanTask {
347347
start: 0,
@@ -355,7 +355,7 @@ pub(crate) mod tests {
355355
deletes: vec![pos_del_3],
356356
partition: None,
357357
partition_spec: None,
358-
name_mapping: None,
358+
name_mapping: None,
359359
},
360360
];
361361

crates/iceberg/src/arrow/reader.rs

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -234,21 +234,16 @@ impl ArrowReader {
234234

235235
// RecordBatchTransformer performs any transformations required on the RecordBatches
236236
// that come back from the file, such as type promotion, default column insertion
237-
// and column re-ordering
238-
let mut record_batch_transformer =
239-
if task.partition_spec.is_some() && task.partition.is_some() {
240-
// Use partition spec and data for proper constant identification
241-
RecordBatchTransformer::build_with_partition_data(
242-
task.schema_ref(),
243-
task.project_field_ids(),
244-
task.partition_spec.clone(),
245-
task.partition.clone(),
246-
task.name_mapping.clone(),
247-
)
248-
} else {
249-
// Fallback to build without partition data
250-
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids())
251-
};
237+
// and column re-ordering.
238+
// Always use build_with_partition_data to ensure name_mapping is passed through,
239+
// even when partition spec/data aren't available.
240+
let mut record_batch_transformer = RecordBatchTransformer::build_with_partition_data(
241+
task.schema_ref(),
242+
task.project_field_ids(),
243+
task.partition_spec.clone(),
244+
task.partition.clone(),
245+
task.name_mapping.clone(),
246+
);
252247

253248
if let Some(batch_size) = batch_size {
254249
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -1962,7 +1957,7 @@ message schema {
19621957
deletes: vec![],
19631958
partition: None,
19641959
partition_spec: None,
1965-
name_mapping: None,
1960+
name_mapping: None,
19661961
})]
19671962
.into_iter(),
19681963
)) as FileScanTaskStream;
@@ -2431,7 +2426,7 @@ message schema {
24312426
deletes: vec![],
24322427
partition: None,
24332428
partition_spec: None,
2434-
name_mapping: None,
2429+
name_mapping: None,
24352430
})]
24362431
.into_iter(),
24372432
)) as FileScanTaskStream;
@@ -3134,7 +3129,7 @@ message schema {
31343129
deletes: vec![],
31353130
partition: None,
31363131
partition_spec: None,
3137-
name_mapping: None,
3132+
name_mapping: None,
31383133
})]
31393134
.into_iter(),
31403135
)) as FileScanTaskStream;
@@ -3231,7 +3226,7 @@ message schema {
32313226
deletes: vec![],
32323227
partition: None,
32333228
partition_spec: None,
3234-
name_mapping: None,
3229+
name_mapping: None,
32353230
})]
32363231
.into_iter(),
32373232
)) as FileScanTaskStream;
@@ -3317,7 +3312,7 @@ message schema {
33173312
deletes: vec![],
33183313
partition: None,
33193314
partition_spec: None,
3320-
name_mapping: None,
3315+
name_mapping: None,
33213316
})]
33223317
.into_iter(),
33233318
)) as FileScanTaskStream;
@@ -3417,7 +3412,7 @@ message schema {
34173412
deletes: vec![],
34183413
partition: None,
34193414
partition_spec: None,
3420-
name_mapping: None,
3415+
name_mapping: None,
34213416
})]
34223417
.into_iter(),
34233418
)) as FileScanTaskStream;
@@ -3546,7 +3541,7 @@ message schema {
35463541
deletes: vec![],
35473542
partition: None,
35483543
partition_spec: None,
3549-
name_mapping: None,
3544+
name_mapping: None,
35503545
})]
35513546
.into_iter(),
35523547
)) as FileScanTaskStream;
@@ -3642,7 +3637,7 @@ message schema {
36423637
deletes: vec![],
36433638
partition: None,
36443639
partition_spec: None,
3645-
name_mapping: None,
3640+
name_mapping: None,
36463641
})]
36473642
.into_iter(),
36483643
)) as FileScanTaskStream;
@@ -3751,7 +3746,7 @@ message schema {
37513746
deletes: vec![],
37523747
partition: None,
37533748
partition_spec: None,
3754-
name_mapping: None,
3749+
name_mapping: None,
37553750
})]
37563751
.into_iter(),
37573752
)) as FileScanTaskStream;

0 commit comments

Comments
 (0)