3 changes: 2 additions & 1 deletion crates/iceberg/src/arrow/mod.rs
@@ -28,7 +28,8 @@ pub mod delete_file_loader;
pub(crate) mod delete_filter;

mod reader;
-pub(crate) mod record_batch_projector;
+/// RecordBatch projection utilities
+pub mod record_batch_projector;
Contributor (reviewing `pub mod record_batch_projector;`): Why do we need to make this pub?

pub(crate) mod record_batch_transformer;
mod value;

71 changes: 68 additions & 3 deletions crates/iceberg/src/arrow/record_batch_projector.rs
@@ -20,13 +20,16 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

+use crate::arrow::schema::schema_to_arrow_schema;
use crate::error::Result;
+use crate::spec::Schema as IcebergSchema;
use crate::{Error, ErrorKind};

/// Helps to project specific fields from a `RecordBatch` according to their field ids.
-#[derive(Clone, Debug)]
-pub(crate) struct RecordBatchProjector {
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct RecordBatchProjector {
    // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
    // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
    // while the second field is accessed from the second column and then from its third subcolumn (the second column must be a struct column).
@@ -77,6 +80,46 @@ impl RecordBatchProjector {
        })
    }

+    /// Create a `RecordBatchProjector` from an Iceberg schema.
+    ///
+    /// This constructor converts the Iceberg schema to an Arrow schema with field ID metadata,
+    /// then uses the standard field ID lookup for projection.
+    ///
+    /// # Arguments
+    /// * `iceberg_schema` - The Iceberg schema for field ID mapping
+    /// * `target_field_ids` - The field IDs to project
+    pub fn from_iceberg_schema(
+        iceberg_schema: Arc<IcebergSchema>,
+        target_field_ids: &[i32],
+    ) -> Result<Self> {
+        let arrow_schema_with_ids = Arc::new(schema_to_arrow_schema(&iceberg_schema)?);
+
+        let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
+            if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+                let field_id = value.parse::<i32>().map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to parse field id".to_string(),
+                    )
+                    .with_context("value", value)
+                    .with_source(e)
+                })?;
+                Ok(Some(field_id as i64))
+            } else {
+                Ok(None)
+            }
+        };
+
+        let searchable_field_func = |_field: &Field| -> bool { true };
+
+        Self::new(
+            arrow_schema_with_ids,
+            target_field_ids,
+            field_id_fetch_func,
+            searchable_field_func,
+        )
+    }
+
    fn fetch_field_index<F1, F2>(
        fields: &Fields,
        index_vec: &mut Vec<usize>,
@@ -129,7 +172,7 @@ impl RecordBatchProjector {
    }

    /// Do projection with columns
-    pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+    pub fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
        self.field_indices
            .iter()
            .map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
@@ -166,6 +209,7 @@ mod test {
    use arrow_schema::{DataType, Field, Fields, Schema};

    use crate::arrow::record_batch_projector::RecordBatchProjector;
+    use crate::spec::{NestedField, PrimitiveType, Schema as IcebergSchema, Type};
    use crate::{Error, ErrorKind};

    #[test]
@@ -293,4 +337,25 @@ mod test {
        RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
        assert!(projector.is_ok());
    }
+
+    #[test]
+    fn test_from_iceberg_schema() {
+        let iceberg_schema = IcebergSchema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let projector =
+            RecordBatchProjector::from_iceberg_schema(Arc::new(iceberg_schema), &[1, 3]).unwrap();
+
+        assert_eq!(projector.field_indices.len(), 2);
+        assert_eq!(projector.projected_schema_ref().fields().len(), 2);
+        assert_eq!(projector.projected_schema_ref().field(0).name(), "id");
+        assert_eq!(projector.projected_schema_ref().field(1).name(), "age");
+    }
}
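Taken together, these visibility changes let downstream crates project record batches by Iceberg field id without wiring up Arrow field-ID metadata by hand. A minimal usage sketch (not part of this diff), assuming the module is reachable as `iceberg::arrow::record_batch_projector` and that the `projected_schema_ref()` accessor exercised by the test above is public as well:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use iceberg::arrow::record_batch_projector::RecordBatchProjector;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Table schema: (1: id int, 2: name string, 3: age int?).
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()?;

    // Keep only field ids 1 ("id") and 3 ("age").
    let projector = RecordBatchProjector::from_iceberg_schema(Arc::new(schema), &[1, 3])?;

    // Columns in schema order, e.g. as produced by a file reader.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 2])),
        Arc::new(StringArray::from(vec!["a", "b"])),
        Arc::new(Int32Array::from(vec![Some(30), None])),
    ];

    // The projector walks its precomputed index paths (nested structs included)
    // and returns only the selected columns.
    let projected = projector.project_column(&columns)?;

    // `projected_schema_ref()` is assumed public here; the PR's test uses it.
    let batch = RecordBatch::try_new(projector.projected_schema_ref(), projected)?;
    assert_eq!(batch.num_columns(), 2); // "id" and "age"
    Ok(())
}
```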
2 changes: 1 addition & 1 deletion crates/iceberg/src/transform/mod.rs
@@ -29,7 +29,7 @@ mod truncate;
mod void;

/// TransformFunction is a trait that defines the interface for all transform functions.
-pub trait TransformFunction: Send + Sync {
+pub trait TransformFunction: Send + Sync + std::fmt::Debug {
    /// transform will take an input array and transform it into a new array.
    /// The implementation of this function will need to check and downcast the input to a specific
    /// type.
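The practical effect of the new `std::fmt::Debug` supertrait is that every implementor must now be debuggable, which makes boxed transform functions printable and lets containing types derive `Debug`. A stand-in sketch of the bound (not the real iceberg trait, which has more methods; the `transform` signature below is an assumption for illustration):

```rust
use std::fmt::Debug;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array};

// Local stand-in mirroring the changed bound; not the real iceberg trait.
pub trait TransformFunction: Send + Sync + Debug {
    /// Take an input array and transform it into a new array.
    fn transform(&self, input: ArrayRef) -> ArrayRef;
}

// Implementors must now be Debug; a derive is usually all that is needed.
#[derive(Debug)]
struct Identity;

impl TransformFunction for Identity {
    fn transform(&self, input: ArrayRef) -> ArrayRef {
        input
    }
}

fn main() {
    // The supertrait makes the trait object itself Debug, so transforms can
    // be logged or inspected even behind a Box.
    let f: Box<dyn TransformFunction> = Box::new(Identity);
    println!("{f:?}");
    let out = f.transform(Arc::new(Int32Array::from(vec![1, 2, 3])));
    assert_eq!(out.len(), 3);
}
```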
2 changes: 2 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -18,9 +18,11 @@
pub(crate) mod commit;
pub(crate) mod expr_to_predicate;
pub(crate) mod metadata_scan;
+pub(crate) mod project;
pub(crate) mod scan;
pub(crate) mod write;

pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";

+pub use project::project_with_partition;
pub use scan::IcebergTableScan;