Skip to content

Commit e617e3a

Browse files
CTTY, fvaleye, liurenjie1024
authored and committed
feat(arrow): Allow ArrowArrayAccessor to use field.name to match fields (apache#1561)
## Which issue does this PR close?

- Closes apache#1560

## What changes are included in this PR?

- Added `FieldMatchMode` to `ArrowArrayAccessor`
- Added `match_mode` to `ParquetWriterBuilder` and `NanValueCountVisitor` so users have control over this behavior at the writer level
- Hardcoded `IcebergWriteExec` to use `FieldMatchMode::Name`

## Are these changes tested?

Yes, added unit tests.

---------

Co-authored-by: Florian Valeye <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
1 parent 831eb6a commit e617e3a

File tree

5 files changed

+276
-19
lines changed

5 files changed

+276
-19
lines changed

crates/iceberg/src/arrow/nan_val_cnt_visitor.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray
2525
use arrow_schema::DataType;
2626

2727
use crate::Result;
28-
use crate::arrow::ArrowArrayAccessor;
28+
use crate::arrow::{ArrowArrayAccessor, FieldMatchMode};
2929
use crate::spec::{
3030
ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor,
3131
StructType, visit_struct_with_partner,
@@ -71,6 +71,7 @@ macro_rules! count_float_nans {
7171
pub struct NanValueCountVisitor {
7272
/// Stores field ID to NaN value count mapping
7373
pub nan_value_counts: HashMap<i32, u64>,
74+
match_mode: FieldMatchMode,
7475
}
7576

7677
impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
@@ -149,14 +150,20 @@ impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
149150
impl NanValueCountVisitor {
150151
/// Creates new instance of NanValueCountVisitor
151152
pub fn new() -> Self {
153+
Self::new_with_match_mode(FieldMatchMode::Id)
154+
}
155+
156+
/// Creates new instance of NanValueCountVisitor with explicit match mode
157+
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
152158
Self {
153159
nan_value_counts: HashMap::new(),
160+
match_mode,
154161
}
155162
}
156163

157164
/// Compute nan value counts in given schema and record batch
158165
pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> {
159-
let arrow_arr_partner_accessor = ArrowArrayAccessor {};
166+
let arrow_arr_partner_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode);
160167

161168
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
162169
visit_struct_with_partner(

crates/iceberg/src/arrow/value.rs

Lines changed: 238 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use arrow_array::{
2121
LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
2222
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
2323
};
24-
use arrow_schema::DataType;
24+
use arrow_schema::{DataType, FieldRef};
2525
use uuid::Uuid;
2626

2727
use super::get_field_id;
@@ -425,11 +425,63 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
425425
}
426426
}
427427

428+
/// Defines how Arrow fields are matched with Iceberg fields when converting data.
///
/// Two strategies are available:
/// - [`FieldMatchMode::Id`]: match fields by their ID, which is stored in Arrow field metadata.
/// - [`FieldMatchMode::Name`]: match fields by their name, ignoring the field ID.
///
/// ID matching is the default (also via [`Default`]) and the preferred approach, as it is
/// more robust against schema evolution where field names might change but IDs remain stable.
/// Name matching can be useful in scenarios where field IDs are not available or when
/// working with systems that don't preserve field IDs.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum FieldMatchMode {
    /// Match fields by their ID stored in Arrow field metadata
    #[default]
    Id,
    /// Match fields by their name, ignoring field IDs
    Name,
}
445+
446+
impl FieldMatchMode {
447+
/// Determines if an Arrow field matches an Iceberg field based on the matching mode.
448+
pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
449+
match self {
450+
FieldMatchMode::Id => get_field_id(arrow_field)
451+
.map(|id| id == iceberg_field.id)
452+
.unwrap_or(false),
453+
FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,
454+
}
455+
}
456+
}
457+
428458
/// Partner type representing accessing and walking arrow arrays alongside iceberg schema
429-
pub struct ArrowArrayAccessor;
459+
pub struct ArrowArrayAccessor {
460+
match_mode: FieldMatchMode,
461+
}
462+
463+
impl ArrowArrayAccessor {
464+
/// Creates a new instance of ArrowArrayAccessor with the default ID matching mode
465+
pub fn new() -> Self {
466+
Self {
467+
match_mode: FieldMatchMode::Id,
468+
}
469+
}
470+
471+
/// Creates a new instance of ArrowArrayAccessor with the specified matching mode
472+
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
473+
Self { match_mode }
474+
}
475+
}
476+
477+
impl Default for ArrowArrayAccessor {
478+
fn default() -> Self {
479+
Self::new()
480+
}
481+
}
430482

