
Commit 68af86e

mbutrovich authored and liurenjie1024 committed
feat(reader): Add PartitionSpec support to FileScanTask and RecordBatchTransformer (apache#1821)

Partially addresses apache#1749. This PR adds partition spec handling to `FileScanTask` and `RecordBatchTransformer` to correctly implement the Iceberg spec's "Column Projection" rules for fields "not present" in data files.

Prior to this PR, `iceberg-rust`'s `FileScanTask` had no mechanism to pass partition information to `RecordBatchTransformer`, causing two issues:

1. **Incorrect handling of bucket partitioning**: The reader could not distinguish identity transforms (which should use partition metadata constants) from non-identity transforms like bucket/truncate/year/month (which must read from the data file). For example, `bucket(4, id)` stores `id_bucket = 2` (the bucket number) in partition metadata, but the actual `id` values (100, 200, 300) exist only in the data file. iceberg-rust was incorrectly treating bucket-partitioned source columns as constants, breaking runtime filtering and returning incorrect query results.
2. **Field ID conflicts in add_files scenarios**: When importing Hive tables via `add_files`, partition columns could have field IDs conflicting with Parquet data columns. Example: Parquet has field_id=1→"name", but Iceberg expects field_id=1→"id" (partition). Per the spec, the correct field is "not present" and requires name mapping fallback.

Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), when a field ID is "not present" in a data file, it must be resolved using these rules:

1. Return the value from partition metadata if an **Identity Transform** exists
2. Use `schema.name-mapping.default` metadata to map field id to columns without field id
3. Return the default value if it has a defined `initial-default`
4. Return null in all other cases

**Why this matters:**

- **Identity transforms** (e.g., `identity(dept)`) store actual column values in partition metadata that can be used as constants without reading the data file
- **Non-identity transforms** (e.g., `bucket(4, id)`, `day(timestamp)`) store transformed values in partition metadata (e.g., bucket number 2, not the actual `id` values 100, 200, 300) and must read source columns from the data file

The changes in this PR:

1. **Added partition fields to `FileScanTask`** (`scan/task.rs`):
   - `partition: Option<Struct>` - Partition data from the manifest entry
   - `partition_spec: Option<Arc<PartitionSpec>>` - For transform-aware constant detection
   - `name_mapping: Option<Arc<NameMapping>>` - Name mapping from table metadata
2. **Implemented a `constants_map()` function** (`arrow/record_batch_transformer.rs`):
   - Replicates Java's `PartitionUtil.constantsMap()` behavior
   - Only includes fields whose transform is `Transform::Identity`
   - Used to determine which fields use partition metadata constants vs. reading from data files (see the sketch after this list)
3. **Enhanced `RecordBatchTransformer`** (`arrow/record_batch_transformer.rs`):
   - Added a `build_with_partition_data()` method to accept partition spec, partition data, and name mapping
   - Implements all 4 spec rules for column resolution with identity-transform awareness
   - Detects field ID conflicts by verifying that both field ID AND name match
   - Falls back to name mapping when field IDs are missing/conflicting (spec rule #2)
4. **Updated `ArrowReader`** (`arrow/reader.rs`):
   - Uses `build_with_partition_data()` when partition information is available
   - Falls back to `build()` when not available
5. **Updated manifest entry processing** (`scan/context.rs`):
   - Populates partition fields in `FileScanTask` from manifest entry data
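To make the identity-transform distinction behind `constants_map()` concrete, here is a minimal, self-contained sketch. The `Transform` and `PartitionField` types and the string-typed partition values below are simplified stand-ins invented for illustration, not the crate's actual API; only the filtering rule mirrors the behavior described above.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the Iceberg types involved (illustration only).
#[derive(Clone, Debug, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
}

struct PartitionField {
    source_id: i32, // field ID of the source column in the table schema
    transform: Transform,
}

/// Mirrors the idea behind Java's `PartitionUtil.constantsMap()`: only
/// identity-transformed partition fields contribute constants, keyed by
/// source field ID. Bucket/truncate/year/... values stay out of the map,
/// so their source columns are still read from the data file.
fn constants_map(
    spec_fields: &[PartitionField],
    partition_values: &[Option<String>], // one value per partition field, positionally
) -> HashMap<i32, Option<String>> {
    spec_fields
        .iter()
        .zip(partition_values.iter())
        .filter(|(field, _)| field.transform == Transform::Identity)
        .map(|(field, value)| (field.source_id, value.clone()))
        .collect()
}

fn main() {
    // Spec: bucket(4, id) on field 1, identity(dept) on field 2.
    let fields = vec![
        PartitionField { source_id: 1, transform: Transform::Bucket(4) },
        PartitionField { source_id: 2, transform: Transform::Identity },
    ];
    // Partition metadata stores the bucket number (2), not the actual id values.
    let values = vec![Some("2".to_string()), Some("engineering".to_string())];

    let constants = constants_map(&fields, &values);
    assert!(!constants.contains_key(&1)); // `id` must be read from the data file
    assert_eq!(constants.get(&2), Some(&Some("engineering".to_string()))); // `dept` is a constant
}
```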
There are 6 new unit tests covering all 4 Iceberg spec rules:

1. **`bucket_partitioning_reads_source_column_from_file`** - Verifies that bucket-partitioned source columns are read from data files (not treated as constants from partition metadata)
2. **`identity_partition_uses_constant_from_metadata`** - Verifies that identity-transformed fields correctly use partition metadata constants
3. **`test_bucket_partitioning_with_renamed_source_column`** - Verifies that field-ID-based mapping works despite a column rename
4. **`add_files_partition_columns_without_field_ids`** - Verifies name mapping resolution for Hive table imports without field IDs (spec rule #2)
5. **`add_files_with_true_field_id_conflict`** - Verifies correct field ID conflict detection with name mapping fallback (spec rule #2)
6. **`test_all_four_spec_rules`** - Integration test verifying all 4 spec rules work together (a simplified resolution-order sketch follows below)

This also resolved approximately 50 Iceberg Java tests when running with DataFusion Comet's experimental apache/datafusion-comet#2528 PR.

---------

Co-authored-by: Renjie Liu <[email protected]>
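Taken together, the four projection rules amount to a small resolution routine for any field ID that is missing from a data file. The following is a simplified, self-contained sketch of that ordering, assuming invented stand-in types (`ColumnSource`, string-typed values); it does not reflect `RecordBatchTransformer`'s actual internals.

```rust
use std::collections::HashMap;

/// Where a projected column's values come from (illustration only).
#[derive(Debug, PartialEq)]
enum ColumnSource {
    Constant(Option<String>), // materialize a constant value
    ReadByName(String),       // read from the file, matched via name mapping
    Null,                     // fill with nulls
}

/// Resolution order for a field ID that is "not present" in a data file,
/// following the four Column Projection rules listed above.
fn resolve_missing_column(
    field_id: i32,
    column_name: &str,
    identity_constants: &HashMap<i32, Option<String>>, // rule 1: constants_map() output
    name_mapping: &HashMap<String, i32>,               // rule 2: schema.name-mapping.default
    initial_default: Option<&str>,                     // rule 3: the field's initial-default
) -> ColumnSource {
    // Rule 1: an identity-transform partition value is a usable constant.
    if let Some(value) = identity_constants.get(&field_id) {
        return ColumnSource::Constant(value.clone());
    }
    // Rule 2: name mapping resolves the field to a column that lacks field IDs.
    if name_mapping.get(column_name) == Some(&field_id) {
        return ColumnSource::ReadByName(column_name.to_string());
    }
    // Rule 3: a defined initial-default is returned as a constant.
    if let Some(default) = initial_default {
        return ColumnSource::Constant(Some(default.to_string()));
    }
    // Rule 4: null in all other cases.
    ColumnSource::Null
}

fn main() {
    let constants = HashMap::from([(2, Some("engineering".to_string()))]);
    let mapping = HashMap::from([("id".to_string(), 1)]);

    // Field 2 (identity(dept)) resolves via rule 1; field 1 (id) via rule 2.
    assert_eq!(
        resolve_missing_column(2, "dept", &constants, &mapping, None),
        ColumnSource::Constant(Some("engineering".to_string()))
    );
    assert_eq!(
        resolve_missing_column(1, "id", &constants, &mapping, None),
        ColumnSource::ReadByName("id".to_string())
    );
}
```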
1 parent 4bf15b9 commit 68af86e

File tree

9 files changed: +1250 -94 lines changed


crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 9 additions & 0 deletions
@@ -886,6 +886,9 @@ mod tests {
             sequence_number: 0,
             equality_ids: None,
             file_size_in_bytes: 0,
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };

         let eq_del_scan_task = FileScanTask {
@@ -902,6 +905,9 @@ mod tests {
             sequence_number: 0,
             equality_ids: eq_del.equality_ids.clone(),
             file_size_in_bytes: 0,
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };

         let file_scan_task = FileScanTask {
@@ -918,6 +924,9 @@ mod tests {
             data_file_content: DataContentType::Data,
             equality_ids: None,
             file_size_in_bytes: 0,
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };

         // Load the deletes - should handle both types without error

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use futures::{StreamExt, TryStreamExt};

 use crate::arrow::ArrowReader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
 use crate::spec::{Schema, SchemaRef};
@@ -82,7 +82,7 @@ impl BasicDeleteFileLoader {
         equality_ids: &[i32],
     ) -> Result<ArrowRecordBatchStream> {
         let mut record_batch_transformer =
-            RecordBatchTransformer::build(target_schema.clone(), equality_ids);
+            RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build();

         let record_batch_stream = record_batch_stream.map(move |record_batch| {
             record_batch.and_then(|record_batch| {

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 9 additions & 0 deletions
@@ -324,6 +324,9 @@ pub(crate) mod tests {
             sequence_number: 0,
             equality_ids: None,
             file_size_in_bytes: 0,
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };

         let file_scan_tasks = vec![
@@ -341,6 +344,9 @@ pub(crate) mod tests {
                 sequence_number: 0,
                 equality_ids: None,
                 file_size_in_bytes: 0,
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             },
             FileScanTask {
                 start: 0,
@@ -356,6 +362,9 @@ pub(crate) mod tests {
                 sequence_number: 0,
                 equality_ids: None,
                 file_size_in_bytes: 0,
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
             },
         ];
