diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index 6b75c011cb..e514457887 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -25,7 +25,7 @@ use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray use arrow_schema::DataType; use crate::Result; -use crate::arrow::ArrowArrayAccessor; +use crate::arrow::{ArrowArrayAccessor, FieldMatchMode}; use crate::spec::{ ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, visit_struct_with_partner, @@ -71,6 +71,7 @@ macro_rules! count_float_nans { pub struct NanValueCountVisitor { /// Stores field ID to NaN value count mapping pub nan_value_counts: HashMap, + match_mode: FieldMatchMode, } impl SchemaWithPartnerVisitor for NanValueCountVisitor { @@ -149,14 +150,20 @@ impl SchemaWithPartnerVisitor for NanValueCountVisitor { impl NanValueCountVisitor { /// Creates new instance of NanValueCountVisitor pub fn new() -> Self { + Self::new_with_match_mode(FieldMatchMode::Id) + } + + /// Creates new instance of NanValueCountVisitor with explicit match mode + pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self { Self { nan_value_counts: HashMap::new(), + match_mode, } } /// Compute nan value counts in given schema and record batch pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> { - let arrow_arr_partner_accessor = ArrowArrayAccessor {}; + let arrow_arr_partner_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode); let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef; visit_struct_with_partner( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 993e927145..9ddd941fa4 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -21,7 +21,7 @@ use arrow_array::{ LargeListArray, LargeStringArray, 
ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_schema::DataType; +use arrow_schema::{DataType, FieldRef}; use uuid::Uuid; use super::get_field_id; @@ -425,11 +425,63 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } } +/// Defines how Arrow fields are matched with Iceberg fields when converting data. +/// +/// This enum provides two strategies for matching fields: +/// - `Id`: Match fields by their ID, which is stored in Arrow field metadata. +/// - `Name`: Match fields by their name, ignoring the field ID. +/// +/// The ID matching mode is the default and preferred approach as it's more robust +/// against schema evolution where field names might change but IDs remain stable. +/// The name matching mode can be useful in scenarios where field IDs are not available +/// or when working with systems that don't preserve field IDs. +#[derive(Clone, Copy, Debug)] +pub enum FieldMatchMode { + /// Match fields by their ID stored in Arrow field metadata + Id, + /// Match fields by their name, ignoring field IDs + Name, +} + +impl FieldMatchMode { + /// Determines if an Arrow field matches an Iceberg field based on the matching mode. 
+ pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool { + match self { + FieldMatchMode::Id => get_field_id(arrow_field) + .map(|id| id == iceberg_field.id) + .unwrap_or(false), + FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name, + } + } +} + /// Partner type representing accessing and walking arrow arrays alongside iceberg schema -pub struct ArrowArrayAccessor; +pub struct ArrowArrayAccessor { + match_mode: FieldMatchMode, +} + +impl ArrowArrayAccessor { + /// Creates a new instance of ArrowArrayAccessor with the default ID matching mode + pub fn new() -> Self { + Self { + match_mode: FieldMatchMode::Id, + } + } + + /// Creates a new instance of ArrowArrayAccessor with the specified matching mode + pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self { + Self { match_mode } + } +} + +impl Default for ArrowArrayAccessor { + fn default() -> Self { + Self::new() + } +} impl PartnerAccessor for ArrowArrayAccessor { - fn struct_parner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { if !matches!(schema_partner.data_type(), DataType::Struct(_)) { return Err(Error::new( ErrorKind::DataInvalid, @@ -451,18 +503,17 @@ impl PartnerAccessor for ArrowArrayAccessor { .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, - "The struct partner is not a struct array", + format!( + "The struct partner is not a struct array, partner: {:?}", + struct_partner + ), ) })?; let field_pos = struct_array .fields() .iter() - .position(|arrow_field| { - get_field_id(arrow_field) - .map(|id| id == field.id) - .unwrap_or(false) - }) + .position(|arrow_field| self.match_mode.match_field(arrow_field, field)) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -549,7 +600,7 @@ pub fn arrow_struct_to_literal( ty, struct_array, &mut ArrowArrayToIcebergStructConverter, - &ArrowArrayAccessor, + &ArrowArrayAccessor::new(), ) } @@ 
-899,6 +950,183 @@ mod test { assert_eq!(result, vec![None; 0]); } + #[test] + fn test_find_field_by_id() { + // Create Arrow arrays for the nested structure + let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]); + let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]); + + // Create the nested struct array with field IDs in metadata + let nested_struct_array = + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("field_a", DataType::Int32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]), + )), + Arc::new(field_a_array) as ArrayRef, + ), + ( + Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + )), + Arc::new(field_b_array) as ArrayRef, + ), + ])) as ArrayRef; + + let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]); + + // Create the top-level struct array with field IDs in metadata + let struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new( + Field::new( + "nested_struct", + DataType::Struct(Fields::from(vec![ + Field::new("field_a", DataType::Int32, true).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )]), + ), + Field::new("field_b", DataType::Utf8, true).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )]), + ), + ])), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ), + nested_struct_array, + ), + ( + Arc::new(Field::new("field_c", DataType::Int32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + )), + Arc::new(field_c_array) as ArrayRef, + ), + ])) as ArrayRef; + + // Create an ArrowArrayAccessor with ID matching mode + let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id); + + // Test finding fields by ID 
+ let nested_field = NestedField::optional( + 3, + "nested_struct", + Type::Struct(StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "field_a", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "field_b", + Type::Primitive(PrimitiveType::String), + )), + ])), + ); + let nested_partner = accessor + .field_partner(&struct_array, &nested_field) + .unwrap(); + + // Verify we can access the nested field + let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int)); + let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap(); + + // Verify the field has the expected value + let int_array = field_a_partner + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(int_array.value(0), 42); + assert_eq!(int_array.value(1), 43); + assert!(int_array.is_null(2)); + } + + #[test] + fn test_find_field_by_name() { + // Create Arrow arrays for the nested structure + let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]); + let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]); + + // Create the nested struct array WITHOUT field IDs in metadata + let nested_struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("field_a", DataType::Int32, true)), + Arc::new(field_a_array) as ArrayRef, + ), + ( + Arc::new(Field::new("field_b", DataType::Utf8, true)), + Arc::new(field_b_array) as ArrayRef, + ), + ])) as ArrayRef; + + let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]); + + // Create the top-level struct array WITHOUT field IDs in metadata + let struct_array = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new( + "nested_struct", + DataType::Struct(Fields::from(vec![ + Field::new("field_a", DataType::Int32, true), + Field::new("field_b", DataType::Utf8, true), + ])), + true, + )), + nested_struct_array, + ), + ( + Arc::new(Field::new("field_c", DataType::Int32, true)), + Arc::new(field_c_array) as 
ArrayRef, + ), + ])) as ArrayRef; + + // Create an ArrowArrayAccessor with Name matching mode + let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name); + + // Test finding fields by name + let nested_field = NestedField::optional( + 3, + "nested_struct", + Type::Struct(StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "field_a", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "field_b", + Type::Primitive(PrimitiveType::String), + )), + ])), + ); + let nested_partner = accessor + .field_partner(&struct_array, &nested_field) + .unwrap(); + + // Verify we can access the nested field by name + let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int)); + let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap(); + + // Verify the field has the expected value + let int_array = field_a_partner + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(int_array.value(0), 42); + assert_eq!(int_array.value(1), 43); + assert!(int_array.is_null(2)); + } + #[test] fn test_complex_nested() { // complex nested type for test diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index ebb9b86bba..50f7c04caa 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -190,7 +190,7 @@ pub trait SchemaWithPartnerVisitor
<P>
{ /// Accessor used to get child partner from parent partner. pub trait PartnerAccessor
<P>
{ /// Get the struct partner from schema partner. - fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; + fn struct_partner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; /// Get the field partner from struct partner. fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>; /// Get the list element partner from list partner. @@ -274,7 +274,7 @@ pub fn visit_schema_with_partner, A: PartnerAc ) -> Result { let result = visit_struct_with_partner( &schema.r#struct, - accessor.struct_parner(partner)?, + accessor.struct_partner(partner)?, visitor, accessor, )?; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index db410f47e3..4f65d64be4 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -37,8 +37,8 @@ use thrift::protocol::TOutputProtocol; use super::location_generator::{FileNameGenerator, LocationGenerator}; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ - ArrowFileReader, DEFAULT_MAP_FIELD_NAME, NanValueCountVisitor, get_parquet_stat_max_as_datum, - get_parquet_stat_min_as_datum, + ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ @@ -55,6 +55,7 @@ use crate::{Error, ErrorKind, Result}; pub struct ParquetWriterBuilder { props: WriterProperties, schema: SchemaRef, + match_mode: FieldMatchMode, file_io: FileIO, location_generator: T, @@ -70,10 +71,30 @@ impl ParquetWriterBuilder { file_io: FileIO, location_generator: T, file_name_generator: F, + ) -> Self { + Self::new_with_match_mode( + props, + schema, + FieldMatchMode::Id, + file_io, + location_generator, + file_name_generator, + ) + } + + /// Create a new `ParquetWriterBuilder` with custom match mode + pub fn 
new_with_match_mode( + props: WriterProperties, + schema: SchemaRef, + match_mode: FieldMatchMode, + file_io: FileIO, + location_generator: T, + file_name_generator: F, ) -> Self { Self { props, schema, + match_mode, file_io, location_generator, file_name_generator, @@ -96,7 +117,7 @@ impl FileWriterBuilder for ParquetWr writer_properties: self.props, current_row_num: 0, out_file, - nan_value_count_visitor: NanValueCountVisitor::new(), + nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), }) } } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index a8d0b110af..4b8ef1ab11 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -35,7 +35,7 @@ use datafusion::physical_plan::{ execute_input_stream, }; use futures::StreamExt; -use iceberg::arrow::schema_to_arrow_schema; +use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema}; use iceberg::spec::{ DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, @@ -232,9 +232,10 @@ impl ExecutionPlan for IcebergWriteExec { } // Create data file writer builder - let parquet_file_writer_builder = ParquetWriterBuilder::new( + let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode( WriterProperties::default(), self.table.metadata().current_schema().clone(), + FieldMatchMode::Name, self.table.file_io().clone(), DefaultLocationGenerator::new(self.table.metadata().clone()) .map_err(to_datafusion_error)?,