Skip to content

Commit c953837

Browse files
committed
clean up comments a bit, add a new test
1 parent 29dde0e commit c953837

File tree

1 file changed

+150
-21
lines changed

1 file changed

+150
-21
lines changed

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 150 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -537,21 +537,20 @@ impl RecordBatchTransformer {
537537
// - When name mapping is present, field ID matches alone aren't sufficient
538538
// - We verify the field NAME also matches to ensure it's the correct field
539539
// - If names don't match, we treat the field as "not present" and use name mapping
540-
let field_by_id = if let Some((source_field, source_index)) =
541-
field_id_to_source_schema_map.get(field_id)
542-
{
543-
let name_matches = source_field.name() == &iceberg_field.name;
544-
545-
if name_mapping.is_some() && !name_matches {
546-
// Field ID conflict detected: Parquet has this field ID but for a different field.
547-
// The field we're looking for (this field_id + this name) is NOT PRESENT in the file.
548-
// Per spec: treat as "not present" and fall through to name mapping (rule #2).
549-
None
550-
} else {
551-
// Field ID matches and either:
552-
// - No name mapping present (trust the field ID)
553-
// - Names also match (correct field, use it)
554-
if source_field.data_type().equals_datatype(target_type) {
540+
let field_by_id = field_id_to_source_schema_map
541+
.get(field_id)
542+
.and_then(|(source_field, source_index)| {
543+
let name_matches = source_field.name() == &iceberg_field.name;
544+
545+
if name_mapping.is_some() && !name_matches {
546+
// Field ID conflict detected: Parquet has this field ID but for a different field.
547+
// The field we're looking for (this field_id + this name) is NOT PRESENT in the file.
548+
// Per spec: treat as "not present" and fall through to name mapping (rule #2).
549+
None
550+
} else if source_field.data_type().equals_datatype(target_type) {
551+
// Field ID matches and either:
552+
// - No name mapping present (trust the field ID)
553+
// - Names also match (correct field, use it)
555554
Some(ColumnSource::PassThrough {
556555
source_index: *source_index,
557556
})
@@ -561,10 +560,7 @@ impl RecordBatchTransformer {
561560
source_index: *source_index,
562561
})
563562
}
564-
}
565-
} else {
566-
None
567-
};
563+
});
568564

569565
if let Some(source) = field_by_id {
570566
source
@@ -1029,7 +1025,7 @@ mod test {
10291025
)]))
10301026
}
10311027

