Skip to content

Commit 3928ac5

Browse files
committed
enters match mode
1 parent a33869d commit 3928ac5

File tree

3 files changed

+217
-48
lines changed

3 files changed

+217
-48
lines changed

crates/iceberg/src/arrow/nan_val_cnt_visitor.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray
2525
use arrow_schema::DataType;
2626

2727
use crate::Result;
28-
use crate::arrow::ArrowArrayAccessor;
28+
use crate::arrow::{ArrowArrayAccessor, FieldMatchMode};
2929
use crate::spec::{
3030
ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor,
3131
StructType, visit_struct_with_partner,
@@ -71,6 +71,7 @@ macro_rules! count_float_nans {
7171
pub struct NanValueCountVisitor {
7272
/// Stores field ID to NaN value count mapping
7373
pub nan_value_counts: HashMap<i32, u64>,
74+
match_mode: FieldMatchMode,
7475
}
7576

7677
impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
@@ -149,15 +150,20 @@ impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
149150
impl NanValueCountVisitor {
150151
/// Creates new instance of NanValueCountVisitor
151152
pub fn new() -> Self {
153+
Self::new_with_match_mode(FieldMatchMode::Id)
154+
}
155+
156+
/// Creates new instance of NanValueCountVisitor with explicit match mode
157+
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
152158
Self {
153159
nan_value_counts: HashMap::new(),
160+
match_mode,
154161
}
155162
}
156163

157164
/// Compute nan value counts in given schema and record batch
158165
pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> {
159-
let arrow_arr_partner_accessor =
160-
ArrowArrayAccessor::new_with_table_schema(schema.as_ref())?;
166+
let arrow_arr_partner_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode);
161167

162168
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
163169
visit_struct_with_partner(

crates/iceberg/src/arrow/value.rs

Lines changed: 184 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -21,12 +21,12 @@ use arrow_array::{
2121
LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray,
2222
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
2323
};
24-
use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema};
24+
use arrow_schema::{DataType, FieldRef};
2525
use uuid::Uuid;
2626

27-
use super::{get_field_id, schema_to_arrow_schema};
27+
use super::get_field_id;
2828
use crate::spec::{
29-
ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType, Schema,
29+
ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType,
3030
SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner,
3131
};
3232
use crate::{Error, ErrorKind, Result};
@@ -425,13 +425,26 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
425425
}
426426
}
427427

428-
/// todo doc
428+
/// Defines how Arrow fields are matched with Iceberg fields when converting data.
429+
///
430+
/// This enum provides two strategies for matching fields:
431+
/// - `Id`: Match fields by their ID, which is stored in Arrow field metadata.
432+
/// - `Name`: Match fields by their name, ignoring the field ID.
433+
///
434+
/// The ID matching mode is the default and preferred approach as it's more robust
435+
/// against schema evolution where field names might change but IDs remain stable.
436+
/// The name matching mode can be useful in scenarios where field IDs are not available
437+
/// or when working with systems that don't preserve field IDs.
438+
#[derive(Clone, Copy, Debug)]
429439
pub enum FieldMatchMode {
440+
/// Match fields by their ID stored in Arrow field metadata
430441
Id,
442+
/// Match fields by their name, ignoring field IDs
431443
Name,
432444
}
433445

434446
impl FieldMatchMode {
447+
/// Determines if an Arrow field matches an Iceberg field based on the matching mode.
435448
pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
436449
match self {
437450
FieldMatchMode::Id => get_field_id(arrow_field)
@@ -448,15 +461,14 @@ pub struct ArrowArrayAccessor {
448461
}
449462

450463
impl ArrowArrayAccessor {
451-
/// Creates a new instance of ArrowArrayAccessor without arrow schema fallback
464+
/// Creates a new instance of ArrowArrayAccessor with the default ID matching mode
452465
pub fn new() -> Self {
453466
Self {
454467
match_mode: FieldMatchMode::Id,
455468
}
456469
}
457470

458-
/// Creates a new instance of ArrowArrayAccessor with arrow schema converted from table schema
459-
/// for field ID resolution fallback
471+
/// Creates a new instance of ArrowArrayAccessor with the specified matching mode
460472
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
461473
Self { match_mode }
462474
}
@@ -933,50 +945,180 @@ mod test {
933945
}
934946

935947
#[test]
936-
fn test_field_id_fallback_with_arrow_schema() {
937-
// Create an Arrow struct array with a field that doesn't have field ID in metadata
938-
let int32_array = Int32Array::from(vec![Some(42), Some(43), None]);
939-
940-
// Create the struct array with a field that has no field ID metadata
941-
let struct_array = Arc::new(StructArray::from(vec![(
942-
Arc::new(Field::new("field_a", DataType::Int32, true)), // No field ID metadata
943-
Arc::new(int32_array) as ArrayRef,
944-
)])) as ArrayRef;
945-
946-
// Create an Iceberg schema with field ID
947-
let iceberg_schema = Schema::builder()
948-
.with_schema_id(1)
949-
.with_fields(vec![Arc::new(NestedField::optional(
950-
100, // Field ID that we'll look for
951-
"field_a",
952-
Type::Primitive(PrimitiveType::Int),
953-
))])
954-
.build()
955-
.unwrap();
948+
fn test_find_field_by_id() {
949+
// Create Arrow arrays for the nested structure
950+
let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]);
951+
let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]);
952+
953+
// Create the nested struct array with field IDs in metadata
954+
let nested_struct_array =
955+
Arc::new(StructArray::from(vec![
956+
(
957+
Arc::new(Field::new("field_a", DataType::Int32, true).with_metadata(
958+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
959+
)),
960+
Arc::new(field_a_array) as ArrayRef,
961+
),
962+
(
963+
Arc::new(Field::new("field_b", DataType::Utf8, true).with_metadata(
964+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
965+
)),
966+
Arc::new(field_b_array) as ArrayRef,
967+
),
968+
])) as ArrayRef;
956969

