Skip to content

Commit bdc6aae

Browse files
committed
feat(datafusion): reuse RecordBatchProjector for project.
Signed-off-by: Florian Valeye <[email protected]>
1 parent 531388f commit bdc6aae

File tree

3 files changed

+186
-173
lines changed

3 files changed

+186
-173
lines changed

crates/iceberg/src/arrow/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ pub mod delete_file_loader;
2828
pub(crate) mod delete_filter;
2929

3030
mod reader;
31-
pub(crate) mod record_batch_projector;
31+
/// RecordBatch projection utilities
32+
pub mod record_batch_projector;
3233
pub(crate) mod record_batch_transformer;
3334
mod value;
3435

crates/iceberg/src/arrow/record_batch_projector.rs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ use std::sync::Arc;
2020
use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
2121
use arrow_buffer::NullBuffer;
2222
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
23+
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
2324

2425
use crate::error::Result;
26+
use crate::spec::Schema as IcebergSchema;
2527
use crate::{Error, ErrorKind};
2628

2729
/// Helps project specific fields from a `RecordBatch` according to their field ids.
28-
#[derive(Clone, Debug)]
29-
pub(crate) struct RecordBatchProjector {
30+
#[derive(Clone, Debug, PartialEq, Eq)]
31+
pub struct RecordBatchProjector {
3032
// A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
3133
// E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
3234
// while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
@@ -77,6 +79,80 @@ impl RecordBatchProjector {
7779
})
7880
}
7981

82+
/// Create RecordBatchProjector using Iceberg schema for field mapping.
83+
///
84+
/// This constructor is more flexible and works with any Arrow schema by using
85+
/// the Iceberg schema to map field names to field IDs.
86+
///
87+
/// # Arguments
88+
/// * `original_schema` - The original Arrow schema (doesn't need field ID metadata)
89+
/// * `iceberg_schema` - The Iceberg schema for field ID mapping
90+
/// * `target_field_ids` - The field IDs to project
91+
pub fn from_iceberg_schema_mapping(
92+
original_schema: SchemaRef,
93+
iceberg_schema: Arc<IcebergSchema>,
94+
target_field_ids: &[i32],
95+
) -> Result<Self> {
96+
let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
97+
// First try to get field ID from metadata (Parquet case)
98+
if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
99+
let field_id = value.parse::<i32>().map_err(|e| {
100+
Error::new(
101+
ErrorKind::DataInvalid,
102+
"Failed to parse field id".to_string(),
103+
)
104+
.with_context("value", value)
105+
.with_source(e)
106+
})?;
107+
return Ok(Some(field_id as i64));
108+
}
109+
110+
// Fallback: use Iceberg schema's built-in field lookup
111+
if let Some(iceberg_field) = iceberg_schema.field_by_name(field.name()) {
112+
return Ok(Some(iceberg_field.id as i64));
113+
}
114+
115+
// Additional fallback: for nested fields, we need to search recursively
116+
fn find_field_id_in_struct(
117+
struct_type: &crate::spec::StructType,
118+
field_name: &str,
119+
) -> Option<i32> {
120+
for field in struct_type.fields() {
121+
if field.name == field_name {
122+
return Some(field.id);
123+
}
124+
if let crate::spec::Type::Struct(nested_struct) = &*field.field_type {
125+
if let Some(nested_id) = find_field_id_in_struct(nested_struct, field_name)
126+
{
127+
return Some(nested_id);
128+
}
129+
}
130+
}
131+
None
132+
}
133+
134+
// Search in nested structs
135+
for iceberg_field in iceberg_schema.as_struct().fields() {
136+
if let crate::spec::Type::Struct(struct_type) = &*iceberg_field.field_type {
137+
if let Some(nested_id) = find_field_id_in_struct(struct_type, field.name()) {
138+
return Ok(Some(nested_id as i64));
139+
}
140+
}
141+
}
142+
143+
Ok(None)
144+
};
145+
146+
let searchable_field_func = |_field: &Field| -> bool { true };
147+
148+
Self::new(
149+
original_schema,
150+
target_field_ids,
151+
field_id_fetch_func,
152+
searchable_field_func,
153+
)
154+
}
155+
80156
fn fetch_field_index<F1, F2>(
81157
fields: &Fields,
82158
index_vec: &mut Vec<usize>,
@@ -129,7 +205,7 @@ impl RecordBatchProjector {
129205
}
130206

131207
/// Do projection with columns
132-
pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
208+
pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
133209
self.field_indices
134210
.iter()
135211
.map(|index_vec| Self::get_column_by_field_index(batch, index_vec))

0 commit comments

Comments
 (0)