crates/iceberg/src/arrow/delete_filter.rs (+6, -0)
@@ -339,6 +339,9 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
partition: None,
partition_spec_id: None,
partition_spec: None,
},
FileScanTask {
start: 0,
@@ -350,6 +353,9 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_3],
partition: None,
partition_spec_id: None,
partition_spec: None,
},
];

crates/iceberg/src/arrow/reader.rs (+224, -8)
@@ -236,7 +236,18 @@ impl ArrowReader {
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
- RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
+ if task.partition_spec.is_some() && task.partition.is_some() {
+     // Use partition spec and data for proper constant identification
+     RecordBatchTransformer::build_with_partition_data(
+         task.schema_ref(),
+         task.project_field_ids(),
+         task.partition_spec.clone(),
+         task.partition.clone(),
+     )
+ } else {
+     // Fall back to building without partition data
+     RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids())
+ };

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -1942,12 +1953,15 @@ message schema {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/1.parquet"),
data_file_path: format!("{}/1.parquet", table_location),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2266,6 +2280,9 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
};

// Task 2: read the second and third row groups
@@ -2279,6 +2296,9 @@
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
};

let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2307,16 +2327,21 @@ message schema {
.unwrap();

let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");
println!(
"Task 2 (bytes {}-{}) returned {} rows",
rg1_start, file_end, total_rows_task2
);

assert_eq!(
total_rows_task1, 100,
"Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
"Task 1 should read only the first row group (100 rows), but got {} rows",
total_rows_task1
);

assert_eq!(
total_rows_task2, 200,
"Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
"Task 2 should read only the second+third row groups (200 rows), but got {} rows",
total_rows_task2
);

// Verify the actual data values are correct (not just the row count)
@@ -2327,7 +2352,7 @@
.as_primitive::<arrow_array::types::Int32Type>();
let first_val = id_col.value(0);
let last_val = id_col.value(id_col.len() - 1);
println!("Task 1 data range: {first_val} to {last_val}");
println!("Task 1 data range: {} to {}", first_val, last_val);

assert_eq!(first_val, 0, "Task 1 should start with id=0");
assert_eq!(last_val, 99, "Task 1 should end with id=99");
@@ -2339,7 +2364,7 @@
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let first_val = id_col.value(0);
println!("Task 2 first value: {first_val}");
println!("Task 2 first value: {}", first_val);

assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
}
@@ -2397,12 +2422,15 @@ message schema {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/old_file.parquet"),
data_file_path: format!("{}/old_file.parquet", table_location),
data_file_format: DataFileFormat::Parquet,
schema: new_schema.clone(),
project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2571,6 +2599,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec_id: None,
partition_spec: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2786,6 +2817,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec_id: None,
partition_spec: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2994,6 +3028,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec_id: None,
partition_spec: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3094,6 +3131,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3188,6 +3228,9 @@ message schema {
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3271,6 +3314,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3368,6 +3414,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3494,6 +3543,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3587,6 +3639,9 @@ message schema {
project_field_ids: vec![1, 5, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3693,6 +3748,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec_id: None,
partition_spec: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3708,4 +3766,162 @@
// Should return empty results
assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
}

/// Tests that bucket partitioning reads the source column from the data file (not partition metadata).
///
/// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
/// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
///
/// # Iceberg Spec Requirements
///
/// Per the Iceberg spec "Column Projection" section:
/// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
///
/// This means:
/// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
/// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
/// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
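///
/// For example, per the spec's bucket transform definition, `bucket(4, id)`
/// stores `(murmur3_x86_32_hash(id) & Integer.MAX_VALUE) % 4` in partition
/// metadata: a bucket ordinal in `0..4`, from which the original `id` value
/// cannot be recovered.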
///
/// Java's PartitionUtil.constantsMap() implements this via:
/// ```java
/// if (field.transform().isIdentity()) {
/// idToConstant.put(field.sourceId(), converted);
/// }
/// ```
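///
/// A minimal Rust sketch of the same rule (hypothetical variable names; the
/// actual logic lives in `RecordBatchTransformer::constants_map`):
/// ```ignore
/// // Only identity-transformed partition fields become per-file constants;
/// // every other transform forces the source column to be read from the file.
/// for (field, value) in partition_spec.fields().iter().zip(partition_data.iter()) {
///     if field.transform == Transform::Identity {
///         if let Some(value) = value {
///             id_to_constant.insert(field.source_id, value.clone());
///         }
///     }
/// }
/// ```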
///
/// # What This Test Verifies
///
/// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
/// bucket partitioning when FileScanTask provides partition_spec and partition_data:
///
/// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
/// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
/// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
/// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
/// - Values are NOT replaced with constant 1 from partition metadata
///
/// # Why This Matters
///
/// Without correct handling:
/// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
/// - Query results would be incorrect (all rows would have id=1)
/// - Bucket partitioning would be unusable for query optimization
///
/// # References
/// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
/// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
/// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
#[tokio::test]
async fn test_bucket_partitioning_reads_source_column_from_file() {
use arrow_array::Int32Array;

use crate::spec::{Literal, PartitionSpec, Struct, Transform};

// Iceberg schema with id and name columns
let schema = Arc::new(
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

// Partition spec: bucket(4, id)
let partition_spec = Arc::new(
PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_partition_field("id", "id_bucket", Transform::Bucket(4))
.unwrap()
.build()
.unwrap(),
);

// Partition data: bucket value is 1
let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);

// Create Arrow schema with field IDs for Parquet file
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));

// Write Parquet file with data
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();

let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
let name_data =
Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();

// Read the Parquet file with partition spec and data
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data.parquet", table_location),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: Some(partition_data),
partition_spec_id: Some(0),
partition_spec: Some(partition_spec),
})]
.into_iter(),
)) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

// Verify we got the correct data
assert_eq!(result.len(), 1);
let batch = &result[0];

assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);

// The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
// NOT the constant partition value 1
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 1);
assert_eq!(id_col.value(1), 5);
assert_eq!(id_col.value(2), 9);
assert_eq!(id_col.value(3), 13);

let name_col = batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");
assert_eq!(name_col.value(3), "Dave");
}
}