957-
// Create an ArrowArrayAccessor with the table schema for fallback
958-
let accessor = ArrowArrayAccessor::new_with_table_schema(&iceberg_schema).unwrap();
970+
let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]);
959971

960-
// Create a nested field to look up
961-
let field = NestedField::optional(100, "field_a", Type::Primitive(PrimitiveType::Int));
972+
// Create the top-level struct array with field IDs in metadata
973+
let struct_array = Arc::new(StructArray::from(vec![
974+
(
975+
Arc::new(
976+
Field::new(
977+
"nested_struct",
978+
DataType::Struct(Fields::from(vec![
979+
Field::new("field_a", DataType::Int32, true).with_metadata(
980+
HashMap::from([(
981+
PARQUET_FIELD_ID_META_KEY.to_string(),
982+
"1".to_string(),
983+
)]),
984+
),
985+
Field::new("field_b", DataType::Utf8, true).with_metadata(
986+
HashMap::from([(
987+
PARQUET_FIELD_ID_META_KEY.to_string(),
988+
"2".to_string(),
989+
)]),
990+
),
991+
])),
992+
true,
993+
)
994+
.with_metadata(HashMap::from([(
995+
PARQUET_FIELD_ID_META_KEY.to_string(),
996+
"3".to_string(),
997+
)])),
998+
),
999+
nested_struct_array,
1000+
),
1001+
(
1002+
Arc::new(Field::new("field_c", DataType::Int32, true).with_metadata(
1003+
HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
1004+
)),
1005+
Arc::new(field_c_array) as ArrayRef,
1006+
),
1007+
])) as ArrayRef;
9621008

963-
// This should succeed by using the arrow_schema fallback
964-
let result = accessor.field_partner(&struct_array, &field);
1009+
// Create an ArrowArrayAccessor with ID matching mode
1010+
let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Id);
9651011

966-
// Verify that the field was found
967-
assert!(result.is_ok());
1012+
// Test finding fields by ID
1013+
let nested_field = NestedField::optional(
1014+
3,
1015+
"nested_struct",
1016+
Type::Struct(StructType::new(vec![
1017+
Arc::new(NestedField::optional(
1018+
1,
1019+
"field_a",
1020+
Type::Primitive(PrimitiveType::Int),
1021+
)),
1022+
Arc::new(NestedField::optional(
1023+
2,
1024+
"field_b",
1025+
Type::Primitive(PrimitiveType::String),
1026+
)),
1027+
])),
1028+
);
1029+
let nested_partner = accessor
1030+
.field_partner(&struct_array, &nested_field)
1031+
.unwrap();
9681032