431483
impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
432-
fn struct_parner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
484+
fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
433485
if !matches!(schema_partner.data_type(), DataType::Struct(_)) {
434486
return Err(Error::new(
435487
ErrorKind::DataInvalid,
@@ -451,18 +503,17 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
451503
.ok_or_else(|| {
452504
Error::new(
453505
ErrorKind::DataInvalid,
454-
"The struct partner is not a struct array",
506+
format!(
507+
"The struct partner is not a struct array, partner: {:?}",
508+
struct_partner
509+
),
455510
)
456511
})?;
457512

458513
let field_pos = struct_array
459514
.fields()
460515
.iter()
461-
.position(|arrow_field| {
462-
get_field_id(arrow_field)
463-
.map(|id| id == field.id)
464-
.unwrap_or(false)
465-
})
516+
.position(|arrow_field| self.match_mode.match_field(arrow_field, field))
466517
.ok_or_else(|| {
467518
Error::new(
468519
ErrorKind::DataInvalid,
@@ -549,7 +600,7 @@ pub fn arrow_struct_to_literal(
549600
ty,
550601
struct_array,
551602
&mut ArrowArrayToIcebergStructConverter,
552-
&ArrowArrayAccessor,
603+
&ArrowArrayAccessor::new(),
553604
)
554605
}
555606

@@ -899,6 +950,183 @@ mod test {
899950
assert_eq!(result, vec![None; 0]);
900951
}
901952

953+
#[test]
fn test_find_field_by_id() {
    // Shorthand for a nullable Arrow field whose metadata carries `id` as the field ID.
    let field_with_id = |name: &str, data_type: DataType, id: i32| {
        Field::new(name, data_type, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    };

    // Leaf columns.
    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let b_values: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let c_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));

    // Inner struct: field_a (id 1) and field_b (id 2).
    let inner_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (
            Arc::new(field_with_id("field_a", DataType::Int32, 1)),
            a_values,
        ),
        (
            Arc::new(field_with_id("field_b", DataType::Utf8, 2)),
            b_values,
        ),
    ]));

    // Outer struct: nested_struct (id 3) and field_c (id 4).
    let nested_struct_type = DataType::Struct(Fields::from(vec![
        field_with_id("field_a", DataType::Int32, 1),
        field_with_id("field_b", DataType::Utf8, 2),
    ]));
    let outer_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (
            Arc::new(field_with_id("nested_struct", nested_struct_type, 3)),
            inner_struct,
        ),
        (
            Arc::new(field_with_id("field_c", DataType::Int32, 4)),
            c_values,
        ),
    ]));

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);

    // Resolve the nested struct through its Iceberg field.
    let iceberg_nested = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let nested_partner = accessor
        .field_partner(&outer_struct, &iceberg_nested)
        .unwrap();

    // Then resolve field_a inside the nested struct.
    let iceberg_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
    let a_partner = accessor.field_partner(nested_partner, &iceberg_a).unwrap();

    // The resolved column must be the original Int32 data.
    let resolved = a_partner.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(resolved.value(0), 42);
    assert_eq!(resolved.value(1), 43);
    assert!(resolved.is_null(2));
}
1052+
1053+
#[test]
fn test_find_field_by_name() {
    // Shorthand for a nullable Arrow field that carries NO field-ID metadata.
    let plain_field = |name: &str, data_type: DataType| Field::new(name, data_type, true);

    // Leaf columns.
    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(42), Some(43), None]));
    let b_values: ArrayRef = Arc::new(StringArray::from(vec![
        Some("value1"),
        Some("value2"),
        None,
    ]));
    let c_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(100), Some(200), None]));

    // Inner struct: field_a and field_b, identified by name only.
    let inner_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (Arc::new(plain_field("field_a", DataType::Int32)), a_values),
        (Arc::new(plain_field("field_b", DataType::Utf8)), b_values),
    ]));

    // Outer struct: nested_struct and field_c, identified by name only.
    let nested_struct_type = DataType::Struct(Fields::from(vec![
        plain_field("field_a", DataType::Int32),
        plain_field("field_b", DataType::Utf8),
    ]));
    let outer_struct: ArrayRef = Arc::new(StructArray::from(vec![
        (
            Arc::new(plain_field("nested_struct", nested_struct_type)),
            inner_struct,
        ),
        (Arc::new(plain_field("field_c", DataType::Int32)), c_values),
    ]));

    let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);

    // Resolve the nested struct through its Iceberg field.
    let iceberg_nested = NestedField::optional(
        3,
        "nested_struct",
        Type::Struct(StructType::new(vec![
            Arc::new(NestedField::optional(
                1,
                "field_a",
                Type::Primitive(PrimitiveType::Int),
            )),
            Arc::new(NestedField::optional(
                2,
                "field_b",
                Type::Primitive(PrimitiveType::String),
            )),
        ])),
    );
    let nested_partner = accessor
        .field_partner(&outer_struct, &iceberg_nested)
        .unwrap();

    // Then resolve field_a inside the nested struct, again by name.
    let iceberg_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
    let a_partner = accessor.field_partner(nested_partner, &iceberg_a).unwrap();

    // The resolved column must be the original Int32 data.
    let resolved = a_partner.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(resolved.value(0), 42);
    assert_eq!(resolved.value(1), 43);
    assert!(resolved.is_null(2));
}
1129+
9021130
#[test]
9031131
fn test_complex_nested() {
9041132
// complex nested type for test

crates/iceberg/src/spec/schema/visitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ pub trait SchemaWithPartnerVisitor<P> {
190190
/// Accessor used to get child partner from parent partner.
191191
pub trait PartnerAccessor<P> {
192192
/// Get the struct partner from schema partner.
193-
fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>;
193+
fn struct_partner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>;
194194
/// Get the field partner from struct partner.
195195
fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>;
196196
/// Get the list element partner from list partner.
@@ -274,7 +274,7 @@ pub fn visit_schema_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: PartnerAc
274274
) -> Result<V::T> {
275275
let result = visit_struct_with_partner(
276276
&schema.r#struct,
277-
accessor.struct_parner(partner)?,
277+
accessor.struct_partner(partner)?,
278278
visitor,
279279
accessor,
280280
)?;

0 commit comments

Comments
 (0)