3 changes: 3 additions & 0 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -881,6 +881,9 @@ mod tests {
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
partition: None,
partition_spec: None,
name_mapping: None,
};

// Load the deletes - should handle both types without error
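
These test fixtures (here and in the files below) now fill in three new optional `FileScanTask` fields with `None`. The struct definition itself is not part of this diff; the following is only a minimal sketch of what the additions presumably look like, with the first two types inferred from the new reader test at the bottom of this page and the `name_mapping` type left generic because it never appears as anything but `None` here.

```rust
use std::sync::Arc;

use iceberg::spec::{PartitionSpec, Struct};

/// Hypothetical, trimmed-down mirror of the fields this PR adds to `FileScanTask`;
/// the real struct carries many more fields (data file path, schema, deletes, ...).
#[allow(dead_code)]
struct PartitionContextSketch<N> {
    /// Partition data tuple recorded for the data file (a `Struct` in the new test).
    partition: Option<Struct>,
    /// Partition spec the file was written under (an `Arc<PartitionSpec>` in the new test).
    partition_spec: Option<Arc<PartitionSpec>>,
    /// Optional name mapping; generic here because its concrete type is not visible in this diff.
    name_mapping: Option<N>,
}
```
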
6 changes: 6 additions & 0 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -341,6 +341,9 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
partition: None,
partition_spec: None,
name_mapping: None,
},
FileScanTask {
start: 0,
@@ -352,6 +355,9 @@
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_3],
partition: None,
partition_spec: None,
name_mapping: None,
},
];

211 changes: 208 additions & 3 deletions crates/iceberg/src/arrow/reader.rs
@@ -234,9 +234,14 @@ impl ArrowReader {

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
- // and column re-ordering
- let mut record_batch_transformer =
-     RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
+ // and column re-ordering.
+ let mut record_batch_transformer = RecordBatchTransformer::build_with_partition_data(
+     task.schema_ref(),
+     task.project_field_ids(),
+     task.partition_spec.clone(),
+     task.partition.clone(),
+     task.name_mapping.clone(),
+ );

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
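
The signature of `build_with_partition_data` is not visible in this hunk. Judging only from the call site above and from how the new test constructs its `FileScanTask`, it presumably looks roughly like the stand-in below; all parameter types are inferences, and `RecordBatchTransformerSketch` is a placeholder, not the crate's actual type.

```rust
use std::sync::Arc;

use iceberg::spec::{PartitionSpec, SchemaRef, Struct};

/// Placeholder standing in for the module's RecordBatchTransformer.
struct RecordBatchTransformerSketch;

impl RecordBatchTransformerSketch {
    /// Inferred shape: besides the snapshot schema and projected field ids, the
    /// transformer now also receives the file's partition spec, its partition data,
    /// and an optional name mapping, so it can decide which projected fields come
    /// from partition constants and which must be read from the data file.
    fn build_with_partition_data<N>(
        _snapshot_schema: SchemaRef,                 // task.schema_ref()
        _projected_field_ids: &[i32],                // task.project_field_ids()
        _partition_spec: Option<Arc<PartitionSpec>>, // task.partition_spec.clone()
        _partition: Option<Struct>,                  // task.partition.clone()
        _name_mapping: Option<N>,                    // task.name_mapping.clone(); type not shown here
    ) -> Self {
        Self
    }
}
```
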
@@ -1948,6 +1953,9 @@ message schema {
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2266,6 +2274,9 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
};

// Task 2: read the second and third row groups
@@ -2279,6 +2290,9 @@
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2403,6 +2417,9 @@ message schema {
project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2571,6 +2588,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2786,6 +2806,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2994,6 +3017,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3094,6 +3120,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3188,6 +3217,9 @@ message schema {
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3271,6 +3303,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3368,6 +3403,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3494,6 +3532,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3587,6 +3628,9 @@ message schema {
project_field_ids: vec![1, 5, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3693,6 +3737,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3708,4 +3755,162 @@
// Should return empty results
assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
}

/// Tests that bucket partitioning reads the source column from the data file (not from partition metadata).
///
/// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
/// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
///
/// # Iceberg Spec Requirements
///
/// Per the Iceberg spec "Column Projection" section:
/// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
///
/// This means:
/// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
/// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
/// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
///
/// Java's PartitionUtil.constantsMap() implements this via:
/// ```java
/// if (field.transform().isIdentity()) {
/// idToConstant.put(field.sourceId(), converted);
/// }
/// ```
///
/// # What This Test Verifies
///
/// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
/// bucket partitioning when the FileScanTask provides `partition_spec` and `partition` data:
///
/// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
/// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
/// - RecordBatchTransformer::constants_map() excludes the bucket-partitioned field from constants
/// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
/// - Values are NOT replaced with constant 1 from partition metadata
///
/// # Why This Matters
///
/// Without correct handling:
/// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
/// - Query results would be incorrect (all rows would have id=1)
/// - Bucket partitioning would be unusable for query optimization
///
/// # References
/// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
/// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
/// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
#[tokio::test]
async fn test_bucket_partitioning_reads_source_column_from_file() {
use arrow_array::Int32Array;

use crate::spec::{Literal, PartitionSpec, Struct, Transform};

// Iceberg schema with id and name columns
let schema = Arc::new(
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

// Partition spec: bucket(4, id)
let partition_spec = Arc::new(
PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_partition_field("id", "id_bucket", Transform::Bucket(4))
.unwrap()
.build()
.unwrap(),
);

// Partition data: bucket value is 1
let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);

// Create Arrow schema with field IDs for Parquet file
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));

// Write Parquet file with data
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();

let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
let name_data =
Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();

// Read the Parquet file with partition spec and data
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data.parquet", table_location),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: Some(partition_data),
partition_spec: Some(partition_spec),
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

// Verify we got the correct data
assert_eq!(result.len(), 1);
let batch = &result[0];

assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);

// The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
// NOT the constant partition value 1
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 1);
assert_eq!(id_col.value(1), 5);
assert_eq!(id_col.value(2), 9);
assert_eq!(id_col.value(3), 13);

let name_col = batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");
assert_eq!(name_col.value(3), "Dave");
}
}
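
For readers skimming the doc comment above: the spec rule it quotes boils down to building a constants map, keyed by source field id, that only identity-transformed partition fields contribute to. Below is a self-contained sketch of that rule using stand-in types rather than the crate's own definitions (names like `SketchPartitionField` are illustrative only, not iceberg-rust APIs).

```rust
use std::collections::HashMap;

/// Stand-in types, just to illustrate the rule; not the iceberg-rust definitions.
#[derive(PartialEq)]
enum SketchTransform {
    Identity,
    Bucket(u32),
}

struct SketchPartitionField {
    source_id: i32,
    transform: SketchTransform,
}

/// Rough analogue of Java's PartitionUtil.constantsMap(): only identity-transformed
/// partition fields contribute a constant, keyed by their source field id. Bucket
/// (and every other non-identity) transform is skipped, so its source column is
/// read from the data file instead of being replaced by partition metadata.
fn constants_map(
    spec_fields: &[SketchPartitionField],
    partition_values: &[Option<i64>], // positionally aligned with spec_fields
) -> HashMap<i32, Option<i64>> {
    spec_fields
        .iter()
        .zip(partition_values)
        .filter(|(field, _)| field.transform == SketchTransform::Identity)
        .map(|(field, value)| (field.source_id, *value))
        .collect()
}

fn main() {
    // bucket(4, id) with bucket value 1, as in the test above: the source column
    // id (source_id = 1) must NOT be turned into a constant.
    let spec = vec![SketchPartitionField {
        source_id: 1,
        transform: SketchTransform::Bucket(4),
    }];
    let constants = constants_map(&spec, &[Some(1)]);
    assert!(!constants.contains_key(&1));
}
```
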