Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rust-version = { workspace = true }


[dependencies]
arrow = { workspace = true }
arrow = { workspace = true , features = ["canonical_extension_types"]}
arrow-schema = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = "2.10.0"
Expand Down
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ encryption = ["dep:ring"]
flate2-rust_backened = ["flate2/rust_backend"]
flate2-zlib-rs = ["flate2/zlib-rs"]
# Enable parquet variant support
variant_experimental = ["parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]


[[example]]
Expand Down
29 changes: 17 additions & 12 deletions parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::arrow::schema::extension::add_extension_type;
use crate::arrow::schema::primitive::convert_primitive;
use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use crate::basic::{ConvertedType, Repetition};
Expand Down Expand Up @@ -172,7 +173,7 @@ impl Visitor {

let parquet_fields = struct_type.get_fields();

// Extract the arrow fields
// Extract any arrow fields from the hints
let arrow_fields = match &context.data_type {
Some(DataType::Struct(fields)) => {
if fields.len() != parquet_fields.len() {
Expand Down Expand Up @@ -220,10 +221,10 @@ impl Visitor {
data_type,
};

if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
// The child type returned may be different from what is encoded in the arrow
// schema in the event of a mismatch or a projection
child_fields.push(convert_field(parquet_field, &child, arrow_field));
child_fields.push(convert_field(parquet_field, &mut child, arrow_field));
children.push(child);
}
}
Expand Down Expand Up @@ -352,13 +353,13 @@ impl Visitor {

// Need both columns to be projected
match (maybe_key, maybe_value) {
(Some(key), Some(value)) => {
(Some(mut key), Some(mut value)) => {
let key_field = Arc::new(
convert_field(map_key, &key, arrow_key)
convert_field(map_key, &mut key, arrow_key)
// The key is always non-nullable (#5630)
.with_nullable(false),
);
let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is somewhat confusing to me in that the arrow type is encoded twice -- once on a ParquetField (which has an arrow Field in it) and once in this ValueField.

Any help simplifing this would be most appreciated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't think I understand this part of the code well enough to suggest anything :(

let field_metadata = match arrow_map {
Some(field) => field.metadata().clone(),
_ => HashMap::default(),
Expand Down Expand Up @@ -495,8 +496,8 @@ impl Visitor {
};

match self.dispatch(item_type, new_context) {
Ok(Some(item)) => {
let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
Ok(Some(mut item)) => {
let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field));

// Use arrow type as hint for index size
let arrow_type = match context.data_type {
Expand Down Expand Up @@ -540,11 +541,15 @@ impl Visitor {
}
}

/// Computes the [`Field`] for a child column
/// Computes the Arrow [`Field`] for a child column
///
/// The resulting [`Field`] will have the type dictated by `field`, a name
/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
fn convert_field(
parquet_type: &Type,
field: &mut ParquetField,
arrow_hint: Option<&Field>,
) -> Field {
let name = parquet_type.name();
let data_type = field.arrow_type.clone();
let nullable = field.nullable;
Expand Down Expand Up @@ -575,7 +580,7 @@ fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&
);
ret.set_metadata(meta);
}
ret
add_extension_type(ret, parquet_type)
}
}
}
Expand Down
62 changes: 62 additions & 0 deletions parquet/src/arrow/schema/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Arrow Extension Type Support for Parquet
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to consolidate the rest of the extension type handling in this module, to try and improve the current situation where #cfg(..) is sprinkled over the type conversion logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//!
//! This module contains mapping code to map Parquet [`LogicalType`]s to/from
//! Arrow [`ExtensionType`]s.
//!
//! Extension types are represented using the metadata from Arrow [`Field`]s
//! with the key "ARROW:extension:name".

use crate::basic::LogicalType;
use crate::schema::types::Type;
use arrow_schema::extension::ExtensionType;
use arrow_schema::Field;

/// Adds extension type metadata, if necessary, based on the Parquet field's
/// [`LogicalType`]
///
/// Some Parquet logical types, such as Variant, do not map directly to an
/// Arrow DataType, and instead are represented by an Arrow ExtensionType.
/// Extension types are attached to Arrow Fields via metadata.
pub(crate) fn add_extension_type(arrow_field: Field, parquet_type: &Type) -> Field {
let result = match parquet_type.get_basic_info().logical_type() {
#[cfg(feature = "variant_experimental")]
Some(LogicalType::Variant) => {
arrow_field.with_extension_type(parquet_variant_compute::VariantType)
}
// TODO add other LogicalTypes here
_ => arrow_field,
};
result
}

/// Return the Parquet logical type to use for the specified Arrow field, if any.
#[cfg(feature = "variant_experimental")]
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
use parquet_variant_compute::VariantType;
if field.extension_type_name()? == VariantType::NAME {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was worried here about testing for the extension type using try_extension_type and then discarding any error via ok() -- creating an ArrowError requires allocating a string, so that pattern can be expensive (allocate and format a string just to throw it away)

@scovich also noticed this in the pathfinding PR here:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do both? Check the name (= quick and cheap) and only try_extension_type if the name matches? (if they asked for Variant and provided an incorrect data type, they shouldn't be surprised at the cost of allocating an error message)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent call -- done in 68ffd32

return Some(LogicalType::Variant);
};
None
}

#[cfg(not(feature = "variant_experimental"))]
pub(crate) fn logical_type_for_struct(field: &Field) -> Option<LogicalType> {
None
}
12 changes: 9 additions & 3 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

mod complex;
mod extension;
mod primitive;

use super::PARQUET_FIELD_ID_META_KEY;
use crate::arrow::schema::extension::logical_type_for_struct;
use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};

use super::PARQUET_FIELD_ID_META_KEY;

/// Convert Parquet schema to Arrow schema including optional metadata
///
/// Attempts to decode any existing Arrow schema metadata, falling back
Expand All @@ -63,7 +64,11 @@ pub fn parquet_to_arrow_schema_by_columns(
Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
}

/// Extracts the arrow metadata
/// Determines the Arrow Schema from a Parquet schema
///
/// Looks for an Arrow schema metadata "hint" (see
/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
/// lossless round trips.
pub(crate) fn parquet_to_arrow_schema_and_fields(
parquet_schema: &SchemaDescriptor,
mask: ProjectionMask,
Expand Down Expand Up @@ -728,6 +733,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_fields(fields)
.with_repetition(repetition)
.with_id(id)
.with_logical_type(logical_type_for_struct(field))
.build()
}
DataType::Map(field, _) => {
Expand Down
Loading
Loading