Commit b3b5afe

feat(datafusion): implement the project node to add the partition columns (apache#1602)
## Which issue does this PR close?

- Closes apache#1542

## What changes are included in this PR?

Implement a physical execution plan node that projects Iceberg partition columns from source data, supporting nested fields and all Iceberg transforms.

## Are these changes tested?

Yes, with unit tests.

---------

Signed-off-by: Florian Valeye <[email protected]>
1 parent 4d20ff0 commit b3b5afe

File tree: 5 files changed (+563, -5 lines)

crates/iceberg/src/arrow/mod.rs

Lines changed: 2 additions & 1 deletion

```diff
@@ -28,7 +28,8 @@ pub mod delete_file_loader;
 pub(crate) mod delete_filter;
 
 mod reader;
-pub(crate) mod record_batch_projector;
+/// RecordBatch projection utilities
+pub mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
 mod value;
 
```
crates/iceberg/src/arrow/record_batch_projector.rs

Lines changed: 68 additions & 3 deletions

```diff
@@ -20,13 +20,16 @@ use std::sync::Arc;
 use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
 use arrow_buffer::NullBuffer;
 use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
+use crate::arrow::schema::schema_to_arrow_schema;
 use crate::error::Result;
+use crate::spec::Schema as IcebergSchema;
 use crate::{Error, ErrorKind};
 
 /// Help to project specific field from `RecordBatch`` according to the fields id.
-#[derive(Clone, Debug)]
-pub(crate) struct RecordBatchProjector {
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RecordBatchProjector {
     // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
     // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
     // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
@@ -77,6 +80,46 @@ impl RecordBatchProjector {
         })
     }
 
+    /// Create RecordBatchProjector using Iceberg schema.
+    ///
+    /// This constructor converts the Iceberg schema to Arrow schema with field ID metadata,
+    /// then uses the standard field ID lookup for projection.
+    ///
+    /// # Arguments
+    /// * `iceberg_schema` - The Iceberg schema for field ID mapping
+    /// * `target_field_ids` - The field IDs to project
+    pub fn from_iceberg_schema(
+        iceberg_schema: Arc<IcebergSchema>,
+        target_field_ids: &[i32],
+    ) -> Result<Self> {
+        let arrow_schema_with_ids = Arc::new(schema_to_arrow_schema(&iceberg_schema)?);
+
+        let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
+            if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+                let field_id = value.parse::<i32>().map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to parse field id".to_string(),
+                    )
+                    .with_context("value", value)
+                    .with_source(e)
+                })?;
+                Ok(Some(field_id as i64))
+            } else {
+                Ok(None)
+            }
+        };
+
+        let searchable_field_func = |_field: &Field| -> bool { true };
+
+        Self::new(
+            arrow_schema_with_ids,
+            target_field_ids,
+            field_id_fetch_func,
+            searchable_field_func,
+        )
+    }
+
     fn fetch_field_index<F1, F2>(
         fields: &Fields,
         index_vec: &mut Vec<usize>,
@@ -129,7 +172,7 @@ impl RecordBatchProjector {
     }
 
     /// Do projection with columns
-    pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+    pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
         self.field_indices
             .iter()
             .map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
@@ -166,6 +209,7 @@ mod test {
     use arrow_schema::{DataType, Field, Fields, Schema};
 
     use crate::arrow::record_batch_projector::RecordBatchProjector;
+    use crate::spec::{NestedField, PrimitiveType, Schema as IcebergSchema, Type};
     use crate::{Error, ErrorKind};
 
     #[test]
@@ -293,4 +337,25 @@ mod test {
             RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
         assert!(projector.is_ok());
     }
+
+    #[test]
+    fn test_from_iceberg_schema() {
+        let iceberg_schema = IcebergSchema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let projector =
+            RecordBatchProjector::from_iceberg_schema(Arc::new(iceberg_schema), &[1, 3]).unwrap();
+
+        assert_eq!(projector.field_indices.len(), 2);
+        assert_eq!(projector.projected_schema_ref().fields().len(), 2);
+        assert_eq!(projector.projected_schema_ref().field(0).name(), "id");
+        assert_eq!(projector.projected_schema_ref().field(1).name(), "age");
+    }
 }
```
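With `RecordBatchProjector` now public and constructible directly from an Iceberg schema, callers outside the crate (such as the DataFusion integration) can project columns by Iceberg field ID. A minimal sketch of the intended usage, mirroring the schema from the unit test above; the `iceberg::arrow` and `iceberg::spec` import paths are assumptions based on the module visibility shown in this commit:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, StringArray};
use iceberg::Result;
use iceberg::arrow::record_batch_projector::RecordBatchProjector;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

fn project_id_and_age() -> Result<Vec<ArrayRef>> {
    // Same three-field schema as the unit test above.
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()?;

    // Keep only field IDs 1 ("id") and 3 ("age").
    let projector = RecordBatchProjector::from_iceberg_schema(Arc::new(schema), &[1, 3])?;

    // project_column operates on a batch's column arrays; their order and
    // count must match the schema the projector was built from.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 2])),
        Arc::new(StringArray::from(vec!["a", "b"])),
        Arc::new(Int32Array::from(vec![30, 40])),
    ];
    projector.project_column(&columns) // yields two arrays: "id" and "age"
}
```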

crates/iceberg/src/transform/mod.rs

Lines changed: 3 additions & 1 deletion

```diff
@@ -17,6 +17,8 @@
 
 //! Transform function used to compute partition values.
 
+use std::fmt::Debug;
+
 use arrow_array::ArrayRef;
 
 use crate::spec::{Datum, Transform};
@@ -29,7 +31,7 @@ mod truncate;
 mod void;
 
 /// TransformFunction is a trait that defines the interface for all transform functions.
-pub trait TransformFunction: Send + Sync {
+pub trait TransformFunction: Send + Sync + Debug {
     /// transform will take an input array and transform it into a new array.
     /// The implementation of this function will need to check and downcast the input to specific
     /// type.
```
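The new `Debug` supertrait means a `Box<dyn TransformFunction>` itself implements `Debug`, so it can live inside a `#[derive(Debug)]` struct — which a DataFusion plan node needs for plan formatting. A small sketch of the pattern, assuming the crate's existing `BoxedTransformFunction` alias and `create_transform_function` factory; the `PartitionColumn` holder type is hypothetical:

```rust
use iceberg::Result;
use iceberg::spec::Transform;
use iceberg::transform::{BoxedTransformFunction, create_transform_function};

// Hypothetical holder: with Debug as a supertrait of TransformFunction,
// Box<dyn TransformFunction> implements Debug, so this derive compiles.
#[derive(Debug)]
struct PartitionColumn {
    source_field_id: i32,
    transform_fn: BoxedTransformFunction,
}

fn example() -> Result<()> {
    let column = PartitionColumn {
        source_field_id: 1,
        transform_fn: create_transform_function(&Transform::Bucket(16))?,
    };
    // `{:?}` now works on the whole struct, boxed transform included.
    println!("{column:?}");
    Ok(())
}
```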

crates/integrations/datafusion/src/physical_plan/mod.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -18,9 +18,11 @@
 pub(crate) mod commit;
 pub(crate) mod expr_to_predicate;
 pub(crate) mod metadata_scan;
+pub(crate) mod project;
 pub(crate) mod scan;
 pub(crate) mod write;
 
 pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
 
+pub use project::project_with_partition;
 pub use scan::IcebergTableScan;
```