Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ pub mod delete_file_loader;
pub(crate) mod delete_filter;

mod reader;
pub(crate) mod record_batch_projector;
/// RecordBatch projection utilities
pub mod record_batch_projector;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to make this pub?

pub(crate) mod record_batch_transformer;
mod value;

Expand Down
82 changes: 79 additions & 3 deletions crates/iceberg/src/arrow/record_batch_projector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::error::Result;
use crate::spec::Schema as IcebergSchema;
use crate::{Error, ErrorKind};

/// Help to project specific field from `RecordBatch`` according to the fields id.
#[derive(Clone, Debug)]
pub(crate) struct RecordBatchProjector {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RecordBatchProjector {
// A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
// E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
// while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
Expand Down Expand Up @@ -77,6 +79,80 @@ impl RecordBatchProjector {
})
}

/// Create RecordBatchProjector using Iceberg schema for field mapping.
///
/// This constructor is more flexible and works with any Arrow schema by using
/// the Iceberg schema to map field names to field IDs.
///
/// # Arguments
/// * `original_schema` - The original Arrow schema (doesn't need field ID metadata)
/// * `iceberg_schema` - The Iceberg schema for field ID mapping
/// * `target_field_ids` - The field IDs to project
pub fn from_iceberg_schema_mapping(
original_schema: SchemaRef,
iceberg_schema: Arc<IcebergSchema>,
target_field_ids: &[i32],
) -> Result<Self> {
let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
// First try to get field ID from metadata (Parquet case)
if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
let field_id = value.parse::<i32>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Failed to parse field id".to_string(),
)
.with_context("value", value)
.with_source(e)
})?;
return Ok(Some(field_id as i64));
}

// Fallback: use Iceberg schema's built-in field lookup
if let Some(iceberg_field) = iceberg_schema.field_by_name(field.name()) {
return Ok(Some(iceberg_field.id as i64));
}

// Additional fallback: for nested fields, we need to search recursively
fn find_field_id_in_struct(
struct_type: &crate::spec::StructType,
field_name: &str,
) -> Option<i32> {
for field in struct_type.fields() {
if field.name == field_name {
return Some(field.id);
}
if let crate::spec::Type::Struct(nested_struct) = &*field.field_type {
if let Some(nested_id) = find_field_id_in_struct(nested_struct, field_name)
{
return Some(nested_id);
}
}
}
None
}

// Search in nested structs
for iceberg_field in iceberg_schema.as_struct().fields() {
if let crate::spec::Type::Struct(struct_type) = &*iceberg_field.field_type {
if let Some(nested_id) = find_field_id_in_struct(struct_type, field.name()) {
return Ok(Some(nested_id as i64));
}
}
}

Ok(None)
};

let searchable_field_func = |_field: &Field| -> bool { true };

Self::new(
original_schema,
target_field_ids,
field_id_fetch_func,
searchable_field_func,
)
}

fn fetch_field_index<F1, F2>(
fields: &Fields,
index_vec: &mut Vec<usize>,
Expand Down Expand Up @@ -129,7 +205,7 @@ impl RecordBatchProjector {
}

/// Do projection with columns
pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
self.field_indices
.iter()
.map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
Expand Down
2 changes: 2 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
pub(crate) mod commit;
pub(crate) mod expr_to_predicate;
pub(crate) mod metadata_scan;
pub(crate) mod project;
pub(crate) mod scan;
pub(crate) mod write;

pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";

pub use project::project_with_partition;
pub use scan::IcebergTableScan;
Loading
Loading