diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index f7f90663a5..4949cd9044 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -45,6 +45,7 @@ use parquet::file::metadata::{
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
+use crate::arrow::record_batch_projector::RecordBatchProjector;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::delete_vector::DeleteVector;
@@ -274,6 +275,54 @@ impl ArrowReader {
         record_batch_stream_builder =
             record_batch_stream_builder.with_projection(projection_mask.clone());
 
+        let has_nested_fields = task
+            .project_field_ids
+            .iter()
+            .filter(|id| !is_metadata_field(**id))
+            .any(|id| task.schema.as_struct().field_by_id(*id).is_none());
+        let projector = if has_nested_fields {
+            let projected_arrow_schema = record_batch_stream_builder.schema();
+            let projected_iceberg_schema = arrow_schema_to_schema(projected_arrow_schema)?;
+            let available_field_ids: HashSet<i32> = projected_iceberg_schema
+                .field_id_to_name_map()
+                .keys()
+                .copied()
+                .collect();
+            let projectable_field_ids = task
+                .project_field_ids
+                .iter()
+                .copied()
+                .filter(|id| available_field_ids.contains(id))
+                .collect::<Vec<_>>();
+            if projectable_field_ids.is_empty() {
+                None
+            } else {
+                Some(RecordBatchProjector::new(
+                    projected_arrow_schema.clone(),
+                    &projectable_field_ids,
+                    |field| {
+                        field
+                            .metadata()
+                            .get(PARQUET_FIELD_ID_META_KEY)
+                            .map(|value| {
+                                value.parse::<i64>().map_err(|e| {
+                                    Error::new(
+                                        ErrorKind::DataInvalid,
+                                        "field id not parseable as an i64".to_string(),
+                                    )
+                                    .with_context("value", value)
+                                    .with_source(e)
+                                })
+                            })
+                            .transpose()
+                    },
+                    |_| true,
+                )?)
+            }
+        } else {
+            None
+        };
+
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion,
         // column re-ordering, partition constants, and virtual field addition (like _file)
@@ -428,11 +477,15 @@ impl ArrowReader {
 
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
+        let mut projector = projector.clone();
         let record_batch_stream =
             record_batch_stream_builder
                 .build()?
                 .map(move |batch| match batch {
-                    Ok(batch) => {
+                    Ok(mut batch) => {
+                        if let Some(projector) = &mut projector {
+                            batch = projector.project_batch(batch)?;
+                        }
                         // Process the record batch (type promotion, column reordering, virtual fields, etc.)
                         record_batch_transformer.process_record_batch(batch)
                     }
@@ -4052,4 +4105,133 @@ message schema {
         assert_eq!(name_col.value(2), "Charlie");
         assert_eq!(name_col.value(3), "Dave");
     }
+
+    #[tokio::test]
+    async fn test_read_nested_parquet_column() {
+        use arrow_array::{Int32Array, StructArray};
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+        let nested_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_identifier_field_ids(vec![1])
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(
+                        2,
+                        "person",
+                        Type::Struct(crate::spec::StructType::new(vec![
+                            NestedField::required(
+                                3,
+                                "name",
+                                Type::Primitive(PrimitiveType::String),
+                            )
+                            .into(),
+                            NestedField::optional(4, "age", Type::Primitive(PrimitiveType::Int))
+                                .into(),
+                        ])),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let inner_fields = vec![
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+            Field::new("age", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "4".to_string(),
+            )])),
+        ];
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new(
+                "person",
+                DataType::Struct(arrow_schema::Fields::from(inner_fields.clone())),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let name_array = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef;
+        let age_array = Arc::new(Int32Array::from(vec![Some(30), Some(25), None])) as ArrayRef;
+
+        let struct_array = Arc::new(StructArray::from(vec![
+            (Arc::new(inner_fields[0].clone()), name_array),
+            (Arc::new(inner_fields[1].clone()), age_array),
+        ])) as ArrayRef;
+
+        let to_write =
+            RecordBatch::try_new(arrow_schema.clone(), vec![id_array, struct_array]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file_path = format!("{table_location}/nested.parquet");
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: file_path,
+                data_file_format: DataFileFormat::Parquet,
+                schema: nested_schema.clone(),
+                project_field_ids: vec![1, 3],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+
+        let id_col = batch
+            .column(0)
+            .as_primitive::<Int32Type>();
+        assert_eq!(id_col.value(0), 1);
+        assert_eq!(id_col.value(1), 2);
+        assert_eq!(id_col.value(2), 3);
+
+        let name_col = batch.column(1).as_string::<i32>();
+        assert_eq!(name_col.value(0), "Alice");
+        assert_eq!(name_col.value(1), "Bob");
+        assert_eq!(name_col.value(2), "Charlie");
+    }
 }
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 439358435c..28fc3fefa4 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -573,22 +573,38 @@ impl RecordBatchTransformer {
     ) -> Result<HashMap<i32, (FieldRef, usize)>> {
         let mut field_id_to_source_schema = HashMap::new();
         for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
-            // Check if field has a field ID in metadata
-            if let Some(field_id_str) = source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
-                let this_field_id = field_id_str.parse().map_err(|e| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("field id not parseable as an i32: {e}"),
-                    )
-                })?;
+            Self::collect_field_ids_recursive(
+                source_field,
+                source_field_idx,
+                &mut field_id_to_source_schema,
+            )?;
+        }
+
+        Ok(field_id_to_source_schema)
+    }
+
+    fn collect_field_ids_recursive(
+        field: &FieldRef,
+        top_level_idx: usize,
+        map: &mut HashMap<i32, (FieldRef, usize)>,
+    ) -> Result<()> {
+        if let Some(field_id_str) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
+            let field_id = field_id_str.parse().map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("field id not parseable as an i32: {e}"),
+                )
+            })?;
+            map.insert(field_id, (field.clone(), top_level_idx));
+        }
-
-                field_id_to_source_schema
-                    .insert(this_field_id, (source_field.clone(), source_field_idx));
+
+        if let DataType::Struct(inner_fields) = field.data_type() {
+            for inner_field in inner_fields.iter() {
+                Self::collect_field_ids_recursive(inner_field, top_level_idx, map)?;
             }
-            // If field doesn't have a field ID, skip it - name mapping will handle it
         }
 
-        Ok(field_id_to_source_schema)
+        Ok(())
     }
 
     fn transform_columns(
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index c055c12c9a..e67f6be78b 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -258,18 +258,6 @@ impl<'a> TableScanBuilder<'a> {
                 )
             })?;
 
-            schema
-                .as_struct()
-                .field_by_id(field_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::FeatureUnsupported,
-                        format!(
-                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
-                        ),
-                    )
-                })?;
-
             field_ids.push(field_id);
         }
 
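
Usage note: with the FeatureUnsupported guard removed from TableScanBuilder, a nested field
can now be selected by its dotted path through the public scan API. Below is a minimal
sketch of the end-to-end flow this patch enables; `scan_nested` is a hypothetical helper,
and `table` is assumed to be any loaded iceberg::table::Table whose schema contains the
`person` struct used in the test above.

    use futures::TryStreamExt;
    use iceberg::table::Table;

    // "person.name" resolves to field id 3 via the schema's name map; the reader
    // prunes the parquet scan to that leaf, and RecordBatchProjector lifts it out
    // of the struct, so batches arrive with two top-level columns.
    async fn scan_nested(table: &Table) -> iceberg::Result<()> {
        let batches: Vec<_> = table
            .scan()
            .select(vec!["id", "person.name"])
            .build()?
            .to_arrow()
            .await?
            .try_collect()
            .await?;
        assert!(batches.iter().all(|b| b.num_columns() == 2));
        Ok(())
    }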