Skip to content

feat(arrow): Allow ArrowArrayAccessor to use field.name to match fields #1561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
11 changes: 9 additions & 2 deletions crates/iceberg/src/arrow/nan_val_cnt_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray
use arrow_schema::DataType;

use crate::Result;
use crate::arrow::ArrowArrayAccessor;
use crate::arrow::{ArrowArrayAccessor, FieldMatchMode};
use crate::spec::{
ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor,
StructType, visit_struct_with_partner,
Expand Down Expand Up @@ -71,6 +71,7 @@ macro_rules! count_float_nans {
pub struct NanValueCountVisitor {
/// Stores field ID to NaN value count mapping
pub nan_value_counts: HashMap<i32, u64>,
/// Strategy for pairing Arrow columns with Iceberg fields; handed to the
/// `ArrowArrayAccessor` that `compute` builds
match_mode: FieldMatchMode,
}

impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
Expand Down Expand Up @@ -149,14 +150,20 @@ impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
impl NanValueCountVisitor {
/// Creates a `NanValueCountVisitor` that matches fields by ID.
///
/// Equivalent to calling `new_with_match_mode` with `FieldMatchMode::Id`.
pub fn new() -> Self {
    let id_mode = FieldMatchMode::Id;
    Self::new_with_match_mode(id_mode)
}

/// Creates a `NanValueCountVisitor` that uses the given field match mode.
///
/// The visitor starts with an empty field-ID-to-NaN-count map.
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
    let nan_value_counts = HashMap::new();
    Self {
        nan_value_counts,
        match_mode,
    }
}

/// Compute nan value counts in given schema and record batch
pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> {
let arrow_arr_partner_accessor = ArrowArrayAccessor {};
let arrow_arr_partner_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode);

let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
visit_struct_with_partner(
Expand Down
248 changes: 238 additions & 10 deletions crates/iceberg/src/arrow/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_array::{
LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
};
use arrow_schema::DataType;
use arrow_schema::{DataType, FieldRef};
use uuid::Uuid;

use super::get_field_id;
Expand Down Expand Up @@ -425,11 +425,63 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
}
}

/// Defines how Arrow fields are matched with Iceberg fields when converting data.
///
/// This enum provides two strategies for matching fields:
/// - `Id`: Match fields by their ID, which is stored in Arrow field metadata.
/// - `Name`: Match fields by their name, ignoring the field ID.
///
/// The ID matching mode is the default (it is what `FieldMatchMode::default()` returns)
/// and the preferred approach, as it is more robust against schema evolution where
/// field names might change but IDs remain stable.
/// The name matching mode can be useful in scenarios where field IDs are not available
/// or when working with systems that don't preserve field IDs.
///
/// The type is `Copy` and comparable, so callers can store it by value and test which
/// mode is active with `==`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum FieldMatchMode {
    /// Match fields by their ID stored in Arrow field metadata
    #[default]
    Id,
    /// Match fields by their name, ignoring field IDs
    Name,
}

impl FieldMatchMode {
    /// Reports whether `arrow_field` corresponds to `iceberg_field` under this mode.
    ///
    /// - `Name`: the Arrow field name must equal the Iceberg field name.
    /// - `Id`: the ID obtained from `get_field_id` must equal the Iceberg field ID;
    ///   when `get_field_id` does not yield an ID the fields are treated as not matching.
    pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
        match *self {
            Self::Name => arrow_field.name() == &iceberg_field.name,
            Self::Id => {
                let arrow_id = get_field_id(arrow_field);
                arrow_id
                    .map(|id| id == iceberg_field.id)
                    .unwrap_or(false)
            }
        }
    }
}

/// Partner type representing accessing and walking arrow arrays alongside iceberg schema
pub struct ArrowArrayAccessor;
pub struct ArrowArrayAccessor {
/// Strategy used to decide which Arrow field corresponds to a given Iceberg field
match_mode: FieldMatchMode,
}

impl ArrowArrayAccessor {
/// Creates a new instance of ArrowArrayAccessor with the default ID matching mode
pub fn new() -> Self {
Self {
match_mode: FieldMatchMode::Id,
}
}

/// Creates a new instance of ArrowArrayAccessor with the specified matching mode
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
Self { match_mode }
}
}

impl Default for ArrowArrayAccessor {
fn default() -> Self {
Self::new()
}
}

impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
fn struct_parner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
return Err(Error::new(
ErrorKind::DataInvalid,
Expand All @@ -451,18 +503,17 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"The struct partner is not a struct array",
format!(
"The struct partner is not a struct array, partner: {:?}",
struct_partner
),
)
})?;

let field_pos = struct_array
.fields()
.iter()
.position(|arrow_field| {
get_field_id(arrow_field)
.map(|id| id == field.id)
.unwrap_or(false)
})
.position(|arrow_field| self.match_mode.match_field(arrow_field, field))
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
Expand Down Expand Up @@ -549,7 +600,7 @@ pub fn arrow_struct_to_literal(
ty,
struct_array,
&mut ArrowArrayToIcebergStructConverter,
&ArrowArrayAccessor,
&ArrowArrayAccessor::new(),
)
}