969-
// Verify that the field has the expected value
970-
let array_ref = result.unwrap();
971-
let int_array = array_ref.as_any().downcast_ref::<Int32Array>().unwrap();
1033+
// Verify we can access the nested field
1034+
let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
1035+
let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();
1036+
1037+
// Verify the field has the expected value
1038+
let int_array = field_a_partner
1039+
.as_any()
1040+
.downcast_ref::<Int32Array>()
1041+
.unwrap();
9721042
assert_eq!(int_array.value(0), 42);
9731043
assert_eq!(int_array.value(1), 43);
9741044
assert!(int_array.is_null(2));
1045+
}
1046+
1047+
#[test]
1048+
fn test_find_field_by_name() {
1049+
// Create Arrow arrays for the nested structure
1050+
let field_a_array = Int32Array::from(vec![Some(42), Some(43), None]);
1051+
let field_b_array = StringArray::from(vec![Some("value1"), Some("value2"), None]);
1052+
1053+
// Create the nested struct array WITHOUT field IDs in metadata
1054+
let nested_struct_array = Arc::new(StructArray::from(vec![
1055+
(
1056+
Arc::new(Field::new("field_a", DataType::Int32, true)),
1057+
Arc::new(field_a_array) as ArrayRef,
1058+
),
1059+
(
1060+
Arc::new(Field::new("field_b", DataType::Utf8, true)),
1061+
Arc::new(field_b_array) as ArrayRef,
1062+
),
1063+
])) as ArrayRef;
1064+
1065+
let field_c_array = Int32Array::from(vec![Some(100), Some(200), None]);
1066+
1067+
// Create the top-level struct array WITHOUT field IDs in metadata
1068+
let struct_array = Arc::new(StructArray::from(vec![
1069+
(
1070+
Arc::new(Field::new(
1071+
"nested_struct",
1072+
DataType::Struct(Fields::from(vec![
1073+
Field::new("field_a", DataType::Int32, true),
1074+
Field::new("field_b", DataType::Utf8, true),
1075+
])),
1076+
true,
1077+
)),
1078+
nested_struct_array,
1079+
),
1080+
(
1081+
Arc::new(Field::new("field_c", DataType::Int32, true)),
1082+
Arc::new(field_c_array) as ArrayRef,
1083+
),
1084+
])) as ArrayRef;
1085+
1086+
// Create an ArrowArrayAccessor with Name matching mode
1087+
let accessor = ArrowArrayAccessor::new_with_match_mode(FieldMatchMode::Name);
1088+
1089+
// Test finding fields by name
1090+
let nested_field = NestedField::optional(
1091+
3,
1092+
"nested_struct",
1093+
Type::Struct(StructType::new(vec![
1094+
Arc::new(NestedField::optional(
1095+
1,
1096+
"field_a",
1097+
Type::Primitive(PrimitiveType::Int),
1098+
)),
1099+
Arc::new(NestedField::optional(
1100+
2,
1101+
"field_b",
1102+
Type::Primitive(PrimitiveType::String),
1103+
)),
1104+
])),
1105+
);
1106+
let nested_partner = accessor
1107+
.field_partner(&struct_array, &nested_field)
1108+
.unwrap();
1109+
1110+
// Verify we can access the nested field by name
1111+
let field_a = NestedField::optional(1, "field_a", Type::Primitive(PrimitiveType::Int));
1112+
let field_a_partner = accessor.field_partner(nested_partner, &field_a).unwrap();
9751113

976-
// Now try with an accessor without arrow_schema - this should fail
977-
let accessor_without_schema = ArrowArrayAccessor::new().unwrap();
978-
let result = accessor_without_schema.field_partner(&struct_array, &field);
979-
assert!(result.is_err());
1114+
// Verify the field has the expected value
1115+
let int_array = field_a_partner
1116+
.as_any()
1117+
.downcast_ref::<Int32Array>()
1118+
.unwrap();
1119+
assert_eq!(int_array.value(0), 42);
1120+
assert_eq!(int_array.value(1), 43);
1121+
assert!(int_array.is_null(2));
9801122
}
9811123

9821124
#[test]

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,8 @@ use thrift::protocol::TOutputProtocol;
3737
use super::location_generator::{FileNameGenerator, LocationGenerator};
3838
use super::{FileWriter, FileWriterBuilder};
3939
use crate::arrow::{
40-
ArrowFileReader, DEFAULT_MAP_FIELD_NAME, NanValueCountVisitor, get_parquet_stat_max_as_datum,
41-
get_parquet_stat_min_as_datum,
40+
ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor,
41+
get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
4242
};
4343
use crate::io::{FileIO, FileWrite, OutputFile};
4444
use crate::spec::{
@@ -55,6 +55,7 @@ use crate::{Error, ErrorKind, Result};
5555
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
5656
props: WriterProperties,
5757
schema: SchemaRef,
58+
match_mode: FieldMatchMode,
5859

5960
file_io: FileIO,
6061
location_generator: T,
@@ -70,10 +71,30 @@ impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
7071
file_io: FileIO,
7172
location_generator: T,
7273
file_name_generator: F,
74+
) -> Self {
75+
Self::new_with_match_mode(
76+
props,
77+
schema,
78+
FieldMatchMode::Id,
79+
file_io,
80+
location_generator,
81+
file_name_generator,
82+
)
83+
}
84+
85+
/// Create a new `ParquetWriterBuilder` with custom match mode
86+
pub fn new_with_match_mode(
87+
props: WriterProperties,
88+
schema: SchemaRef,
89+
match_mode: FieldMatchMode,
90+
file_io: FileIO,
91+
location_generator: T,
92+
file_name_generator: F,
7393
) -> Self {
7494
Self {
7595
props,
7696
schema,
97+
match_mode,
7798
file_io,
7899
location_generator,
79100
file_name_generator,
@@ -96,7 +117,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
96117
writer_properties: self.props,
97118
current_row_num: 0,
98119
out_file,
99-
nan_value_count_visitor: NanValueCountVisitor::new(),
120+
nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
100121
})
101122
}
102123
}

0 commit comments

Comments
 (0)