
Commit 4a23f65

feat: Support more datafusion features (#4180)
After spending some time today debugging a DataFusion issue with @joseph-isaacs and @robert3005 (it ended up not being related to us), I figured I'd use the momentum to add some of the missing features:

1. Support for basic arithmetic expression pushdown (which shows up in TPC-DS), including adding subtraction to `vortex-expr`.
2. Support for files with different (but compatible) schemas in the same table. This currently works only if a `PhysicalExprAdapterFactory` is provided (either through the `FileScanConfig` or directly).
3. Support for hive-style partitioning columns.

The last two features aren't fully fleshed out and probably need more tests; I want to figure out whether we can use DataFusion's own test suite to close that gap, since it has pretty cool tooling for testing SQL.

One change this required was making the source and opener more DataFusion-like, doing most of the conversion work into Vortex in the actual `FileOpenFuture`. This lets us use more of DataFusion's tools for rewriting expressions over specific files.

---------

Signed-off-by: Adam Gutglick <[email protected]>
1 parent 62e231a commit 4a23f65
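
To make the new behavior concrete, here is a minimal sketch of the kind of query this commit targets. The `trips` table, its `fare`, `tip`, and `year` columns, and its registration with the `SessionContext` as a hive-partitioned Vortex dataset are assumptions for illustration, not part of this change. The point is the shape of the query: the subtraction in the predicate can now be pushed down to the Vortex scan, and `year` is resolved from the hive-style directory layout.

```rust
use datafusion::prelude::*;

// Sketch only: assumes `ctx` already has a Vortex-backed, hive-partitioned table
// named `trips` registered (e.g. files laid out as `trips/year=2024/*.vortex`).
async fn count_expensive_trips(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // `fare - tip > 10` exercises the new arithmetic (subtraction) pushdown,
    // while `year = 2024` is answered from the hive-style partition column.
    let df = ctx
        .sql("SELECT COUNT(*) FROM trips WHERE fare - tip > 10 AND year = 2024")
        .await?;
    df.show().await?;
    Ok(())
}
```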

File tree

21 files changed: +614 -318 lines

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 6 additions & 0 deletions
@@ -87,7 +87,13 @@ crossbeam-queue = "0.3.12"
 crossterm = "0.29"
 dashmap = "6.1.0"
 datafusion = { version = "49", default-features = false }
+datafusion-catalog = { version = "49" }
 datafusion-common = { version = "49" }
+datafusion-common-runtime = { version = "49" }
+datafusion-datasource = { version = "49", default-features = false }
+datafusion-execution = { version = "49" }
+datafusion-expr = { version = "49" }
+datafusion-physical-expr = { version = "49" }
 datafusion-physical-plan = { version = "49" }
 dirs = "6.0.0"
 divan = { package = "codspeed-divan-compat", version = "3.0" }

bench-vortex/src/benchmark_driver.rs

Lines changed: 5 additions & 6 deletions
@@ -214,8 +214,10 @@ fn execute_queries<B: Benchmark>(
             && query_idx < expected_counts.len()
         {
             assert_eq!(
-                row_count, expected_counts[query_idx],
-                "Row count mismatch for query {query_idx} - duckdb:{format}",
+                row_count,
+                expected_counts[query_idx],
+                "Row count mismatch for query {query_idx} - {}:{format}",
+                engine_ctx.to_engine()
             );
         }

@@ -225,10 +227,7 @@ fn execute_queries<B: Benchmark>(
         {
             memory_measurements.push(MemoryMeasurement {
                 query_idx,
-                target: match engine_ctx {
-                    EngineCtx::DataFusion(_) => Target::new(Engine::DataFusion, format),
-                    EngineCtx::DuckDB(_) => Target::new(Engine::DuckDB, format),
-                },
+                target: Target::new(engine_ctx.to_engine(), format),
                 benchmark_dataset: benchmark.dataset(),
                 storage: url_scheme_to_storage(benchmark.data_url())?,
                 physical_memory_delta: memory_result.physical_memory_delta,

vortex-datafusion/Cargo.toml

Lines changed: 10 additions & 1 deletion
@@ -14,10 +14,18 @@ rust-version = { workspace = true }
 version = { workspace = true }

 [dependencies]
+arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
 dashmap = { workspace = true }
-datafusion = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-common-runtime = { workspace = true }
+datafusion-datasource = { workspace = true, default-features = false }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
@@ -30,6 +38,7 @@ vortex = { workspace = true, features = ["object_store", "tokio"] }

 [dev-dependencies]
 anyhow = { workspace = true }
+datafusion = { workspace = true }
 rstest = { workspace = true }
 tempfile = { workspace = true }
 tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "fs"] }

vortex-datafusion/src/convert/exprs.rs

Lines changed: 90 additions & 10 deletions
@@ -1,39 +1,67 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors

-use datafusion::logical_expr::Operator as DFOperator;
-use datafusion::physical_expr::{PhysicalExpr, expressions};
+use std::sync::Arc;
+
+use arrow_schema::{DataType, Schema};
+use datafusion_expr::Operator as DFOperator;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
+use datafusion_physical_plan::expressions as df_expr;
 use vortex::error::{VortexResult, vortex_bail, vortex_err};
-use vortex::expr::{BinaryExpr, ExprRef, IntoExpr, LikeExpr, Operator, get_item, lit, root};
+use vortex::expr::{BinaryExpr, ExprRef, LikeExpr, Operator, and, get_item, lit, root};
 use vortex::scalar::Scalar;

 use crate::convert::{FromDataFusion, TryFromDataFusion};

+const SUPPORTED_BINARY_OPS: &[DFOperator] = &[
+    DFOperator::Eq,
+    DFOperator::NotEq,
+    DFOperator::Gt,
+    DFOperator::GtEq,
+    DFOperator::Lt,
+    DFOperator::LtEq,
+];
+
+/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
+pub(crate) fn make_vortex_predicate(
+    predicate: &[&Arc<dyn PhysicalExpr>],
+) -> VortexResult<Option<ExprRef>> {
+    let exprs = predicate
+        .iter()
+        .map(|e| ExprRef::try_from_df(e.as_ref()))
+        .collect::<VortexResult<Vec<_>>>()?;
+
+    Ok(exprs.into_iter().reduce(and))
+}
+
 // TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
 // for that node, up to any `and` or `or` node.
 impl TryFromDataFusion<dyn PhysicalExpr> for ExprRef {
     fn try_from_df(df: &dyn PhysicalExpr) -> VortexResult<Self> {
-        if let Some(binary_expr) = df.as_any().downcast_ref::<expressions::BinaryExpr>() {
+        if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
             let left = ExprRef::try_from_df(binary_expr.left().as_ref())?;
             let right = ExprRef::try_from_df(binary_expr.right().as_ref())?;
             let operator = Operator::try_from_df(binary_expr.op())?;

             return Ok(BinaryExpr::new_expr(left, operator, right));
         }

-        if let Some(col_expr) = df.as_any().downcast_ref::<expressions::Column>() {
+        if let Some(col_expr) = df.as_any().downcast_ref::<df_expr::Column>() {
             return Ok(get_item(col_expr.name().to_owned(), root()));
         }

-        if let Some(like) = df.as_any().downcast_ref::<expressions::LikeExpr>() {
+        if let Some(like) = df.as_any().downcast_ref::<df_expr::LikeExpr>() {
             let child = ExprRef::try_from_df(like.expr().as_ref())?;
             let pattern = ExprRef::try_from_df(like.pattern().as_ref())?;
-            return Ok(
-                LikeExpr::new(child, pattern, like.negated(), like.case_insensitive()).into_expr(),
-            );
+            return Ok(LikeExpr::new_expr(
+                child,
+                pattern,
+                like.negated(),
+                like.case_insensitive(),
+            ));
         }

-        if let Some(literal) = df.as_any().downcast_ref::<expressions::Literal>() {
+        if let Some(literal) = df.as_any().downcast_ref::<df_expr::Literal>() {
             let value = Scalar::from_df(literal.value());
             return Ok(lit(value));
         }
@@ -92,3 +120,55 @@ impl TryFromDataFusion<DFOperator> for Operator {
         }
     }
 }
+
+pub(crate) fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
+    let expr = expr.as_any();
+    if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
+        can_binary_be_pushed_down(binary, schema)
+    } else if let Some(col) = expr.downcast_ref::<df_expr::Column>() {
+        schema
+            .field_with_name(col.name())
+            .ok()
+            .is_some_and(|field| supported_data_types(field.data_type()))
+    } else if let Some(like) = expr.downcast_ref::<df_expr::LikeExpr>() {
+        can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
+    } else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
+        supported_data_types(&lit.value().data_type())
+    } else {
+        log::debug!("DataFusion expression can't be pushed down: {expr:?}");
+        false
+    }
+}
+
+fn can_binary_be_pushed_down(binary: &df_expr::BinaryExpr, schema: &Schema) -> bool {
+    let is_op_supported =
+        binary.op().is_logic_operator() || SUPPORTED_BINARY_OPS.contains(binary.op());
+    is_op_supported
+        && can_be_pushed_down(binary.left(), schema)
+        && can_be_pushed_down(binary.right(), schema)
+}
+
+fn supported_data_types(dt: &DataType) -> bool {
+    use DataType::*;
+    let is_supported = dt.is_null()
+        || dt.is_numeric()
+        || matches!(
+            dt,
+            Boolean
+                | Utf8
+                | Utf8View
+                | Binary
+                | BinaryView
+                | Date32
+                | Date64
+                | Timestamp(_, _)
+                | Time32(_)
+                | Time64(_)
+        );

+    if !is_supported {
+        log::debug!("DataFusion data type {dt:?} is not supported");
+    }
+
+    is_supported
+}
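
A minimal sketch of how the two crate-private helpers above fit together, assuming a made-up single-column schema and the expression `a > 5`. This mirrors how the scan/opener code inside the crate is expected to call them; it is not a public API.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator as DFOperator;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_plan::expressions::{BinaryExpr, Column, Literal};
use vortex::error::VortexResult;

fn pushdown_example() -> VortexResult<()> {
    // A file schema with a single Int32 column `a` (made up for illustration).
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

    // The DataFusion physical expression `a > 5`.
    let expr: PhysicalExprRef = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        DFOperator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    ));

    // Only expressions that pass the pushdown check are converted; the conversion
    // folds the slice of expressions into a single conjunction with `and`.
    if can_be_pushed_down(&expr, &schema) {
        let predicate = make_vortex_predicate(&[&expr])?;
        assert!(predicate.is_some()); // `None` is only returned for an empty slice.
    }
    Ok(())
}
```

The intent is that an expression failing the check is simply not converted and is left for DataFusion to evaluate itself after the scan.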

vortex-datafusion/src/convert/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@

 use vortex::error::VortexResult;

-mod exprs;
+pub(crate) mod exprs;
 mod scalars;

 /// First-party trait for implementing conversion from DataFusion types to Vortex types.

vortex-datafusion/src/convert/scalars.rs

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@

 use std::sync::Arc;

-use datafusion::common::ScalarValue;
+use datafusion_common::ScalarValue;
 use vortex::buffer::ByteBuffer;
 use vortex::dtype::datetime::arrow::make_temporal_ext_dtype;
 use vortex::dtype::datetime::{TemporalMetadata, TimeUnit, is_temporal_ext_type};

vortex-datafusion/src/lib.rs

Lines changed: 1 addition & 61 deletions
@@ -5,74 +5,14 @@
 #![deny(missing_docs)]
 use std::fmt::Debug;

-use datafusion::arrow::datatypes::{DataType, Schema};
-use datafusion::common::stats::Precision as DFPrecision;
-use datafusion::logical_expr::Operator;
-use datafusion::physical_expr::PhysicalExprRef;
-use datafusion::physical_plan::expressions::{BinaryExpr, Column, LikeExpr, Literal};
+use datafusion_common::stats::Precision as DFPrecision;
 use vortex::stats::Precision;

 mod convert;
 mod persistent;

 pub use persistent::*;

-const SUPPORTED_BINARY_OPS: &[Operator] = &[
-    Operator::Eq,
-    Operator::NotEq,
-    Operator::Gt,
-    Operator::GtEq,
-    Operator::Lt,
-    Operator::LtEq,
-];
-
-fn supported_data_types(dt: DataType) -> bool {
-    use DataType::*;
-    let is_supported = dt.is_integer()
-        || dt.is_floating()
-        || dt.is_null()
-        || matches!(
-            dt,
-            Boolean
-                | Utf8
-                | Utf8View
-                | Binary
-                | BinaryView
-                | Date32
-                | Date64
-                | Timestamp(_, _)
-                | Time32(_)
-                | Time64(_)
-        );
-
-    if !is_supported {
-        log::debug!("DataFusion data type {dt:?} is not supported");
-    }
-
-    is_supported
-}
-
-fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
-    let expr = expr.as_any();
-    if let Some(binary) = expr.downcast_ref::<BinaryExpr>() {
-        (binary.op().is_logic_operator() || SUPPORTED_BINARY_OPS.contains(binary.op()))
-            && can_be_pushed_down(binary.left(), schema)
-            && can_be_pushed_down(binary.right(), schema)
-    } else if let Some(col) = expr.downcast_ref::<Column>() {
-        schema
-            .column_with_name(col.name())
-            .map(|(_, field)| supported_data_types(field.data_type().clone()))
-            .unwrap_or(false)
-    } else if let Some(like) = expr.downcast_ref::<LikeExpr>() {
-        can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
-    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
-        supported_data_types(lit.value().data_type())
-    } else {
-        log::debug!("DataFusion expression can't be pushed down: {expr:?}");
-        false
-    }
-}
-
 /// Extension trait to convert our [`Precision`](vortex::stats::Precision) to Datafusion's [`Precision`](datafusion_common::stats::Precision)
 trait PrecisionExt<T>
 where

vortex-datafusion/src/persistent/cache.rs

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use datafusion::common::ScalarValue;
+use datafusion_common::ScalarValue;
 use moka::future::Cache;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};

vortex-datafusion/src/persistent/config.rs

Lines changed: 0 additions & 50 deletions
This file was deleted.
