 //! Custom schema adapter that uses Spark-compatible conversions

 use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
-use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::datatypes::{Schema, SchemaRef};
 use datafusion::common::ColumnStatistics;
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 use datafusion::physical_plan::ColumnarValue;
+use datafusion::scalar::ScalarValue;
+use std::collections::HashMap;
 use std::sync::Arc;

 /// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible
@@ -31,12 +33,17 @@ use std::sync::Arc;
 pub struct SparkSchemaAdapterFactory {
     /// Spark cast options
     parquet_options: SparkParquetOptions,
+    default_values: Option<HashMap<usize, ScalarValue>>,
 }

 impl SparkSchemaAdapterFactory {
-    pub fn new(options: SparkParquetOptions) -> Self {
+    pub fn new(
+        options: SparkParquetOptions,
+        default_values: Option<HashMap<usize, ScalarValue>>,
+    ) -> Self {
         Self {
             parquet_options: options,
+            default_values,
         }
     }
 }
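For callers, a minimal sketch of how the new two-argument constructor might be wired up (the `spark_parquet_options` value and the column-index/default pair are assumptions for illustration, mirroring the test at the bottom of this diff):

```rust
use std::collections::HashMap;
use datafusion::scalar::ScalarValue;

// Hypothetical default-values map: the key is the field index in the required
// (table-side) schema, the value is the Spark-recorded default for that column.
let mut defaults: HashMap<usize, ScalarValue> = HashMap::new();
defaults.insert(2, ScalarValue::Int32(Some(42)));

// `spark_parquet_options` is assumed to be an already-built SparkParquetOptions.
let factory = SparkSchemaAdapterFactory::new(spark_parquet_options, Some(defaults));
```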
@@ -56,6 +63,7 @@ impl SchemaAdapterFactory for SparkSchemaAdapterFactory {
         Box::new(SparkSchemaAdapter {
             required_schema,
             parquet_options: self.parquet_options.clone(),
+            default_values: self.default_values.clone(),
         })
     }
 }
@@ -69,6 +77,7 @@ pub struct SparkSchemaAdapter {
     required_schema: SchemaRef,
     /// Spark cast options
     parquet_options: SparkParquetOptions,
+    default_values: Option<HashMap<usize, ScalarValue>>,
 }

 impl SchemaAdapter for SparkSchemaAdapter {
@@ -134,6 +143,7 @@ impl SchemaAdapter for SparkSchemaAdapter {
                 required_schema: Arc::<Schema>::clone(&self.required_schema),
                 field_mappings,
                 parquet_options: self.parquet_options.clone(),
+                default_values: self.default_values.clone(),
             }),
             projection,
         ))
@@ -158,16 +168,7 @@ impl SchemaAdapter for SparkSchemaAdapter {
 /// out of the execution of this query. Thus `map_batch` uses
 /// `projected_table_schema` as it can only operate on the projected fields.
 ///
-/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
-/// can be used for Parquet predicate pushdown, meaning that it may contain
-/// fields which are not in the projected schema (as the fields that parquet
-/// pushdown filters operate can be completely distinct from the fields that are
-/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
-/// `table_schema` to create the resulting RecordBatch (as it could be operating
-/// on any fields in the schema).
-///
 /// [`map_batch`]: Self::map_batch
-/// [`map_partial_batch`]: Self::map_partial_batch
 #[derive(Debug)]
 pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion
@@ -181,6 +182,7 @@ pub struct SchemaMapping {
     field_mappings: Vec<Option<usize>>,
     /// Spark cast options
     parquet_options: SparkParquetOptions,
+    default_values: Option<HashMap<usize, ScalarValue>>,
 }

 impl SchemaMapper for SchemaMapping {
@@ -197,15 +199,43 @@ impl SchemaMapper for SchemaMapping {
             // go through each field in the projected schema
             .fields()
             .iter()
+            .enumerate()
             // and zip it with the index that maps fields from the projected table schema to the
             // projected file schema in `batch`
             .zip(&self.field_mappings)
             // and for each one...
-            .map(|(field, file_idx)| {
+            .map(|((field_idx, field), file_idx)| {
                 file_idx.map_or_else(
-                    // If this field only exists in the table, and not in the file, then we know
-                    // that it's null, so just return that.
-                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // If this field only exists in the table, and not in the file, then we need to
+                    // populate a default value for it.
+                    || {
+                        if self.default_values.is_some() {
+                            // We have a map of default values, see if this field is in there.
+                            if let Some(value) =
+                                self.default_values.as_ref().unwrap().get(&field_idx)
+                            // Default value exists, construct a column from it.
+                            {
+                                let cv = if field.data_type() == &value.data_type() {
+                                    ColumnarValue::Scalar(value.clone())
+                                } else {
+                                    // Data types don't match. This can happen when default values
+                                    // are stored by Spark in a format different than the column's
+                                    // type (e.g., INT32 when the column is DATE32)
+                                    spark_parquet_convert(
+                                        ColumnarValue::Scalar(value.clone()),
+                                        field.data_type(),
+                                        &self.parquet_options,
+                                    )?
+                                };
+                                return cv.into_array(batch_rows);
+                            }
+                        }
+                        // Construct an entire column of nulls. We use the Scalar representation
+                        // for better performance.
+                        let cv =
+                            ColumnarValue::Scalar(ScalarValue::try_new_null(field.data_type())?);
+                        cv.into_array(batch_rows)
+                    },
                     // However, if it does exist in both, then try to cast it to the correct output
                     // type
                     |batch_idx| {
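As an aside, a standalone sketch (not part of this patch) of the Scalar-backed null-column fallback used above: a typed null `ScalarValue` stands in for the entire missing column and is only expanded into an `ArrayRef` when the output batch is assembled.

```rust
use arrow::datatypes::DataType;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;

fn null_column_sketch() -> datafusion::common::Result<()> {
    let batch_rows = 3;
    // One typed null scalar represents the whole missing column.
    let scalar = ScalarValue::try_new_null(&DataType::Date32)?;
    // Expand it to a full column only when the RecordBatch is built.
    let array = ColumnarValue::Scalar(scalar).into_array(batch_rows)?;
    assert_eq!(array.len(), batch_rows);
    assert_eq!(array.null_count(), batch_rows);
    Ok(())
}
```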
@@ -316,7 +346,7 @@ mod test {

         let parquet_source = Arc::new(
             ParquetSource::new(TableParquetOptions::new()).with_schema_adapter_factory(Arc::new(
-                SparkSchemaAdapterFactory::new(spark_parquet_options),
+                SparkSchemaAdapterFactory::new(spark_parquet_options, None),
             )),
         );