Expand Down Expand Up @@ -899,6 +950,183 @@ mod test {
assert_eq!(result, vec![None; 0]);
}

#[test]
fn test_find_field_by_id() {
    // Tags an Arrow field with an Iceberg field ID via the Parquet field-id metadata key.
    let with_field_id = |arrow_field: Field, id: &str| {
        arrow_field.with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };

    // Children of the nested struct carry IDs 1 and 2.
    let arrow_field_a = with_field_id(Field::new("field_a", DataType::Int32, true), "1");
    let arrow_field_b = with_field_id(Field::new("field_b", DataType::Utf8, true), "2");

    let a_column: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let b_column: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let nested_column: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_field_a.clone()), a_column),
        (Arc::new(arrow_field_b.clone()), b_column),
    ]));

    // The top-level struct holds the nested struct (ID 3) next to an int column (ID 4).
    let arrow_nested_field = with_field_id(
        Field::new(
            "nested_struct",
            DataType::Struct(Fields::from(vec![arrow_field_a, arrow_field_b])),
            true,
        ),
        "3",
    );
    let arrow_field_c = with_field_id(Field::new("field_c", DataType::Int32, true), "4");
    let c_column: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));
    let root_column: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_nested_field), nested_column),
        (Arc::new(arrow_field_c), c_column),
    ]));

    // Accessor under test: resolve partners through field IDs.
    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);

    // Step 1: locate the nested struct column from the top-level struct.
    let iceberg_children = vec![
        Arc::new(NestedField::optional(
            1,
            "field_a",
            Type::Primitive(PrimitiveType::Int),
        )),
        Arc::new(NestedField::optional(
            2,
            "field_b",
            Type::Primitive(PrimitiveType::String),
        )),
    ];
    let iceberg_nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(iceberg_children)),
    );
    let nested_partner = accessor
        .field_partner(&root_column, &iceberg_nested_field)
        .unwrap();

    // Step 2: locate `field_a` inside the nested struct column.
    let iceberg_field_a =
        NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
    let leaf_partner = accessor
        .field_partner(nested_partner, &iceberg_field_a)
        .unwrap();

    // The resolved column must be the original Int32 data: 42, 43, null.
    let ints = leaf_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.value(0), 42);
    assert_eq!(ints.value(1), 43);
    assert!(ints.is_null(2));
}

#[test]
fn test_find_field_by_name() {
    // Arrow fields are built WITHOUT field-ID metadata, so only names can identify them.
    let arrow_field_a = Field::new("field_a", DataType::Int32, true);
    let arrow_field_b = Field::new("field_b", DataType::Utf8, true);

    let a_column: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let b_column: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let nested_column: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_field_a.clone()), a_column),
        (Arc::new(arrow_field_b.clone()), b_column),
    ]));

    // The top-level struct holds the nested struct next to an int column, again without IDs.
    let arrow_nested_field = Field::new(
        "nested_struct",
        DataType::Struct(Fields::from(vec![arrow_field_a, arrow_field_b])),
        true,
    );
    let arrow_field_c = Field::new("field_c", DataType::Int32, true);
    let c_column: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));
    let root_column: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(arrow_nested_field), nested_column),
        (Arc::new(arrow_field_c), c_column),
    ]));

    // Accessor under test: resolve partners through field names.
    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);

    // Step 1: locate the nested struct column from the top-level struct.
    let iceberg_children = vec![
        Arc::new(NestedField::optional(
            1,
            "field_a",
            Type::Primitive(PrimitiveType::Int),
        )),
        Arc::new(NestedField::optional(
            2,
            "field_b",
            Type::Primitive(PrimitiveType::String),
        )),
    ];
    let iceberg_nested_field = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(iceberg_children)),
    );
    let nested_partner = accessor
        .field_partner(&root_column, &iceberg_nested_field)
        .unwrap();

    // Step 2: locate `field_a` inside the nested struct column by its name.
    let iceberg_field_a =
        NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
    let leaf_partner = accessor
        .field_partner(nested_partner, &iceberg_field_a)
        .unwrap();

    // The resolved column must be the original Int32 data: 42, 43, null.
    let ints = leaf_partner
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(ints.value(0), 42);
    assert_eq!(ints.value(1), 43);
    assert!(ints.is_null(2));
}

#[test]
fn test_complex_nested() {
// complex nested type for test
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/spec/schema/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub trait SchemaWithPartnerVisitor<P> {
/// Accessor used to get child partner from parent partner.
pub trait PartnerAccessor<P> {
/// Get the struct partner from schema partner.
fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>;
fn struct_partner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>;
/// Get the field partner from struct partner.
fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>;
/// Get the list element partner from list partner.
Expand Down Expand Up @@ -274,7 +274,7 @@ pub fn visit_schema_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: PartnerAc
) -> Result<V::T> {
let result = visit_struct_with_partner(
&schema.r#struct,
accessor.struct_parner(partner)?,
accessor.struct_partner(partner)?,
visitor,
accessor,
)?;
Expand Down
Loading
Loading