diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 8a3ab3a955..f1c4f86f23 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -881,6 +881,9 @@ mod tests { project_field_ids: vec![2, 3], predicate: None, deletes: vec![pos_del, eq_del], + partition: None, + partition_spec: None, + name_mapping: None, }; // Load the deletes - should handle both types without error diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 4250974bcd..14b5124ee6 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -341,6 +341,9 @@ pub(crate) mod tests { project_field_ids: vec![], predicate: None, deletes: vec![pos_del_1, pos_del_2.clone()], + partition: None, + partition_spec: None, + name_mapping: None, }, FileScanTask { start: 0, @@ -352,6 +355,9 @@ pub(crate) mod tests { project_field_ids: vec![], predicate: None, deletes: vec![pos_del_3], + partition: None, + partition_spec: None, + name_mapping: None, }, ]; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index fed8f19c05..770566f93d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -234,9 +234,14 @@ impl ArrowReader { // RecordBatchTransformer performs any transformations required on the RecordBatches // that come back from the file, such as type promotion, default column insertion - // and column re-ordering - let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); + // and column re-ordering. + let mut record_batch_transformer = RecordBatchTransformer::build_with_partition_data( + task.schema_ref(), + task.project_field_ids(), + task.partition_spec.clone(), + task.partition.clone(), + task.name_mapping.clone(), + ); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -1948,6 +1953,9 @@ message schema { project_field_ids: vec![1], predicate: Some(predicate.bind(schema, true).unwrap()), deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -2266,6 +2274,9 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, }; // Task 2: read the second and third row groups @@ -2279,6 +2290,9 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; @@ -2403,6 +2417,9 @@ message schema { project_field_ids: vec![1, 2], // Request both columns 'a' and 'b' predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -2571,6 +2588,9 @@ message schema { partition_spec_id: 0, equality_ids: None, }], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2786,6 +2806,9 @@ message schema { partition_spec_id: 0, equality_ids: None, }], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2994,6 
+3017,9 @@ message schema { partition_spec_id: 0, equality_ids: None, }], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3094,6 +3120,9 @@ message schema { project_field_ids: vec![1, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3188,6 +3217,9 @@ message schema { project_field_ids: vec![1, 3], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3271,6 +3303,9 @@ message schema { project_field_ids: vec![1, 2, 3], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3368,6 +3403,9 @@ message schema { project_field_ids: vec![1, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3494,6 +3532,9 @@ message schema { project_field_ids: vec![1, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3587,6 +3628,9 @@ message schema { project_field_ids: vec![1, 5, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3693,6 +3737,9 @@ message schema { project_field_ids: vec![1, 2, 3], predicate: Some(predicate.bind(schema, true).unwrap()), deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3708,4 +3755,162 @@ message schema { // Should return empty results assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0)); } + + /// Test bucket partitioning reads source column from data file (not partition metadata). + /// + /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning. + /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable). 
+ /// + /// # Iceberg Spec Requirements + /// + /// Per the Iceberg spec "Column Projection" section: + /// > "Return the value from partition metadata if an **Identity Transform** exists for the field" + /// + /// This means: + /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata + /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files + /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values + /// + /// Java's PartitionUtil.constantsMap() implements this via: + /// ```java + /// if (field.transform().isIdentity()) { + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # What This Test Verifies + /// + /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles + /// bucket partitioning when FileScanTask provides partition_spec and partition_data: + /// + /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13] + /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1 + /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants + /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file + /// - Values are NOT replaced with constant 1 from partition metadata + /// + /// # Why This Matters + /// + /// Without correct handling: + /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail) + /// - Query results would be incorrect (all rows would have id=1) + /// - Bucket partitioning would be unusable for query optimization + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms" + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + #[tokio::test] + async fn test_bucket_partitioning_reads_source_column_from_file() { + use arrow_array::Int32Array; + + use crate::spec::{Literal, PartitionSpec, Struct, Transform}; + + // Iceberg schema with id and name columns + let schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, id) + let partition_spec = Arc::new( + PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 1 + let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]); + + // Create Arrow schema with field IDs for Parquet file + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + // Write Parquet file with data + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef; + let name_data = + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as 
ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Read the Parquet file with partition spec and data
+        let reader = ArrowReaderBuilder::new(file_io).build();
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/data.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: Some(partition_data),
+                partition_spec: Some(partition_spec),
+                name_mapping: None,
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        // Verify we got the correct data
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 4);
+
+        // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
+        // NOT the constant partition value 1
+        let id_col = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(id_col.value(0), 1);
+        assert_eq!(id_col.value(1), 5);
+        assert_eq!(id_col.value(2), 9);
+        assert_eq!(id_col.value(3), 13);
+
+        let name_col = batch.column(1).as_string::<i32>();
+        assert_eq!(name_col.value(0), "Alice");
+        assert_eq!(name_col.value(1), "Bob");
+        assert_eq!(name_col.value(2), "Charlie");
+        assert_eq!(name_col.value(3), "Dave");
+    }
 }
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 5fbbbb106a..a4b22eb102 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -29,9 +29,45 @@ use arrow_schema::{
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use crate::arrow::schema_to_arrow_schema;
-use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
+use crate::spec::{
+    Literal, NameMapping, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct,
+    Transform,
+};
 use crate::{Error, ErrorKind, Result};
 
+/// Build a map of field ID to constant value for identity-partitioned fields.
+///
+/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants
+/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.)
+/// store derived values in partition metadata, so source columns must be read from data files.
+///
+/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` (bucket number),
+/// but the actual `id` values (100, 200, 300) are only in the data file.
+///
+/// Matches Java's `PartitionUtil.constantsMap()` which filters `if (field.transform().isIdentity())`.
+///
+/// # References
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
+fn constants_map(
+    partition_spec: &PartitionSpec,
+    partition_data: &Struct,
+) -> HashMap<i32, PrimitiveLiteral> {
+    let mut constants = HashMap::new();
+
+    for (pos, field) in partition_spec.fields().iter().enumerate() {
+        // Only identity transforms should use constant values from partition metadata
+        if matches!(field.transform, Transform::Identity) {
+            // Get the partition value for this field
+            if let Some(Some(Literal::Primitive(value))) = partition_data.iter().nth(pos) {
+                constants.insert(field.source_id, value.clone());
+            }
+        }
+    }
+
+    constants
+}
+
 /// Indicates how a particular column in a processed RecordBatch should
 /// be sourced.
 #[derive(Debug)]
@@ -112,6 +148,13 @@ pub(crate) struct RecordBatchTransformer {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
 
+    // Optional partition spec and data for proper constant identification
+    partition_spec: Option<Arc<PartitionSpec>>,
+    partition_data: Option<Struct>,
+
+    // Optional name mapping for resolving field IDs from column names
+    name_mapping: Option<Arc<NameMapping>>,
+
     // BatchTransform gets lazily constructed based on the schema of
     // the first RecordBatch we receive from the file
     batch_transform: Option<BatchTransform>,
@@ -123,12 +166,55 @@ impl RecordBatchTransformer {
     pub(crate) fn build(
         snapshot_schema: Arc<IcebergSchema>,
         projected_iceberg_field_ids: &[i32],
+    ) -> Self {
+        Self::build_with_partition_data(
+            snapshot_schema,
+            projected_iceberg_field_ids,
+            None,
+            None,
+            None,
+        )
+    }
+
+    /// Build a RecordBatchTransformer with partition spec and data for proper constant identification.
+    ///
+    /// Implements the Iceberg spec's "Column Projection" rules for resolving field IDs "not present" in data files:
+    /// 1. Return the value from partition metadata if an Identity Transform exists
+    /// 2. Use schema.name-mapping.default metadata to map field id to columns without field id
+    /// 3. Return the default value if it has a defined initial-default
+    /// 4. Return null in all other cases
+    ///
+    /// # Why this method exists
+    ///
+    /// 1. **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants)
+    ///    from non-identity transforms like bucket (read from data file) to enable runtime filtering on
+    ///    bucket-partitioned columns.
+    ///
+    /// 2. **Add_files field ID conflicts**: When importing Hive tables, partition columns can have field IDs
+    ///    conflicting with Parquet data columns (e.g., Parquet has field_id=1->"name", but Iceberg expects
+    ///    field_id=1->"id"). Per spec, such fields are "not present" and should use name mapping (rule #2).
+    ///
+    /// This matches Java's ParquetSchemaUtil.applyNameMapping approach but detects conflicts during projection.
+    ///
+    /// # References
+    /// - Spec: https://iceberg.apache.org/spec/#column-projection
+    /// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+    /// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
+    pub(crate) fn build_with_partition_data(
+        snapshot_schema: Arc<IcebergSchema>,
+        projected_iceberg_field_ids: &[i32],
+        partition_spec: Option<Arc<PartitionSpec>>,
+        partition_data: Option<Struct>,
+        name_mapping: Option<Arc<NameMapping>>,
     ) -> Self {
         let projected_iceberg_field_ids = projected_iceberg_field_ids.to_vec();
 
         Self {
             snapshot_schema,
             projected_iceberg_field_ids,
+            partition_spec,
+            partition_data,
+            name_mapping,
             batch_transform: None,
         }
     }
@@ -167,6 +253,9 @@ impl RecordBatchTransformer {
                     record_batch.schema_ref(),
                     self.snapshot_schema.as_ref(),
                     &self.projected_iceberg_field_ids,
+                    self.partition_spec.as_ref().map(|s| s.as_ref()),
+                    self.partition_data.as_ref(),
+                    self.name_mapping.as_ref().map(|n| n.as_ref()),
                 )?);
 
                 self.process_record_batch(record_batch)?
@@ -185,6 +274,9 @@ impl RecordBatchTransformer {
         source_schema: &ArrowSchemaRef,
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
+        partition_spec: Option<&PartitionSpec>,
+        partition_data: Option<&Struct>,
+        name_mapping: Option<&NameMapping>,
     ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_mapped_schema_map =
@@ -205,6 +297,12 @@
         let target_schema = Arc::new(ArrowSchema::new(fields?));
 
+        let constants_map = if let (Some(spec), Some(data)) = (partition_spec, partition_data) {
+            constants_map(spec, data)
+        } else {
+            HashMap::new()
+        };
+
         match Self::compare_schemas(source_schema, &target_schema) {
             SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
             SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }),
@@ -214,6 +312,9 @@
                     snapshot_schema,
                     projected_iceberg_field_ids,
                     field_id_to_mapped_schema_map,
+                    constants_map,
+                    partition_spec,
+                    name_mapping,
                 )?,
                 target_schema,
             }),
@@ -270,57 +371,127 @@
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+        constants_map: HashMap<i32, PrimitiveLiteral>,
+        _partition_spec: Option<&PartitionSpec>,
+        name_mapping: Option<&NameMapping>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
 
-        projected_iceberg_field_ids.iter().map(|field_id|{
-            let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
-                Error::new(ErrorKind::Unexpected, "could not find field in schema")
-            )?;
-            let target_type = target_field.data_type();
+        // Build name-based map for spec rule #2 (name mapping)
+        // This allows us to find Parquet columns by name when field IDs are missing/conflicting
+        let field_name_to_source_schema_map =
+            Self::build_field_name_to_arrow_schema_map(source_schema);
 
-            Ok(if let Some((source_field, source_index)) = field_id_to_source_schema_map.get(field_id) {
-                // column present in source
+        projected_iceberg_field_ids
+            .iter()
+            .map(|field_id| {
+                let (target_field, _) =
+                    field_id_to_mapped_schema_map
+                        .get(field_id)
+                        .ok_or(Error::new(
+                            ErrorKind::Unexpected,
+                            "could not find field in schema",
+                        ))?;
+                let target_type = target_field.data_type();
-                if source_field.data_type().equals_datatype(target_type) {
-                    // no promotion required
-                    ColumnSource::PassThrough {
-                        source_index: *source_index
-                    }
-                } else {
-                    // promotion required
-                    ColumnSource::Promote {
+                let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Field not found in snapshot schema",
+                ))?;
+
+                // Iceberg spec's "Column Projection" rules (https://iceberg.apache.org/spec/#column-projection).
+                // For fields "not present" in data files:
+                // 1. Use partition metadata (identity transforms only)
+                // 2. Use name mapping
+                // 3. Use initial_default
+                // 4. Return null
+                //
+                // Why check partition constants before Parquet field IDs (Java: BaseParquetReaders.java:299):
+                // In add_files scenarios, partition columns may exist in BOTH Parquet AND partition metadata.
+                // Partition metadata is authoritative - it defines which partition this file belongs to.
+
+                // Why verify names when checking field IDs:
+                // add_files can create field ID conflicts (Parquet field_id=1->"name", Iceberg field_id=1->"id").
+                // Name mismatches with name_mapping present indicate conflicts, treat field as "not present".
+                // Without name_mapping, name mismatches are just column renames, so trust the field ID.
+                // (Java: ParquetSchemaUtil.applyNameMapping, TestAddFilesProcedure.addDataPartitioned)
+                let field_by_id = field_id_to_source_schema_map.get(field_id).and_then(
+                    |(source_field, source_index)| {
+                        if name_mapping.is_some() {
+                            let name_matches = source_field.name() == &iceberg_field.name;
+                            if !name_matches {
+                                return None; // Field ID conflict, treat as "not present"
+                            }
+                        }
+
+                        if source_field.data_type().equals_datatype(target_type) {
+                            Some(ColumnSource::PassThrough {
+                                source_index: *source_index,
+                            })
+                        } else {
+                            Some(ColumnSource::Promote {
+                                target_type: target_type.clone(),
+                                source_index: *source_index,
+                            })
+                        }
+                    },
+                );
+
+                // Apply spec's fallback steps for "not present" fields.
+                let column_source = if let Some(constant_value) = constants_map.get(field_id) {
+                    // Rule #1: Identity partition constant
+                    ColumnSource::Add {
+                        value: Some(constant_value.clone()),
                         target_type: target_type.clone(),
-                        source_index: *source_index,
                     }
-                }
-            } else {
-                // column must be added
-                let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(
-                    Error::new(ErrorKind::Unexpected, "Field not found in snapshot schema")
-                )?;
-
-                let default_value = if let Some(iceberg_default_value) =
-                    &iceberg_field.initial_default
-                {
-                    let Literal::Primitive(primitive_literal) = iceberg_default_value else {
-                        return Err(Error::new(
-                            ErrorKind::Unexpected,
-                            format!("Default value for column must be primitive type, but encountered {iceberg_default_value:?}")
-                        ));
-                    };
-                    Some(primitive_literal.clone())
+                } else if let Some(source) = field_by_id {
+                    source
                 } else {
-                    None
+                    // Rule #2: Name mapping (Java: ReadConf.java:83-85)
+                    let name_mapped_column = name_mapping.and_then(|mapping| {
+                        mapping.fields().iter().find_map(|mapped_field| {
+                            if mapped_field.field_id() == Some(*field_id) {
+                                mapped_field.names().iter().find_map(|name| {
+                                    field_name_to_source_schema_map
+                                        .get(name)
+                                        .map(|(field, idx)| (field.clone(), *idx))
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                    });
+
+                    if let Some((source_field, source_index)) = name_mapped_column {
+                        if source_field.data_type().equals_datatype(target_type) {
+                            ColumnSource::PassThrough { source_index }
+                        } else {
+                            ColumnSource::Promote {
+                                target_type: target_type.clone(),
+                                source_index,
+                            }
+                        }
+                    } else {
+                        // Rules #3 and #4: initial_default or null
+                        let default_value =
+                            iceberg_field.initial_default.as_ref().and_then(|lit| {
+                                if let Literal::Primitive(prim) = lit {
+                                    Some(prim.clone())
+                                } else {
+                                    None
+                                }
+                            });
+                        ColumnSource::Add {
+                            value: default_value,
+                            target_type: target_type.clone(),
+                        }
+                    }
                 };
-                ColumnSource::Add {
-                    value: default_value,
-                    target_type: target_type.clone(),
-                }
+                Ok(column_source)
             })
-        }).collect()
+            .collect()
     }
 
     fn build_field_id_to_arrow_schema_map(
@@ -328,30 +499,50 @@
     ) -> Result<HashMap<i32, (FieldRef, usize)>> {
         let mut field_id_to_source_schema = HashMap::new();
         for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
-            let this_field_id = source_field
-                .metadata()
-                .get(PARQUET_FIELD_ID_META_KEY)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        "field ID not present in parquet metadata",
-                    )
-                })?
-                .parse()
-                .map_err(|e| {
+            // Check if field has a field ID in metadata
+            if let Some(field_id_str) = source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+                let this_field_id = field_id_str.parse().map_err(|e| {
                     Error::new(
                         ErrorKind::DataInvalid,
-                        format!("field id not parseable as an i32: {e}"),
+                        format!("field id not parseable as an i32: {}", e),
                     )
                 })?;
 
-            field_id_to_source_schema
-                .insert(this_field_id, (source_field.clone(), source_field_idx));
+                field_id_to_source_schema
+                    .insert(this_field_id, (source_field.clone(), source_field_idx));
+            }
+            // If field doesn't have a field ID, skip it - name mapping will handle it
         }
 
         Ok(field_id_to_source_schema)
     }
 
+    /// Build a map from field name to (FieldRef, index) for name-based column resolution.
+    ///
+    /// This is used for Iceberg spec rule #2: "Use schema.name-mapping.default metadata
+    /// to map field id to columns without field id as described below and use the column
+    /// if it is present."
+    ///
+    /// Unlike `build_field_id_to_arrow_schema_map`, this method handles Parquet files
+    /// that may not have field IDs in their metadata. It builds a simple name-based index
+    /// to enable column resolution by name when field IDs are missing or conflicting.
+    ///
+    /// # References
+    /// - Iceberg spec: format/spec.md "Column Projection" section, rule #2
+    /// - Java impl: ParquetSchemaUtil.applyNameMapping() + ReadConf constructor
+    fn build_field_name_to_arrow_schema_map(
+        source_schema: &SchemaRef,
+    ) -> HashMap<String, (FieldRef, usize)> {
+        let mut field_name_to_source_schema = HashMap::new();
+        for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
+            field_name_to_source_schema.insert(
+                source_field.name().to_string(),
+                (source_field.clone(), source_field_idx),
+            );
+        }
+        field_name_to_source_schema
+    }
+
     fn transform_columns(
         &self,
         columns: &[Arc<dyn ArrowArray>],
@@ -447,7 +638,7 @@
             (dt, _) => {
                 return Err(Error::new(
                     ErrorKind::Unexpected,
-                    format!("unexpected target column type {dt}"),
+                    format!("unexpected target column type {}", dt),
                 ));
             }
         })
@@ -467,7 +658,7 @@ mod test {
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
     use crate::arrow::record_batch_transformer::RecordBatchTransformer;
-    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};
+    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type};
 
     #[test]
     fn build_field_id_to_source_schema_map_works() {
@@ -696,4 +887,735 @@ mod test {
             value.to_string(),
         )]))
     }
+
+    /// Test for add_files with Parquet files that have NO field IDs (Hive tables).
+ /// + /// This reproduces the scenario from Iceberg spec where: + /// - Hive-style partitioned Parquet files are imported via add_files procedure + /// - Parquet files DO NOT have field IDs (typical for Hive tables) + /// - Iceberg schema assigns field IDs: id (1), name (2), dept (3), subdept (4) + /// - Partition columns (id, dept) have initial_default values + /// + /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), + /// this scenario requires `schema.name-mapping.default` from table metadata + /// to correctly map Parquet columns by name to Iceberg field IDs. + /// + /// Expected behavior with name mapping: + /// 1. id=1 (from initial_default) - spec rule #3 + /// 2. name="John Doe" (from Parquet via name mapping) - spec rule #2 + /// 3. dept="hr" (from initial_default) - spec rule #3 + /// 4. subdept="communications" (from Parquet via name mapping) - spec rule #2 + #[test] + fn add_files_partition_columns_without_field_ids() { + // Iceberg schema after add_files: id (partition), name, dept (partition), subdept + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)) + .with_initial_default(Literal::int(1)) + .into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String)) + .with_initial_default(Literal::string("hr")) + .into(), + NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + // Parquet file schema: name, subdept (NO field IDs - typical for Hive tables) + // Note: Partition columns (id, dept) are NOT in the Parquet file - they're in directory paths + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, true), + Field::new("subdept", DataType::Utf8, true), + ])); + + // Create name mapping to resolve field ID conflicts + // Per Iceberg spec: "Use schema.name-mapping.default metadata to map field id + // to columns without field id" + // + // The name mapping tells us: + // - Iceberg field ID 2 ("name") can be found in Parquet column "name" (even though Parquet has field_id=1) + // - Iceberg field ID 4 ("subdept") can be found in Parquet column "subdept" (even though Parquet has field_id=2) + use crate::spec::{MappedField, NameMapping}; + let name_mapping = Arc::new(NameMapping::new(vec![ + MappedField::new(Some(2), vec!["name".to_string()], vec![]), + MappedField::new(Some(4), vec!["subdept".to_string()], vec![]), + ])); + + let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept + + let mut transformer = RecordBatchTransformer::build_with_partition_data( + snapshot_schema, + &projected_field_ids, + None, + None, + Some(name_mapping), + ); + + // Create a Parquet RecordBatch with data for: name="John Doe", subdept="communications" + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(StringArray::from(vec!["John Doe"])), + Arc::new(StringArray::from(vec!["communications"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the transformed RecordBatch has: + // - id=1 (from initial_default, not from Parquet) + // - name="John Doe" (from Parquet, matched by name despite field ID conflict) + // - dept="hr" (from initial_default, not from Parquet) + // - subdept="communications" (from Parquet, matched by name) + assert_eq!(result.num_columns(), 4); + 
assert_eq!(result.num_rows(), 1); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.value(0), 1); + + let name_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "John Doe"); + + let dept_column = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(dept_column.value(0), "hr"); + + let subdept_column = result + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(subdept_column.value(0), "communications"); + } + + /// Test for bucket partitioning where source columns must be read from data files. + /// + /// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules: + /// > "Return the value from partition metadata if an **Identity Transform** exists for the field" + /// + /// # Why this test is critical + /// + /// The key insight is that partition metadata stores TRANSFORMED values, not source values: + /// - For `bucket(4, id)`, partition metadata has `id_bucket = 2` (the bucket number) + /// - The actual `id` column values (100, 200, 300) are ONLY in the data file + /// + /// If iceberg-rust incorrectly treated bucket-partitioned fields as constants, it would: + /// 1. Replace all `id` values with the constant `2` from partition metadata + /// 2. Break runtime filtering (e.g., `WHERE id = 100` would match no rows) + /// 3. Return incorrect query results + /// + /// # What this test verifies + /// + /// - Bucket-partitioned fields (e.g., `bucket(4, id)`) are read from the data file + /// - The source column `id` contains actual values (100, 200, 300), not constants + /// - Java's `PartitionUtil.constantsMap()` behavior is correctly replicated: + /// ```java + /// if (field.transform().isIdentity()) { // FALSE for bucket transforms + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # Real-world impact + /// + /// This reproduces the failure scenario from Iceberg Java's TestRuntimeFiltering: + /// - Tables partitioned by `bucket(N, col)` are common for load balancing + /// - Queries filter on the source column: `SELECT * FROM tbl WHERE col = value` + /// - Runtime filtering pushes predicates down to Iceberg file scans + /// - Without this fix, the filter would match against constant partition values instead of data + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms" + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java + #[test] + fn bucket_partitioning_reads_source_column_from_file() { + use crate::spec::{Struct, Transform}; + + // Table schema: id (data column), name (data column), id_bucket (partition column) + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, id) - the id field is bucketed + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 2 + // In Iceberg, partition data is a Struct where each field corresponds to a partition field + let 
partition_data = Struct::from_iter(vec![Some(Literal::int(2))]); + + // Parquet file contains both id and name columns + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "2"), + ])); + + let projected_field_ids = [1, 2]; // id, name + + let mut transformer = RecordBatchTransformer::build_with_partition_data( + snapshot_schema, + &projected_field_ids, + Some(partition_spec), + Some(partition_data), + None, + ); + + // Create a Parquet RecordBatch with actual data + // The id column MUST be read from here, not treated as a constant + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the transformed RecordBatch correctly reads id from the file + // (NOT as a constant from partition metadata) + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 3); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + // These values MUST come from the Parquet file, not be replaced by constants + assert_eq!(id_column.value(0), 100); + assert_eq!(id_column.value(1), 200); + assert_eq!(id_column.value(2), 300); + + let name_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "Alice"); + assert_eq!(name_column.value(1), "Bob"); + assert_eq!(name_column.value(2), "Charlie"); + } + + /// Test that identity-transformed partition fields ARE treated as constants. + /// + /// This is the complement to `bucket_partitioning_reads_source_column_from_file`, + /// verifying that constants_map() correctly identifies identity-transformed + /// partition fields per the Iceberg spec. + /// + /// # Spec requirement (format/spec.md "Column Projection") + /// + /// > "Return the value from partition metadata if an Identity Transform exists for the field + /// > and the partition value is present in the `partition` struct on `data_file` object + /// > in the manifest. This allows for metadata only migrations of Hive tables." 
+ /// + /// # Why identity transforms use constants + /// + /// Unlike bucket/truncate/year/etc., identity transforms don't modify the value: + /// - `identity(dept)` stores the actual `dept` value in partition metadata + /// - Partition metadata has `dept = "engineering"` (the real value, not a hash/bucket) + /// - This value can be used directly without reading the data file + /// + /// # Performance benefit + /// + /// For Hive migrations where partition columns aren't in data files: + /// - Partition metadata provides the column values + /// - No need to read from data files (metadata-only query optimization) + /// - Common pattern: `dept=engineering/subdept=backend/file.parquet` + /// - `dept` and `subdept` are in directory structure, not in `file.parquet` + /// - Iceberg populates these from partition metadata as constants + /// + /// # What this test verifies + /// + /// - Identity-partitioned fields use constants from partition metadata + /// - The `dept` column is populated with `"engineering"` (not read from file) + /// - Java's `PartitionUtil.constantsMap()` behavior is matched: + /// ```java + /// if (field.transform().isIdentity()) { // TRUE for identity + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + #[test] + fn identity_partition_uses_constant_from_metadata() { + use crate::spec::{Struct, Transform}; + + // Table schema: id (data column), dept (partition column), name (data column) + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: identity(dept) - the dept field uses identity transform + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("dept", "dept", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: dept="engineering" + let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]); + + // Parquet file contains only id and name (dept is in partition path) + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "3"), + ])); + + let projected_field_ids = [1, 2, 3]; // id, dept, name + + let mut transformer = RecordBatchTransformer::build_with_partition_data( + snapshot_schema, + &projected_field_ids, + Some(partition_spec), + Some(partition_data), + None, + ); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the dept column is populated with the constant from partition metadata + assert_eq!(result.num_columns(), 3); + assert_eq!(result.num_rows(), 2); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.value(0), 100); + assert_eq!(id_column.value(1), 200); + + let dept_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + 
// This value MUST come from partition metadata (constant) + assert_eq!(dept_column.value(0), "engineering"); + assert_eq!(dept_column.value(1), "engineering"); + + let name_column = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "Alice"); + assert_eq!(name_column.value(1), "Bob"); + } + + /// Test bucket partitioning with renamed source column. + /// + /// This verifies correct behavior for TestRuntimeFiltering.testRenamedSourceColumnTable() in Iceberg Java. + /// When a source column is renamed after partitioning is established, field-ID-based mapping + /// must still correctly identify the column in Parquet files. + /// + /// # Scenario + /// + /// 1. Table created with `bucket(4, id)` partitioning + /// 2. Data written to Parquet files (field_id=1, name="id") + /// 3. Column renamed: `ALTER TABLE ... RENAME COLUMN id TO row_id` + /// 4. Iceberg schema now has: field_id=1, name="row_id" + /// 5. Parquet files still have: field_id=1, name="id" + /// + /// # Expected Behavior Per Iceberg Spec + /// + /// Per the Iceberg spec "Column Projection" section and Java's PartitionUtil.constantsMap(): + /// - Bucket transforms are NON-identity, so partition metadata stores bucket numbers (0-3), not source values + /// - Source columns for non-identity transforms MUST be read from data files + /// - Field-ID-based mapping should find the column by field_id=1 (ignoring name mismatch) + /// - Runtime filtering on `row_id` should work correctly + /// + /// # What This Tests + /// + /// This test ensures that when FileScanTask provides partition_spec and partition_data: + /// - constants_map() correctly identifies that bucket(4, row_id) is NOT an identity transform + /// - The source column (field_id=1) is NOT added to constants_map + /// - Field-ID-based mapping reads actual values from the Parquet file + /// - Values [100, 200, 300] are read, not replaced with bucket constant 2 + /// + /// # References + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java::testRenamedSourceColumnTable + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java::constantsMap() + /// - Iceberg spec: format/spec.md "Column Projection" section + #[test] + fn test_bucket_partitioning_with_renamed_source_column() { + use crate::spec::{Struct, Transform}; + + // Iceberg schema after rename: row_id (was id), name + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "row_id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, row_id) - but source_id still points to field_id=1 + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("row_id", "row_id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 2 + let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]); + + // Parquet file has OLD column name "id" but SAME field_id=1 + // Field-ID-based mapping should find this despite name mismatch + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "2"), + ])); + + let projected_field_ids = [1, 2]; // row_id (field_id=1), name (field_id=2) + + let mut transformer = 
RecordBatchTransformer::build_with_partition_data( + snapshot_schema, + &projected_field_ids, + Some(partition_spec), + Some(partition_data), + None, + ); + + // Create a Parquet RecordBatch with actual data + // Despite column rename, data should be read via field_id=1 + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the transformed RecordBatch correctly reads data despite name mismatch + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 3); + + let row_id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + // These values MUST come from the Parquet file via field_id=1, + // not be replaced by the bucket constant (2) + assert_eq!(row_id_column.value(0), 100); + assert_eq!(row_id_column.value(1), 200); + assert_eq!(row_id_column.value(2), 300); + + let name_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "Alice"); + assert_eq!(name_column.value(1), "Bob"); + assert_eq!(name_column.value(2), "Charlie"); + } + + /// Comprehensive integration test that verifies all 4 Iceberg spec rules work correctly. + /// + /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), + /// "Values for field ids which are not present in a data file must be resolved + /// according the following rules:" + /// + /// This test creates a scenario where each rule is exercised: + /// - Rule #1: dept (identity-partitioned) -> constant from partition metadata + /// - Rule #2: data (via name mapping) -> read from Parquet file by name + /// - Rule #3: category (initial_default) -> use default value + /// - Rule #4: notes (no default) -> return null + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" section + #[test] + fn test_all_four_spec_rules() { + use crate::spec::{MappedField, NameMapping, Transform}; + + // Iceberg schema with columns designed to exercise each spec rule + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + // Field in Parquet by field ID (normal case) + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + // Rule #1: Identity-partitioned field - should use partition metadata + NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(), + // Rule #2: Field resolved by name mapping (no field ID in Parquet) + NestedField::required(3, "data", Type::Primitive(PrimitiveType::String)).into(), + // Rule #3: Field with initial_default + NestedField::optional(4, "category", Type::Primitive(PrimitiveType::String)) + .with_initial_default(Literal::string("default_category")) + .into(), + // Rule #4: Field with no default - should be null + NestedField::optional(5, "notes", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: identity transform on dept + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("dept", "dept", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: dept="engineering" + let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]); + + // Parquet schema: has id (with field_id=1) and data (without field ID) + // Missing: dept 
(in partition), category (has default), notes (no default) + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + Field::new("data", DataType::Utf8, false), // No field ID - needs name mapping + ])); + + // Name mapping: maps field ID 3 to "data" column + let name_mapping = Arc::new(NameMapping::new(vec![MappedField::new( + Some(3), + vec!["data".to_string()], + vec![], + )])); + + let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data, category, notes + + let mut transformer = RecordBatchTransformer::build_with_partition_data( + snapshot_schema, + &projected_field_ids, + Some(partition_spec), + Some(partition_data), + Some(name_mapping), + ); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200])), + Arc::new(StringArray::from(vec!["value1", "value2"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 5); + assert_eq!(result.num_rows(), 2); + + // Verify each column demonstrates the correct spec rule: + + // Normal case: id from Parquet by field ID + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.value(0), 100); + assert_eq!(id_column.value(1), 200); + + // Rule #1: dept from partition metadata (identity transform) + let dept_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(dept_column.value(0), "engineering"); + assert_eq!(dept_column.value(1), "engineering"); + + // Rule #2: data from Parquet via name mapping + let data_column = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(data_column.value(0), "value1"); + assert_eq!(data_column.value(1), "value2"); + + // Rule #3: category from initial_default + let category_column = result + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(category_column.value(0), "default_category"); + assert_eq!(category_column.value(1), "default_category"); + + // Rule #4: notes is null (no default, not in Parquet, not in partition) + let notes_column = result + .column(4) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(notes_column.is_null(0)); + assert!(notes_column.is_null(1)); + } + + /// Verifies field ID conflict detection for add_files imports. + /// + /// Why: add_files can import Parquet with conflicting field IDs (field_id=1->"name" in Parquet + /// vs field_id=1->"id" in Iceberg). Name-checking detects conflicts, treats fields as "not present", + /// allowing spec fallback to partition constants and name mapping. 
+ /// + /// Reproduces: TestAddFilesProcedure.addDataPartitioned + #[test] + fn add_files_with_field_id_conflicts_like_java_test() { + use crate::spec::{MappedField, NameMapping, Struct, Transform}; + + // Iceberg schema (field IDs 1-4) + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "dept", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "subdept", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]); + + // Parquet schema with CONFLICTING field IDs (1-3 instead of 1-4) + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("name", DataType::Utf8, false, "1"), + simple_field("dept", DataType::Utf8, false, "2"), + simple_field("subdept", DataType::Utf8, false, "3"), + ])); + + // WHY name mapping: Resolves conflicts by mapping Iceberg field IDs to Parquet column names + let name_mapping = Arc::new(NameMapping::new(vec![ + MappedField::new(Some(2), vec!["name".to_string()], vec![]), + MappedField::new(Some(3), vec!["dept".to_string()], vec![]), + MappedField::new(Some(4), vec!["subdept".to_string()], vec![]), + ])); + + let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept + + let mut transformer = RecordBatchTransformer::build_with_partition_data( + snapshot_schema, + &projected_field_ids, + Some(partition_spec), + Some(partition_data), + Some(name_mapping), + ); + + // Parquet data (one row) + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(StringArray::from(vec!["John Doe"])), // name column + Arc::new(StringArray::from(vec!["Engineering"])), // dept column + Arc::new(StringArray::from(vec!["Backend"])), // subdept column + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 4); + assert_eq!(result.num_rows(), 1); + + // Verify: id from partition constant (conflict with Parquet field_id=1) + assert_eq!( + result + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + 1 + ); + + // Verify: name via name mapping (conflict with Parquet field_id=2) + assert_eq!( + result + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + "John Doe" + ); + + // Verify: dept via name mapping (conflict with Parquet field_id=3) + assert_eq!( + result + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + "Engineering" + ); + + // Verify: subdept via name mapping (not in Parquet by field ID) + assert_eq!( + result + .column(3) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + "Backend" + ); + } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 3f7c29dbf4..fe3f5c8f7e 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -128,6 +128,13 @@ impl ManifestEntryContext { .map(|x| x.as_ref().snapshot_bound_predicate.clone()), deletes, + + // Include partition data and spec from manifest entry + partition: Some(self.manifest_entry.data_file.partition.clone()), + // TODO: Pass actual 
PartitionSpec through context chain for native flow
+            partition_spec: None,
+            // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
+            name_mapping: None,
         })
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 6884e00b9b..3e319ca062 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -1777,6 +1777,9 @@ pub mod tests {
             record_count: Some(100),
             data_file_format: DataFileFormat::Parquet,
             deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
 
         test_fn(task);
@@ -1791,6 +1794,9 @@ pub mod tests {
             record_count: None,
             data_file_format: DataFileFormat::Avro,
             deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
         };
         test_fn(task);
     }
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 32fe3ae309..e1ef241a57 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -15,16 +15,39 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize, Serializer};
 
 use crate::Result;
 use crate::expr::BoundPredicate;
-use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef};
+use crate::spec::{
+    DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema,
+    SchemaRef, Struct,
+};
 
 /// A stream of [`FileScanTask`].
 pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
 
+/// Serialization helper that always returns NotImplementedError.
+/// Used for fields that should not be serialized but we want to be explicit about it.
+fn serialize_not_implemented<S, T>(_: &T, _: S) -> std::result::Result<S::Ok, S::Error>
+where S: Serializer {
+    Err(serde::ser::Error::custom(
+        "Serialization not implemented for this field",
+    ))
+}
+
+/// Deserialization helper that always returns NotImplementedError.
+/// Used for fields that should not be deserialized but we want to be explicit about it.
+fn deserialize_not_implemented<'de, D, T>(_: D) -> std::result::Result<T, D::Error>
+where D: serde::Deserializer<'de> {
+    Err(serde::de::Error::custom(
+        "Deserialization not implemented for this field",
+    ))
+}
+
 /// A task to scan part of file.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct FileScanTask {
@@ -54,6 +77,33 @@ pub struct FileScanTask {
     /// The list of delete files that may need to be applied to this data file
     pub deletes: Vec<FileScanTaskDeleteFile>,
+
+    /// Partition data from the manifest entry, used to identify which columns can use
+    /// constant values from partition metadata vs. reading from the data file.
+    /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub partition: Option<Struct>,
+
+    /// The partition spec for this file, used to distinguish identity transforms
+    /// (which use partition metadata constants) from non-identity transforms like
+    /// bucket/truncate (which must read source columns from the data file).
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub partition_spec: Option<Arc<PartitionSpec>>,
+
+    /// Name mapping from table metadata (property: schema.name-mapping.default),
+    /// used to resolve field IDs from column names when Parquet files lack field IDs
+    /// or have field ID conflicts.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub name_mapping: Option<Arc<NameMapping>>,
 }
 
 impl FileScanTask {
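// Illustrative sketch (not part of the patch above): how the identity-vs-non-identity
// distinction enforced by `constants_map` plays out for a spec that mixes an identity
// transform with a bucket transform. `constants_map` is crate-private, so a snippet like
// this would have to live inside the crate (e.g., as a unit test in
// record_batch_transformer.rs). Field IDs, column names, and the expected
// `PrimitiveLiteral` value below are assumptions made for the example only.
#[test]
fn constants_map_keeps_identity_and_skips_bucket() {
    use std::sync::Arc;

    use crate::spec::{
        Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType, Schema, Struct,
        Transform, Type,
    };

    // Schema with two source columns: `id` (field 1) and `dept` (field 2).
    let schema = Arc::new(
        Schema::builder()
            .with_schema_id(0)
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap(),
    );

    // Spec partitions by identity(dept) and bucket(4, id), in that order.
    let partition_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .add_partition_field("dept", "dept", Transform::Identity)
        .unwrap()
        .add_partition_field("id", "id_bucket", Transform::Bucket(4))
        .unwrap()
        .build()
        .unwrap();

    // Partition metadata stores the real dept value but only the bucket number for id.
    let partition_data = Struct::from_iter(vec![
        Some(Literal::string("engineering")),
        Some(Literal::int(2)),
    ]);

    let constants = constants_map(&partition_spec, &partition_data);

    // Only the identity-transformed source column becomes a constant (spec rule #1)...
    assert_eq!(
        constants.get(&2),
        Some(&PrimitiveLiteral::String("engineering".to_string()))
    );
    // ...while the bucket-transformed source column must still be read from the data file.
    assert!(!constants.contains_key(&1));
}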