184 changes: 183 additions & 1 deletion crates/iceberg/src/arrow/reader.rs
@@ -45,6 +45,7 @@ use parquet::file::metadata::{
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
@@ -274,6 +275,54 @@ impl ArrowReader {
record_batch_stream_builder =
record_batch_stream_builder.with_projection(projection_mask.clone());

let has_nested_fields = task
.project_field_ids
.iter()
.filter(|id| !is_metadata_field(**id))
.any(|id| task.schema.as_struct().field_by_id(*id).is_none());
let projector = if has_nested_fields {
let projected_arrow_schema = record_batch_stream_builder.schema();
let projected_iceberg_schema = arrow_schema_to_schema(projected_arrow_schema)?;
let available_field_ids: HashSet<i32> = projected_iceberg_schema
.field_id_to_name_map()
.keys()
.copied()
.collect();
let projectable_field_ids = task
.project_field_ids
.iter()
.copied()
.filter(|id| available_field_ids.contains(id))
.collect::<Vec<_>>();
if projectable_field_ids.is_empty() {
None
} else {
Some(RecordBatchProjector::new(
projected_arrow_schema.clone(),
&projectable_field_ids,
|field| {
field
.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.map(|value| {
value.parse::<i64>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"field id not parseable as an i64".to_string(),
)
.with_context("value", value)
.with_source(e)
})
})
.transpose()
},
|_| true,
)?)
}
} else {
None
};

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, partition constants, and virtual field addition (like _file)
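
Worth spelling out what the new branch above detects. The Parquet `ProjectionMask` can prune leaf columns, but pruned leaves stay wrapped in their parent structs, so a task that projects a nested field gets it back as `person: {name}` rather than as a top-level `name` column; the `RecordBatchProjector` built here is what later hoists those leaves out (the new test at the bottom of this file exercises exactly that). A minimal sketch of the detection predicate, assuming iceberg-rust's `Schema` API and omitting the `is_metadata_field` filter the real code applies:

```rust
use iceberg::spec::Schema;

// A projected field id counts as "nested" when it does not resolve to a
// direct child of the schema's top-level struct. With the schema from the
// test below (id=1, person=2, person.name=3, person.age=4), projecting
// [1, 3] trips this check because field 3 only exists inside `person`.
fn projection_touches_nested_fields(project_field_ids: &[i32], schema: &Schema) -> bool {
    project_field_ids
        .iter()
        .any(|id| schema.as_struct().field_by_id(*id).is_none())
}
```
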
@@ -428,11 +477,15 @@ impl ArrowReader {

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let mut projector = projector.clone();
let record_batch_stream =
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
-                        Ok(batch) => {
                        Ok(mut batch) => {
if let Some(projector) = &mut projector {
batch = projector.project_batch(batch)?;
}
// Process the record batch (type promotion, column reordering, virtual fields, etc.)
record_batch_transformer.process_record_batch(batch)
}
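
The per-batch pipeline set up here runs the projector first and the transformer second: each batch coming off the Parquet reader has its nested leaves hoisted to top-level columns, and only then does `RecordBatchTransformer` apply type promotion, reordering, and virtual fields. A standalone arrow-rs sketch of the hoisting step (the real `RecordBatchProjector` locates children by `PARQUET:field_id` metadata; this sketch uses names to stay self-contained):

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // A `person` struct column with a single `name` child.
    let names: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob"]));
    let person_fields = Fields::from(vec![Field::new("name", DataType::Utf8, false)]);
    let person = StructArray::new(person_fields, vec![names], None);

    // Hoist the child out: this is the flattened "person.name" column that
    // ends up at the top level of the projected RecordBatch.
    let projected = person.column_by_name("name").expect("child exists");
    assert_eq!(projected.len(), 2);
}
```
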
@@ -4052,4 +4105,133 @@ message schema {
assert_eq!(name_col.value(2), "Charlie");
assert_eq!(name_col.value(3), "Dave");
}

#[tokio::test]
async fn test_read_nested_parquet_column() {
use arrow_array::{Int32Array, StructArray};

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();

let nested_schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![1])
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(
2,
"person",
Type::Struct(crate::spec::StructType::new(vec![
NestedField::required(
3,
"name",
Type::Primitive(PrimitiveType::String),
)
.into(),
NestedField::optional(4, "age", Type::Primitive(PrimitiveType::Int))
.into(),
])),
)
.into(),
])
.build()
.unwrap(),
);

let inner_fields = vec![
Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
Field::new("age", DataType::Int32, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
];

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new(
"person",
DataType::Struct(arrow_schema::Fields::from(inner_fields.clone())),
true,
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));

let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
let name_array = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef;
let age_array = Arc::new(Int32Array::from(vec![Some(30), Some(25), None])) as ArrayRef;

let struct_array = Arc::new(StructArray::from(vec![
(Arc::new(inner_fields[0].clone()), name_array),
(Arc::new(inner_fields[1].clone()), age_array),
])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_array, struct_array]).unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

let file_path = format!("{table_location}/nested.parquet");
let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();

let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
let reader = ArrowReaderBuilder::new(file_io).build();

let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: file_path,
data_file_format: DataFileFormat::Parquet,
schema: nested_schema.clone(),
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);

let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 1);
assert_eq!(id_col.value(1), 2);
assert_eq!(id_col.value(2), 3);

let name_col = batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");
}
}
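
One reading aid for the assertions above, hedged since it is not part of the patch: pretty-printing the collected batches (via arrow's `prettyprint` feature) makes the flattening visible, with `person.name` appearing as an ordinary top-level string column alongside `id`:

```rust
use arrow_array::RecordBatch;
use arrow_cast::pretty::pretty_format_batches;

// Prints something like (exact header naming depends on the projector):
// +----+---------+
// | id | name    |
// +----+---------+
// | 1  | Alice   |
// | 2  | Bob     |
// | 3  | Charlie |
// +----+---------+
fn show(batches: &[RecordBatch]) {
    println!("{}", pretty_format_batches(batches).unwrap());
}
```
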
40 changes: 28 additions & 12 deletions crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -573,22 +573,38 @@ impl RecordBatchTransformer {
    ) -> Result<HashMap<i32, (FieldRef, usize)>> {
        let mut field_id_to_source_schema = HashMap::new();
        for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
-            // Check if field has a field ID in metadata
-            if let Some(field_id_str) = source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
-                let this_field_id = field_id_str.parse().map_err(|e| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("field id not parseable as an i32: {e}"),
-                    )
-                })?;
-
-                field_id_to_source_schema
-                    .insert(this_field_id, (source_field.clone(), source_field_idx));
-            }
-            // If field doesn't have a field ID, skip it - name mapping will handle it
            Self::collect_field_ids_recursive(
                source_field,
                source_field_idx,
                &mut field_id_to_source_schema,
            )?;
        }

        Ok(field_id_to_source_schema)
    }

    fn collect_field_ids_recursive(
        field: &FieldRef,
        top_level_idx: usize,
        map: &mut HashMap<i32, (FieldRef, usize)>,
    ) -> Result<()> {
        if let Some(field_id_str) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
            let field_id = field_id_str.parse().map_err(|e| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!("field id not parseable as an i32: {e}"),
                )
            })?;
            map.insert(field_id, (field.clone(), top_level_idx));
        }

        if let DataType::Struct(inner_fields) = field.data_type() {
            for inner_field in inner_fields.iter() {
                Self::collect_field_ids_recursive(inner_field, top_level_idx, map)?;
            }
        }

        Ok(())
    }

fn transform_columns(
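The important detail in the recursion above is that every nested id is recorded against the index of its *top-level* ancestor column, so the transformer can still fetch the correct source column for a nested leaf. A self-contained sketch of the same walk (simplified to swallow unparseable ids rather than returning an error, unlike the patch):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, FieldRef, Fields};

const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

// Map every field id found in `field` (including nested struct children)
// to the top-level column index it lives under.
fn collect(field: &FieldRef, top_level_idx: usize, map: &mut HashMap<i32, usize>) {
    if let Some(id) = field
        .metadata()
        .get(PARQUET_FIELD_ID_META_KEY)
        .and_then(|s| s.parse::<i32>().ok())
    {
        map.insert(id, top_level_idx);
    }
    if let DataType::Struct(children) = field.data_type() {
        for child in children.iter() {
            collect(child, top_level_idx, map);
        }
    }
}

fn main() {
    let name = Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "3".to_string(),
    )]));
    let person: FieldRef = Arc::new(
        Field::new("person", DataType::Struct(Fields::from(vec![name])), true).with_metadata(
            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
        ),
    );

    let mut map = HashMap::new();
    collect(&person, 1, &mut map);
    assert_eq!(map.get(&2), Some(&1)); // the struct itself
    assert_eq!(map.get(&3), Some(&1)); // nested `name` also resolves to column 1
}
```
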
12 changes: 0 additions & 12 deletions crates/iceberg/src/scan/mod.rs
@@ -258,18 +258,6 @@ impl<'a> TableScanBuilder<'a> {
)
})?;

-            schema
-                .as_struct()
-                .field_by_id(field_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::FeatureUnsupported,
-                        format!(
-                            "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}"
-                        ),
-                    )
-                })?;
-
field_ids.push(field_id);
}

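
With that guard gone, a dotted nested column name flows through name resolution into `FileScanTask::project_field_ids` instead of failing with `FeatureUnsupported`. A hedged usage sketch (the table setup is assumed, not shown; API names follow iceberg-rust's public scan API):

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

async fn scan_nested_column(table: &Table) -> Result<()> {
    let scan = table
        .scan()
        .select(vec!["person.name"]) // previously rejected as a nested field
        .build()?;
    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```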