Skip to content

Commit 479a277

Browse files
authored
Relax physical schema validation (#14519)
Physical plan can be further optimized. In particular, an expression can be determined as never null even if it wasn't known at the time of logical planning. Thus, the final schema check needs to be relax, allowing now-non-null data where nullable data was expected. This replaces schema equality check, with asymmetric "is satisfied by" relation.
1 parent 2aad9d2 commit 479a277

File tree

4 files changed

+101
-2
lines changed

4 files changed

+101
-2
lines changed

datafusion/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,7 @@ pub mod variable {
812812
#[cfg(not(target_arch = "wasm32"))]
813813
pub mod test;
814814

815+
mod schema_equivalence;
815816
pub mod test_util;
816817

817818
#[cfg(doctest)]

datafusion/core/src/physical_planner.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
8989
use datafusion_physical_plan::unnest::ListUnnest;
9090
use datafusion_sql::utils::window_expr_common_partition_keys;
9191

92+
use crate::schema_equivalence::schema_satisfied_by;
9293
use async_trait::async_trait;
9394
use futures::{StreamExt, TryStreamExt};
9495
use itertools::{multiunzip, Itertools};
@@ -659,7 +660,10 @@ impl DefaultPhysicalPlanner {
659660
let physical_input_schema_from_logical = logical_input_schema.inner();
660661

661662
if !options.execution.skip_physical_aggregate_schema_check
662-
&& &physical_input_schema != physical_input_schema_from_logical
663+
&& !schema_satisfied_by(
664+
physical_input_schema_from_logical,
665+
&physical_input_schema,
666+
)
663667
{
664668
let mut differences = Vec::new();
665669
if physical_input_schema.fields().len()
@@ -688,7 +692,7 @@ impl DefaultPhysicalPlanner {
688692
if physical_field.data_type() != logical_field.data_type() {
689693
differences.push(format!("field data type at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.data_type(), logical_field.data_type()));
690694
}
691-
if physical_field.is_nullable() != logical_field.is_nullable() {
695+
if physical_field.is_nullable() && !logical_field.is_nullable() {
692696
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
693697
}
694698
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_schema::{DataType, Field, Fields, Schema};
19+
20+
/// Verifies whether the original planned schema can be satisfied with data
21+
/// adhering to the candidate schema. In practice, this is equality check on the
22+
/// schemas except that original schema can have nullable fields where candidate
23+
/// is constrained to not provide null data.
24+
pub(crate) fn schema_satisfied_by(original: &Schema, candidate: &Schema) -> bool {
25+
original.metadata() == candidate.metadata()
26+
&& fields_satisfied_by(original.fields(), candidate.fields())
27+
}
28+
29+
/// See [`schema_satisfied_by`] for the contract.
30+
fn fields_satisfied_by(original: &Fields, candidate: &Fields) -> bool {
31+
original.len() == candidate.len()
32+
&& original
33+
.iter()
34+
.zip(candidate)
35+
.all(|(original, candidate)| field_satisfied_by(original, candidate))
36+
}
37+
38+
/// See [`schema_satisfied_by`] for the contract.
39+
fn field_satisfied_by(original: &Field, candidate: &Field) -> bool {
40+
original.name() == candidate.name()
41+
&& (original.is_nullable() || !candidate.is_nullable())
42+
&& original.metadata() == candidate.metadata()
43+
&& data_type_satisfied_by(original.data_type(), candidate.data_type())
44+
}
45+
46+
/// See [`schema_satisfied_by`] for the contract.
47+
fn data_type_satisfied_by(original: &DataType, candidate: &DataType) -> bool {
48+
match (original, candidate) {
49+
(DataType::List(original_field), DataType::List(candidate_field)) => {
50+
field_satisfied_by(original_field, candidate_field)
51+
}
52+
53+
(DataType::ListView(original_field), DataType::ListView(candidate_field)) => {
54+
field_satisfied_by(original_field, candidate_field)
55+
}
56+
57+
(
58+
DataType::FixedSizeList(original_field, original_size),
59+
DataType::FixedSizeList(candidate_field, candidate_size),
60+
) => {
61+
original_size == candidate_size
62+
&& field_satisfied_by(original_field, candidate_field)
63+
}
64+
65+
(DataType::LargeList(original_field), DataType::LargeList(candidate_field)) => {
66+
field_satisfied_by(original_field, candidate_field)
67+
}
68+
69+
(
70+
DataType::LargeListView(original_field),
71+
DataType::LargeListView(candidate_field),
72+
) => field_satisfied_by(original_field, candidate_field),
73+
74+
(DataType::Struct(original_fields), DataType::Struct(candidate_fields)) => {
75+
fields_satisfied_by(original_fields, candidate_fields)
76+
}
77+
78+
// TODO (DataType::Union(, _), DataType::Union(_, _)) => {}
79+
// TODO (DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {}
80+
// TODO (DataType::Map(_, _), DataType::Map(_, _)) => {}
81+
// TODO (DataType::RunEndEncoded(_, _), DataType::RunEndEncoded(_, _)) => {}
82+
_ => original == candidate,
83+
}
84+
}

datafusion/sqllogictest/test_files/union.slt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,3 +850,13 @@ FROM (
850850
----
851851
NULL false
852852
foo true
853+
854+
query T
855+
SELECT combined
856+
FROM (
857+
SELECT concat('A', 'B') AS combined UNION ALL
858+
SELECT concat('A', 'B') AS combined
859+
)
860+
GROUP BY combined
861+
----
862+
AB

0 commit comments

Comments
 (0)