Skip to content

Commit 76a7789

Browse files
authored
Refactor file schema type coercions (apache#15268)
* Refactor file schema type coercions * resolve comments * keep old api and add deprecated * resolve comments
1 parent 72705a3 commit 76a7789

File tree

2 files changed

+115
-16
lines changed

2 files changed

+115
-16
lines changed

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 112 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,114 @@ impl FileFormat for ParquetFormat {
465465
}
466466
}
467467

468+
/// Apply the schema type coercions needed to make the file schema match the table schema.
469+
///
470+
/// Fields are matched by name, and two kinds of transformation are performed in a single pass:
471+
/// 1. Binary to string conversion - a `Binary`, `LargeBinary` or `BinaryView` file field is
472+
/// coerced to the table field's string type (`Utf8`, `LargeUtf8` or `Utf8View`)
473+
/// 2. Regular to view conversion - a `Utf8`/`LargeUtf8` file field is coerced to `Utf8View`, and a
474+
/// `Binary`/`LargeBinary` file field to `BinaryView`, when the table field uses that view type
475+
///
476+
/// # Arguments
477+
/// * `table_schema` - The table schema containing the desired types
478+
/// * `file_schema` - The file schema to be transformed
479+
///
480+
/// # Returns
481+
/// * `Some(Schema)` - If the table schema has at least one `Utf8`, `LargeUtf8`, `Utf8View` or `BinaryView` field. File fields with no applicable coercion are carried over unchanged (so the result may equal the input), and the file schema metadata is preserved
482+
/// * `None` - If the table schema has no such field, so no coercion could apply
483+
pub fn apply_file_schema_type_coercions(
484+
table_schema: &Schema,
485+
file_schema: &Schema,
486+
) -> Option<Schema> {
487+
let mut needs_view_transform = false;
488+
let mut needs_string_transform = false;
489+
490+
// Create a mapping of table field names to their data types for fast lookup
491+
// and simultaneously check whether any table field could trigger a coercion
492+
let table_fields: HashMap<_, _> = table_schema
493+
.fields()
494+
.iter()
495+
.map(|f| {
496+
let dt = f.data_type();
497+
// A view type in the table schema means a view type coercion may be needed
498+
if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
499+
needs_view_transform = true;
500+
}
501+
// A string type in the table schema means a binary-to-string coercion may be needed
502+
if matches!(
503+
dt,
504+
&DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
505+
) {
506+
needs_string_transform = true;
507+
}
508+
509+
(f.name(), dt)
510+
})
511+
.collect();
512+
513+
// Early return: the table schema has no string or view types, so nothing can be coerced
514+
if !needs_view_transform && !needs_string_transform {
515+
return None;
516+
}
517+
518+
let transformed_fields: Vec<Arc<Field>> = file_schema
519+
.fields()
520+
.iter()
521+
.map(|field| {
522+
let field_name = field.name();
523+
let field_type = field.data_type();
524+
525+
// Look up the corresponding field type in the table schema (matched by field name)
526+
if let Some(table_type) = table_fields.get(field_name) {
527+
match (table_type, field_type) {
528+
// table schema uses string type, coerce the file schema to use string type
529+
(
530+
&DataType::Utf8,
531+
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
532+
) => {
533+
return field_with_new_type(field, DataType::Utf8);
534+
}
535+
// table schema uses large string type, coerce the file schema to use large string type
536+
(
537+
&DataType::LargeUtf8,
538+
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
539+
) => {
540+
return field_with_new_type(field, DataType::LargeUtf8);
541+
}
542+
// table schema uses string view type, coerce the binary file field to use string view type
543+
(
544+
&DataType::Utf8View,
545+
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
546+
) => {
547+
return field_with_new_type(field, DataType::Utf8View);
548+
}
549+
// table schema uses a view type, coerce the matching non-view file type to that view type
550+
(&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
551+
return field_with_new_type(field, DataType::Utf8View);
552+
}
553+
(&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
554+
return field_with_new_type(field, DataType::BinaryView);
555+
}
556+
_ => {}
557+
}
558+
}
559+
560+
// No coercion applies (or the field is absent from the table schema): keep the original field
561+
Arc::clone(field)
562+
})
563+
.collect();
564+
565+
Some(Schema::new_with_metadata(
566+
transformed_fields,
567+
file_schema.metadata.clone(),
568+
))
569+
}
570+
468571
/// Coerces the file schema if the table schema uses a view type.
572+
#[deprecated(
573+
since = "47.0.0",
574+
note = "Use `apply_file_schema_type_coercions` instead"
575+
)]
469576
pub fn coerce_file_schema_to_view_type(
470577
table_schema: &Schema,
471578
file_schema: &Schema,
@@ -515,6 +622,10 @@ pub fn coerce_file_schema_to_view_type(
515622
/// If the table schema uses a string type, coerce the file schema to use a string type.
516623
///
517624
/// See [ParquetFormat::binary_as_string] for details
625+
#[deprecated(
626+
since = "47.0.0",
627+
note = "Use `apply_file_schema_type_coercions` instead"
628+
)]
518629
pub fn coerce_file_schema_to_string_type(
519630
table_schema: &Schema,
520631
file_schema: &Schema,
@@ -718,11 +829,8 @@ pub fn statistics_from_parquet_meta_calc(
718829
file_metadata.schema_descr(),
719830
file_metadata.key_value_metadata(),
720831
)?;
721-
if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
722-
file_schema = merged;
723-
}
724832

725-
if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &file_schema) {
833+
if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &file_schema) {
726834
file_schema = merged;
727835
}
728836

datafusion/datasource-parquet/src/opener.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
2020
use std::sync::Arc;
2121

22-
use crate::file_format::{
23-
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
24-
};
2522
use crate::page_filter::PagePruningAccessPlanFilter;
2623
use crate::row_group_filter::RowGroupAccessPlanFilter;
2724
use crate::{
28-
row_filter, should_enable_page_index, ParquetAccessPlan, ParquetFileMetrics,
29-
ParquetFileReaderFactory,
25+
apply_file_schema_type_coercions, row_filter, should_enable_page_index,
26+
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
3027
};
3128
use datafusion_datasource::file_meta::FileMeta;
3229
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
@@ -131,14 +128,8 @@ impl FileOpener for ParquetOpener {
131128
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
132129
let mut schema = Arc::clone(metadata.schema());
133130

134-
if let Some(merged) =
135-
coerce_file_schema_to_string_type(&table_schema, &schema)
136-
{
137-
schema = Arc::new(merged);
138-
}
139-
140131
// read with view types
141-
if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema)
132+
if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &schema)
142133
{
143134
schema = Arc::new(merged);
144135
}

0 commit comments

Comments
 (0)