1032-
/// Test for add_files partition column handling with field ID conflicts.
1028+
/// Test for add_files with Parquet files that have NO field IDs (Hive tables).
10331029
///
10341030
/// This reproduces the scenario from Iceberg spec where:
10351031
/// - Hive-style partitioned Parquet files are imported via add_files procedure
@@ -1047,7 +1043,7 @@ mod test {
10471043
/// 3. dept="hr" (from initial_default) - spec rule #3
10481044
/// 4. subdept="communications" (from Parquet via name mapping) - spec rule #2
10491045
#[test]
1050-
fn add_files_partition_columns_with_field_id_conflict() {
1046+
fn add_files_partition_columns_without_field_ids() {
10511047
// Iceberg schema after add_files: id (partition), name, dept (partition), subdept
10521048
let snapshot_schema = Arc::new(
10531049
Schema::builder()
@@ -1143,6 +1139,139 @@ mod test {
11431139
assert_eq!(subdept_column.value(0), "communications");
11441140
}
11451141

1142+
/// Test for TRUE field ID conflicts where Parquet field IDs don't match Iceberg semantics.
1143+
///
1144+
/// This is the critical test that exercises the field ID conflict detection code (lines 545-549).
1145+
///
1146+
/// # Scenario
1147+
///
1148+
/// Parquet file was written with its own field ID assignment:
1149+
/// - field_id=1 → "name" (String)
1150+
/// - field_id=2 → "salary" (Int)
1151+
///
1152+
/// Then imported to Iceberg via add_files with NEW field IDs:
1153+
/// - field_id=1 → "id" (Int, partition column with initial_default=1)
1154+
/// - field_id=2 → "name" (String)
1155+
/// - field_id=3 → "dept" (String, partition column with initial_default="hr")
1156+
/// - field_id=4 → "salary" (Int)
1157+
///
1158+
/// # The Conflict
1159+
///
1160+
/// When looking for Iceberg field_id=2 ("name"):
1161+
/// - Parquet HAS a field_id=2, but it's "salary", not "name"
1162+
/// - This is a field ID conflict - same ID, different field
1163+
///
1164+
/// # Expected Behavior Per Spec
1165+
///
1166+
/// Per the spec, field_id=2 "name" is NOT PRESENT in the Parquet file (even though
1167+
/// a different field_id=2 exists). The implementation must:
1168+
/// 1. Detect the conflict (field ID matches, but names don't match)
1169+
/// 2. Treat Iceberg field_id=2 as "not present"
1170+
/// 3. Fall through to name mapping (spec rule #2) to find "name" by column name
1171+
///
1172+
/// # Implementation
1173+
///
1174+
/// This tests the code at lines 545-549:
1175+
/// ```rust
1176+
/// if name_mapping.is_some() && !name_matches {
1177+
/// // Field ID conflict detected
1178+
/// None
1179+
/// }
1180+
/// ```
1181+
///
1182+
/// # References
1183+
/// - Iceberg spec: https://iceberg.apache.org/spec/#column-projection
1184+
#[test]
1185+
fn add_files_with_true_field_id_conflict() {
1186+
use crate::spec::{MappedField, NameMapping};
1187+
1188+
// Iceberg schema after add_files import
1189+
let snapshot_schema = Arc::new(
1190+
Schema::builder()
1191+
.with_schema_id(0)
1192+
.with_fields(vec![
1193+
NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int))
1194+
.with_initial_default(Literal::int(1))
1195+
.into(),
1196+
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1197+
NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String))
1198+
.with_initial_default(Literal::string("hr"))
1199+
.into(),
1200+
NestedField::optional(4, "salary", Type::Primitive(PrimitiveType::Int)).into(),
1201+
])
1202+
.build()
1203+
.unwrap(),
1204+
);
1205+
1206+
// Parquet file schema with CONFLICTING field IDs:
1207+
// - field_id=1 is "name" in Parquet, but "id" in Iceberg (conflict!)
1208+
// - field_id=2 is "salary" in Parquet, but "name" in Iceberg (conflict!)
1209+
let parquet_schema = Arc::new(ArrowSchema::new(vec![
1210+
simple_field("name", DataType::Utf8, true, "1"),
1211+
simple_field("salary", DataType::Int32, false, "2"),
1212+
]));
1213+
1214+
// Name mapping is CRITICAL - without it, we'd incorrectly use the conflicting field IDs
1215+
let name_mapping = Arc::new(NameMapping::new(vec![
1216+
MappedField::new(Some(2), vec!["name".to_string()], vec![]),
1217+
MappedField::new(Some(4), vec!["salary".to_string()], vec![]),
1218+
]));
1219+
1220+
let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, salary
1221+
1222+
let mut transformer = RecordBatchTransformer::build_with_partition_data(
1223+
snapshot_schema,
1224+
&projected_field_ids,
1225+
None,
1226+
None,
1227+
Some(name_mapping),
1228+
);
1229+
1230+
let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
1231+
Arc::new(StringArray::from(vec!["Alice"])),
1232+
Arc::new(Int32Array::from(vec![50000])),
1233+
])
1234+
.unwrap();
1235+
1236+
let result = transformer.process_record_batch(parquet_batch).unwrap();
1237+
1238+
// Verify the transformed RecordBatch correctly resolved field ID conflicts:
1239+
assert_eq!(result.num_columns(), 4);
1240+
assert_eq!(result.num_rows(), 1);
1241+
1242+
// Column 0: id=1 (from initial_default, NOT from Parquet field_id=1 which is "name")
1243+
let id_column = result
1244+
.column(0)
1245+
.as_any()
1246+
.downcast_ref::<Int32Array>()
1247+
.unwrap();
1248+
assert_eq!(id_column.value(0), 1);
1249+
1250+
// Column 1: name="Alice" (from Parquet via name mapping, NOT from field_id=2 which is "salary")
1251+
let name_column = result
1252+
.column(1)
1253+
.as_any()
1254+
.downcast_ref::<StringArray>()
1255+
.unwrap();
1256+
assert_eq!(name_column.value(0), "Alice");
1257+
1258+
// Column 2: dept="hr" (from initial_default, not in Parquet file)
1259+
let dept_column = result
1260+
.column(2)
1261+
.as_any()
1262+
.downcast_ref::<StringArray>()
1263+
.unwrap();
1264+
assert_eq!(dept_column.value(0), "hr");
1265+
1266+
// Column 3: salary=50000 (from Parquet via name mapping, NOT from field_id=2 directly)
1267+
let salary_column = result
1268+
.column(3)
1269+
.as_any()
1270+
.downcast_ref::<Int32Array>()
1271+
.unwrap();
1272+
assert_eq!(salary_column.value(0), 50000);
1273+
}
1274+
11461275
/// Test for bucket partitioning where source columns must be read from data files.
11471276
///
11481277
/// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules:

0 commit comments

Comments
 (0)