
Commit 62b063c

Improve documentation and examples for SchemaAdapterFactory, make record_batch "hygenic" (#13063)
* Improve documentation and examples for SchemaAdapterFactory and related classes
* fix macro
* Add macro hygene test
* Fix example, add convenience function, update docs
* Add tests and docs showing what happens when adapting a nullable column
* review feedback
* fix clippy
1 parent 412ca4e commit 62b063c

File tree

3 files changed: +239 -59 lines


datafusion/common/src/test_util.rs

Lines changed: 1 addition & 1 deletion
@@ -347,7 +347,7 @@ macro_rules! record_batch {
         let batch = arrow_array::RecordBatch::try_new(
             schema,
             vec![$(
-                create_array!($type, $values),
+                $crate::create_array!($type, $values),
             )*]
         );
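With the `$crate::` prefix, `record_batch!` is hygienic: a caller only needs `record_batch` in scope, not `create_array`. A minimal sketch (not part of this diff; illustrative only) of calling the macro from another crate:

use datafusion_common::record_batch;

fn main() -> datafusion_common::Result<()> {
    // Before this change, the expansion referenced `create_array!` unqualified,
    // so this failed to compile unless the caller also imported
    // `datafusion_common::create_array`.
    let batch = record_batch!(
        ("a", Int32, vec![1, 2, 3]),
        ("b", Utf8, vec!["x", "y", "z"])
    )?;
    assert_eq!(batch.num_columns(), 2);
    Ok(())
}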

datafusion/core/src/datasource/schema_adapter.rs

Lines changed: 228 additions & 58 deletions
@@ -32,65 +32,77 @@ use std::sync::Arc;
 ///
 /// This interface provides a way to implement custom schema adaptation logic
 /// for ParquetExec (for example, to fill missing columns with default value
-/// other than null)
+/// other than null).
+///
+/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
+/// more details and examples.
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
-    /// Provides `SchemaAdapter`.
-    // The design of this function is mostly modeled for the needs of DefaultSchemaAdapterFactory,
-    // read its implementation docs for the reasoning
+    /// Create a [`SchemaAdapter`]
+    ///
+    /// Arguments:
+    ///
+    /// * `projected_table_schema`: The schema for the table, projected to
+    ///   include only the fields being output (projected) by this mapping.
+    ///
+    /// * `table_schema`: The entire table schema for the table
     fn create(
         &self,
         projected_table_schema: SchemaRef,
         table_schema: SchemaRef,
     ) -> Box<dyn SchemaAdapter>;
 }
 
-/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
+/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
+/// schema, which may have a schema obtained from merging multiple file-level
+/// schemas.
 ///
-/// 1. Before reading the file, we have to map projected column indexes from the
-///    table schema to the file schema.
+/// This is useful for implementing schema evolution in partitioned datasets.
 ///
-/// 2. After reading a record batch map the read columns back to the expected
-///    columns indexes and insert null-valued columns wherever the file schema was
-///    missing a column present in the table schema.
+/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
     ///
+    /// This is used while reading a file to push down projections by mapping
+    /// projected column indexes from the table schema to the file schema
+    ///
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns
-    /// from the file schema to the table schema.
+    /// Creates a mapping for casting columns from the file schema to the table
+    /// schema.
     ///
-    /// If the provided `file_schema` contains columns of a different type to the expected
-    /// `table_schema`, the method will attempt to cast the array data from the file schema
-    /// to the table schema where possible.
+    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
     ///
-    /// Returns a [`SchemaMapper`] that can be applied to the output batch
-    /// along with an ordered list of columns to project from the file
+    /// 1. Maps columns to the expected column indexes
+    /// 2. Handles missing values (e.g. fills nulls or a default value) for
+    ///    columns in the table schema that are not in the file schema
+    /// 3. Handles different types: if the column in the file schema has a
+    ///    different type than `table_schema`, the mapper will resolve this
+    ///    difference (e.g. by casting to the appropriate type)
+    ///
+    /// Returns:
+    /// * a [`SchemaMapper`]
+    /// * an ordered list of columns to project from the file
     fn map_schema(
         &self,
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Maps, by casting or reordering columns from the file schema to the table
-/// schema.
+/// Maps columns from a specific file schema to the table schema.
+///
+/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
 pub trait SchemaMapper: Debug + Send + Sync {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored
-    /// mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `table_schema`
     fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
 
     /// Adapts a [`RecordBatch`] that does not have all the columns from the
     /// file schema.
     ///
-    /// This method is used when applying a filter to a subset of the columns as
-    /// part of `DataFusionArrowPredicate` when `filter_pushdown` is enabled.
+    /// This method is used, for example, when applying a filter to a subset of
+    /// the columns as part of `DataFusionArrowPredicate` when `filter_pushdown`
+    /// is enabled.
     ///
     /// This method is slower than `map_batch` as it looks up columns by name.
     fn map_partial_batch(
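Taken together, the three traits cover the read path: the factory creates an adapter for the table's (projected) schema, the adapter turns a specific file schema into a column projection plus a [`SchemaMapper`], and the mapper rewrites each batch read from the file. A hedged sketch using only the signatures above (`factory`, `table_schema`, `file_schema`, and `file_batch` are assumed to be in scope):

// 1. create an adapter for this table (projected schema == full schema here)
let adapter = factory.create(Arc::clone(&table_schema), Arc::clone(&table_schema));

// 2. before reading a file, compute the column projection and the mapper
let (mapper, projection) = adapter.map_schema(&file_schema)?;

// 3. read only the `projection` columns from the file, then adapt each batch
let table_batch = mapper.map_batch(file_batch)?;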
@@ -99,11 +111,106 @@ pub trait SchemaMapper: Debug + Send + Sync {
     ) -> datafusion_common::Result<RecordBatch>;
 }
 
-/// Implementation of [`SchemaAdapterFactory`] that maps columns by name
-/// and casts columns to the expected type.
+/// Default [`SchemaAdapterFactory`] for mapping schemas.
+///
+/// This can be used to adapt file-level record batches to a table schema and
+/// implement schema evolution.
+///
+/// Given an input file schema and a table schema, this factory returns
+/// [`SchemaAdapter`]s that return [`SchemaMapper`]s that:
+///
+/// 1. Reorder columns
+/// 2. Cast columns to the correct type
+/// 3. Fill missing columns with nulls
+///
+/// # Errors:
+///
+/// * If a column in the table schema is non-nullable but is not present in the
+///   file schema (i.e. it is missing), the returned mapper tries to fill it with
+///   nulls, resulting in a schema error.
+///
+/// # Illustration of Schema Mapping
+///
+/// ```text
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                 ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///   ┌───────┐   ┌───────┐                     ┌───────┐ ┌───────┐ ┌───────┐
+/// │ │  1.0  │   │ "foo" │ │                 │ │ NULL  │ │ "foo" │ │ "1.0" │   │
+///   ├───────┤   ├───────┤    Schema mapping   ├───────┤ ├───────┤ ├───────┤
+/// │ │  2.0  │   │ "bar" │ │ ───────────────▶│ │ NULL  │ │ "bar" │ │ "2.0" │   │
+///   └───────┘   └───────┘                     └───────┘ └───────┘ └───────┘
+/// │                       │                 │                                 │
+///   column "c"  column "b"                    column "a"  column "b" column "c"
+/// │  Float64      Utf8    │                 │  Int32       Utf8       Utf8    │
+///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+///     Input Record Batch                         Output Record Batch
+///
+///     Schema {                                   Schema {
+///        "c": Float64,                              "a": Int32,
+///        "b": Utf8,                                 "b": Utf8,
+///     }                                            "c": Utf8,
+///                                                }
+/// ```
+///
+/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
+///
+/// Note `SchemaMapping` also supports mapping partial batches, which is used as
+/// part of predicate pushdown.
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
+/// # use datafusion_common::record_batch;
+/// // Table has fields "a", "b" and "c"
+/// let table_schema = Schema::new(vec![
+///     Field::new("a", DataType::Int32, true),
+///     Field::new("b", DataType::Utf8, true),
+///     Field::new("c", DataType::Utf8, true),
+/// ]);
+///
+/// // create an adapter to map the table schema to the file schema
+/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+///
+/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
+/// // instead of 'Utf8'
+/// let file_schema = Schema::new(vec![
+///     Field::new("c", DataType::Utf8, true),
+///     Field::new("b", DataType::Float64, true),
+/// ]);
+///
+/// // Get a mapping from the file schema to the table schema
+/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
+///
+/// let file_batch = record_batch!(
+///     ("c", Utf8, vec!["foo", "bar"]),
+///     ("b", Float64, vec![1.0, 2.0])
+/// ).unwrap();
+///
+/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
+///
+/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
+/// let expected_batch = record_batch!(
+///     ("a", Int32, vec![None, None]),  // missing column filled with nulls
+///     ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
+///     ("c", Utf8, vec!["foo", "bar"])
+/// ).unwrap();
+/// assert_eq!(mapped_batch, expected_batch);
+/// ```
 #[derive(Clone, Debug, Default)]
 pub struct DefaultSchemaAdapterFactory;
 
+impl DefaultSchemaAdapterFactory {
+    /// Create a new factory for mapping batches from a file schema to a table
+    /// schema.
+    ///
+    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
+    /// the same schema for both the projected table schema and the table
+    /// schema.
+    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Self.create(Arc::clone(&table_schema), table_schema)
+    }
+}
+
 impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
     fn create(
         &self,
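For reference, the new `from_schema` convenience is simply `create` called with the same schema twice; the two calls below should be equivalent (sketch; `table_schema: SchemaRef` is assumed to be in scope):

let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema));
// ... is shorthand for:
let adapter = DefaultSchemaAdapterFactory.create(Arc::clone(&table_schema), table_schema);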
@@ -117,8 +224,8 @@ impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
     }
 }
 
-/// This SchemaAdapter requires both the table schema and the projected table schema because of the
-/// needs of the [`SchemaMapping`] it creates. Read its documentation for more details
+/// This SchemaAdapter requires both the table schema and the projected table
+/// schema. See [`SchemaMapping`] for more details
 #[derive(Clone, Debug)]
 pub(crate) struct DefaultSchemaAdapter {
     /// The schema for the table, projected to include only the fields being output (projected) by the
@@ -142,11 +249,12 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         Some(file_schema.fields.find(field.name())?.0)
     }
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    /// Creates a `SchemaMapping` for casting or mapping the columns from the
+    /// file schema to the table schema.
     ///
-    /// If the provided `file_schema` contains columns of a different type to the expected
-    /// `table_schema`, the method will attempt to cast the array data from the file schema
-    /// to the table schema where possible.
+    /// If the provided `file_schema` contains columns of a different type to
+    /// the expected `table_schema`, the method will attempt to cast the array
+    /// data from the file schema to the table schema where possible.
     ///
     /// Returns a [`SchemaMapping`] that can be applied to the output batch
     /// along with an ordered list of columns to project from the file
@@ -189,36 +297,45 @@ impl SchemaAdapter for DefaultSchemaAdapter {
     }
 }
 
-/// The SchemaMapping struct holds a mapping from the file schema to the table schema
-/// and any necessary type conversions that need to be applied.
+/// The SchemaMapping struct holds a mapping from the file schema to the table
+/// schema and any necessary type conversions.
+///
+/// Note, because the `map_batch` and `map_partial_batch` functions have
+/// different needs, this struct holds two schemas:
+///
+/// 1. The projected **table** schema
+/// 2. The full table schema
 ///
-/// This needs both the projected table schema and full table schema because its different
-/// functions have different needs. The [`map_batch`] function is only used by the ParquetOpener to
-/// produce a RecordBatch which has the projected schema, since that's the schema which is supposed
-/// to come out of the execution of this query. [`map_partial_batch`], however, is used to create a
-/// RecordBatch with a schema that can be used for Parquet pushdown, meaning that it may contain
-/// fields which are not in the projected schema (as the fields that parquet pushdown filters
-/// operate can be completely distinct from the fields that are projected (output) out of the
-/// ParquetExec).
+/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
+/// has the projected schema, since that's the schema which is supposed to come
+/// out of the execution of this query. Thus `map_batch` uses
+/// `projected_table_schema`, as it can only operate on the projected fields.
 ///
-/// [`map_partial_batch`] uses `table_schema` to create the resulting RecordBatch (as it could be
-/// operating on any fields in the schema), while [`map_batch`] uses `projected_table_schema` (as
-/// it can only operate on the projected fields).
+/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
+/// can be used for Parquet predicate pushdown, meaning that it may contain
+/// fields which are not in the projected schema (as the fields that parquet
+/// pushdown filters operate on can be completely distinct from the fields that
+/// are projected (output) out of the ParquetExec). `map_partial_batch` thus uses
+/// `table_schema` to create the resulting RecordBatch (as it could be operating
+/// on any fields in the schema).
 ///
 /// [`map_batch`]: Self::map_batch
 /// [`map_partial_batch`]: Self::map_partial_batch
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion and it should match
-    /// the schema of the query result.
+    /// The schema of the table. This is the expected schema after conversion
+    /// and it should match the schema of the query result.
     projected_table_schema: SchemaRef,
-    /// Mapping from field index in `projected_table_schema` to index in projected file_schema.
-    /// They are Options instead of just plain `usize`s because the table could have fields that
-    /// don't exist in the file.
+    /// Mapping from field index in `projected_table_schema` to index in
+    /// projected file_schema.
+    ///
+    /// They are Options instead of just plain `usize`s because the table could
+    /// have fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
-    /// The entire table schema, as opposed to the projected_table_schema (which only contains the
-    /// columns that we are projecting out of this query). This contains all fields in the table,
-    /// regardless of if they will be projected out or not.
+    /// The entire table schema, as opposed to the projected_table_schema (which
+    /// only contains the columns that we are projecting out of this query).
+    /// This contains all fields in the table, regardless of whether they will
+    /// be projected out or not.
     table_schema: SchemaRef,
 }
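To see why both schemas are kept, consider a table with columns "a", "b" and "c" where the query projects only "a" but a pushdown predicate filters on "b". A minimal sketch (not from this commit; the field names and projection are illustrative):

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};

fn projected_adapter_sketch() {
    let table_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Utf8, true),
    ]));
    // the query outputs only "a" ...
    let projected_schema = Arc::new(table_schema.project(&[0]).unwrap());
    // ... so `map_batch` targets `projected_schema`, while `map_partial_batch`
    // may receive a predicate-only column such as "b" and therefore targets the
    // full `table_schema`.
    let _adapter = DefaultSchemaAdapterFactory.create(projected_schema, table_schema);
}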

@@ -331,8 +448,9 @@ mod tests {
 
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::schema_adapter::{
-        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+        DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
+    use datafusion_common::record_batch;
     #[cfg(feature = "parquet")]
     use parquet::arrow::ArrowWriter;
     use tempfile::TempDir;
@@ -405,6 +523,58 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[test]
+    fn default_schema_adapter() {
+        let table_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+
+        // file has a subset of the table schema fields and a different type
+        let file_schema = Schema::new(vec![
+            Field::new("c", DataType::Float64, true), // not in table schema
+            Field::new("b", DataType::Float64, true),
+        ]);
+
+        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
+        assert_eq!(indices, vec![1]);
+
+        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
+
+        let mapped_batch = mapper.map_batch(file_batch).unwrap();
+
+        // the mapped batch has the correct schema and the "b" column has been cast to Utf8
+        let expected_batch = record_batch!(
+            ("a", Int32, vec![None, None]), // missing column filled with nulls
+            ("b", Utf8, vec!["1.0", "2.0"]) // b was cast to string and order was changed
+        )
+        .unwrap();
+        assert_eq!(mapped_batch, expected_batch);
+    }
+
+    #[test]
+    fn default_schema_adapter_non_nullable_columns() {
+        let table_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false), // "a" is declared non-nullable
+            Field::new("b", DataType::Utf8, true),
+        ]);
+        let file_schema = Schema::new(vec![
+            // since the file doesn't have "a" it will be filled with nulls
+            Field::new("b", DataType::Float64, true),
+        ]);
+
+        let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
+        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
+        assert_eq!(indices, vec![0]);
+
+        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
+
+        // Mapping fails because it tries to fill in a non-nullable column with nulls
+        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
+        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+    }
+
     #[derive(Debug)]
     struct TestSchemaAdapterFactory;
