Skip to content

Commit 79d60f9

Browse files
authored
Remove qualifiers on pushed down predicates / Fix parquet pruning (apache#689)
* Remove qualifiers on pushed down predicates
* Add test for normalizing and unnormalizing columns
* Fix logical conflict
1 parent 9f8e265 commit 79d60f9

File tree

4 files changed

+164
-26
lines changed

4 files changed

+164
-26
lines changed

datafusion/src/logical_plan/expr.rs

Lines changed: 140 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,22 @@ impl Column {
9090
/// For example, `foo` will be normalized to `t.foo` if there is a
9191
/// column named `foo` in a relation named `t` found in `schemas`
9292
pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
93+
let schemas = plan.all_schemas();
94+
let using_columns = plan.using_columns()?;
95+
self.normalize_with_schemas(&schemas, &using_columns)
96+
}
97+
98+
// Internal implementation of normalize
99+
fn normalize_with_schemas(
100+
self,
101+
schemas: &[&Arc<DFSchema>],
102+
using_columns: &[HashSet<Column>],
103+
) -> Result<Self> {
93104
if self.relation.is_some() {
94105
return Ok(self);
95106
}
96107

97-
let schemas = plan.all_schemas();
98-
let using_columns = plan.using_columns()?;
99-
100-
for schema in &schemas {
108+
for schema in schemas {
101109
let fields = schema.fields_with_unqualified_name(&self.name);
102110
match fields.len() {
103111
0 => continue,
@@ -118,7 +126,7 @@ impl Column {
118126
// We will use the relation from the first matched field to normalize self.
119127

120128
// Compare matched fields with one USING JOIN clause at a time
121-
for using_col in &using_columns {
129+
for using_col in using_columns {
122130
let all_matched = fields
123131
.iter()
124132
.all(|f| using_col.contains(&f.qualified_column()));
@@ -1171,22 +1179,39 @@ pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<E
11711179

11721180
/// Recursively call [`Column::normalize`] on all Column expressions
11731181
/// in the `expr` expression tree.
1174-
pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {
1182+
pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
1183+
normalize_col_with_schemas(expr, &plan.all_schemas(), &plan.using_columns()?)
1184+
}
1185+
1186+
/// Recursively call [`Column::normalize`] on all Column expressions
1187+
/// in the `expr` expression tree.
1188+
fn normalize_col_with_schemas(
1189+
expr: Expr,
1190+
schemas: &[&Arc<DFSchema>],
1191+
using_columns: &[HashSet<Column>],
1192+
) -> Result<Expr> {
11751193
struct ColumnNormalizer<'a> {
1176-
plan: &'a LogicalPlan,
1194+
schemas: &'a [&'a Arc<DFSchema>],
1195+
using_columns: &'a [HashSet<Column>],
11771196
}
11781197

11791198
impl<'a> ExprRewriter for ColumnNormalizer<'a> {
11801199
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
11811200
if let Expr::Column(c) = expr {
1182-
Ok(Expr::Column(c.normalize(self.plan)?))
1201+
Ok(Expr::Column(c.normalize_with_schemas(
1202+
self.schemas,
1203+
self.using_columns,
1204+
)?))
11831205
} else {
11841206
Ok(expr)
11851207
}
11861208
}
11871209
}
11881210

1189-
e.rewrite(&mut ColumnNormalizer { plan })
1211+
expr.rewrite(&mut ColumnNormalizer {
1212+
schemas,
1213+
using_columns,
1214+
})
11901215
}
11911216

11921217
/// Recursively normalize all Column expressions in a list of expression trees
@@ -1198,6 +1223,38 @@ pub fn normalize_cols(
11981223
exprs.into_iter().map(|e| normalize_col(e, plan)).collect()
11991224
}
12001225

1226+
/// Recursively 'unnormalize' (remove all qualifiers) from an
1227+
/// expression tree.
1228+
///
1229+
/// For example, if there were expressions like `foo.bar` this would
1230+
/// rewrite it to just `bar`.
1231+
pub fn unnormalize_col(expr: Expr) -> Expr {
1232+
struct RemoveQualifier {}
1233+
1234+
impl ExprRewriter for RemoveQualifier {
1235+
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
1236+
if let Expr::Column(col) = expr {
1237+
//let Column { relation: _, name } = col;
1238+
Ok(Expr::Column(Column {
1239+
relation: None,
1240+
name: col.name,
1241+
}))
1242+
} else {
1243+
Ok(expr)
1244+
}
1245+
}
1246+
}
1247+
1248+
expr.rewrite(&mut RemoveQualifier {})
1249+
.expect("Unnormalize is infallable")
1250+
}
1251+
1252+
/// Recursively un-normalize all Column expressions in a list of expression trees
1253+
#[inline]
1254+
pub fn unnormalize_cols(exprs: impl IntoIterator<Item = Expr>) -> Vec<Expr> {
1255+
exprs.into_iter().map(unnormalize_col).collect()
1256+
}
1257+
12011258
/// Create an expression to represent the min() aggregate function
12021259
pub fn min(expr: Expr) -> Expr {
12031260
Expr::AggregateFunction {
@@ -1810,4 +1867,78 @@ mod tests {
18101867
}
18111868
}
18121869
}
1870+
1871+
#[test]
1872+
fn normalize_cols() {
1873+
let expr = col("a") + col("b") + col("c");
1874+
1875+
// Schemas with some matching and some non matching cols
1876+
let schema_a =
1877+
DFSchema::new(vec![make_field("tableA", "a"), make_field("tableA", "aa")])
1878+
.unwrap();
1879+
let schema_c =
1880+
DFSchema::new(vec![make_field("tableC", "cc"), make_field("tableC", "c")])
1881+
.unwrap();
1882+
let schema_b = DFSchema::new(vec![make_field("tableB", "b")]).unwrap();
1883+
// non matching
1884+
let schema_f =
1885+
DFSchema::new(vec![make_field("tableC", "f"), make_field("tableC", "ff")])
1886+
.unwrap();
1887+
let schemas = vec![schema_c, schema_f, schema_b, schema_a]
1888+
.into_iter()
1889+
.map(Arc::new)
1890+
.collect::<Vec<_>>();
1891+
let schemas = schemas.iter().collect::<Vec<_>>();
1892+
1893+
let normalized_expr = normalize_col_with_schemas(expr, &schemas, &[]).unwrap();
1894+
assert_eq!(
1895+
normalized_expr,
1896+
col("tableA.a") + col("tableB.b") + col("tableC.c")
1897+
);
1898+
}
1899+
1900+
#[test]
1901+
fn normalize_cols_priority() {
1902+
let expr = col("a") + col("b");
1903+
// Schemas with multiple matches for column a, first takes priority
1904+
let schema_a = DFSchema::new(vec![make_field("tableA", "a")]).unwrap();
1905+
let schema_b = DFSchema::new(vec![make_field("tableB", "b")]).unwrap();
1906+
let schema_a2 = DFSchema::new(vec![make_field("tableA2", "a")]).unwrap();
1907+
let schemas = vec![schema_a2, schema_b, schema_a]
1908+
.into_iter()
1909+
.map(Arc::new)
1910+
.collect::<Vec<_>>();
1911+
let schemas = schemas.iter().collect::<Vec<_>>();
1912+
1913+
let normalized_expr = normalize_col_with_schemas(expr, &schemas, &[]).unwrap();
1914+
assert_eq!(normalized_expr, col("tableA2.a") + col("tableB.b"));
1915+
}
1916+
1917+
#[test]
1918+
fn normalize_cols_non_exist() {
1919+
// test normalizing columns when the name doesn't exist
1920+
let expr = col("a") + col("b");
1921+
let schema_a = DFSchema::new(vec![make_field("tableA", "a")]).unwrap();
1922+
let schemas = vec![schema_a].into_iter().map(Arc::new).collect::<Vec<_>>();
1923+
let schemas = schemas.iter().collect::<Vec<_>>();
1924+
1925+
let error = normalize_col_with_schemas(expr, &schemas, &[])
1926+
.unwrap_err()
1927+
.to_string();
1928+
assert_eq!(
1929+
error,
1930+
"Error during planning: Column #b not found in provided schemas"
1931+
);
1932+
}
1933+
1934+
#[test]
1935+
fn unnormalize_cols() {
1936+
let expr = col("tableA.a") + col("tableB.b");
1937+
let unnormalized_expr = unnormalize_col(expr);
1938+
assert_eq!(unnormalized_expr, col("a") + col("b"));
1939+
}
1940+
1941+
fn make_field(relation: &str, column: &str) -> DFField {
1942+
DFField::new(Some(relation), column, DataType::Int8, false)
1943+
}
18131944
}

datafusion/src/logical_plan/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ pub use expr::{
4343
min, normalize_col, normalize_cols, now, octet_length, or, random, regexp_match,
4444
regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim,
4545
sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
46-
substr, sum, tan, to_hex, translate, trim, trunc, upper, when, Column, Expr,
47-
ExprRewriter, ExpressionVisitor, Literal, Recursion,
46+
substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols,
47+
upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
4848
};
4949
pub use extension::UserDefinedLogicalNode;
5050
pub use operators::Operator;

datafusion/src/physical_plan/planner.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ use super::{
2323
};
2424
use crate::execution::context::ExecutionContextState;
2525
use crate::logical_plan::{
26-
DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
27-
StringifiedPlan, UserDefinedLogicalNode,
26+
unnormalize_cols, DFSchema, Expr, LogicalPlan, Operator,
27+
Partitioning as LogicalPartitioning, PlanType, StringifiedPlan,
28+
UserDefinedLogicalNode,
2829
};
2930
use crate::physical_plan::explain::ExplainExec;
3031
use crate::physical_plan::expressions;
@@ -311,7 +312,13 @@ impl DefaultPhysicalPlanner {
311312
filters,
312313
limit,
313314
..
314-
} => source.scan(projection, batch_size, filters, *limit),
315+
} => {
316+
// Remove all qualifiers from the scan as the provider
317+
// doesn't know (nor should care) how the relation was
318+
// referred to in the query
319+
let filters = unnormalize_cols(filters.iter().cloned());
320+
source.scan(projection, batch_size, &filters, *limit)
321+
}
315322
LogicalPlan::Window {
316323
input, window_expr, ..
317324
} => {

datafusion/tests/parquet_pruning.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ async fn prune_timestamps_nanos() {
4444
.query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')")
4545
.await;
4646
println!("{}", output.description());
47-
// TODO This should prune one metrics without error
48-
assert_eq!(output.predicate_evaluation_errors(), Some(1));
49-
assert_eq!(output.row_groups_pruned(), Some(0));
47+
// This should prune one metrics without error
48+
assert_eq!(output.predicate_evaluation_errors(), Some(0));
49+
assert_eq!(output.row_groups_pruned(), Some(1));
5050
assert_eq!(output.result_rows, 10, "{}", output.description());
5151
}
5252

@@ -59,9 +59,9 @@ async fn prune_timestamps_micros() {
5959
)
6060
.await;
6161
println!("{}", output.description());
62-
// TODO This should prune one metrics without error
63-
assert_eq!(output.predicate_evaluation_errors(), Some(1));
64-
assert_eq!(output.row_groups_pruned(), Some(0));
62+
// This should prune one metrics without error
63+
assert_eq!(output.predicate_evaluation_errors(), Some(0));
64+
assert_eq!(output.row_groups_pruned(), Some(1));
6565
assert_eq!(output.result_rows, 10, "{}", output.description());
6666
}
6767

@@ -74,9 +74,9 @@ async fn prune_timestamps_millis() {
7474
)
7575
.await;
7676
println!("{}", output.description());
77-
// TODO This should prune one metrics without error
78-
assert_eq!(output.predicate_evaluation_errors(), Some(1));
79-
assert_eq!(output.row_groups_pruned(), Some(0));
77+
// This should prune one metrics without error
78+
assert_eq!(output.predicate_evaluation_errors(), Some(0));
79+
assert_eq!(output.row_groups_pruned(), Some(1));
8080
assert_eq!(output.result_rows, 10, "{}", output.description());
8181
}
8282

@@ -89,9 +89,9 @@ async fn prune_timestamps_seconds() {
8989
)
9090
.await;
9191
println!("{}", output.description());
92-
// TODO This should prune one metrics without error
93-
assert_eq!(output.predicate_evaluation_errors(), Some(1));
94-
assert_eq!(output.row_groups_pruned(), Some(0));
92+
// This should prune one metrics without error
93+
assert_eq!(output.predicate_evaluation_errors(), Some(0));
94+
assert_eq!(output.row_groups_pruned(), Some(1));
9595
assert_eq!(output.result_rows, 10, "{}", output.description());
9696
}
9797

0 commit comments

Comments
 (0)