Skip to content

Commit c178090

Browse files
committed
first changes
Signed-off-by: Adam Gutglick <[email protected]>
1 parent c0b539c commit c178090

File tree

4 files changed

+67
-84
lines changed

4 files changed

+67
-84
lines changed

vortex-datafusion/src/convert/exprs.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ use std::sync::Arc;
66
use arrow_schema::{DataType, Schema};
77
use datafusion_expr::Operator as DFOperator;
88
use datafusion_functions::core::getfield::GetFieldFunc;
9-
use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
10-
use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, is_dynamic_physical_expr};
9+
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
10+
use datafusion_physical_expr_common::physical_expr::{
11+
is_dynamic_physical_expr, snapshot_physical_expr,
12+
};
1113
use datafusion_physical_plan::expressions as df_expr;
1214
use itertools::Itertools;
1315
use vortex::compute::LikeOptions;
@@ -24,11 +26,25 @@ use crate::convert::{FromDataFusion, TryFromDataFusion};
2426

2527
/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
2628
pub(crate) fn make_vortex_predicate(
27-
predicate: &[&Arc<dyn PhysicalExpr>],
29+
predicate: &[Arc<dyn PhysicalExpr>],
2830
) -> VortexResult<Option<Expression>> {
2931
let exprs = predicate
3032
.iter()
31-
.map(|e| Expression::try_from_df(e.as_ref()))
33+
.filter_map(|e| {
34+
if is_dynamic_physical_expr(e) {
35+
let e = snapshot_physical_expr(e.clone()).expect("do");
36+
match Expression::try_from_df(e.as_ref()) {
37+
Ok(e) => Some(Ok(e)),
38+
Err(_) => {
39+
// If we fail to convert the expression to Vortex, its safe
40+
// to drop it as we don't declare it as pushed down
41+
None
42+
}
43+
}
44+
} else {
45+
Some(Expression::try_from_df(e.as_ref()))
46+
}
47+
})
3248
.collect::<VortexResult<Vec<_>>>()?;
3349

3450
Ok(exprs.into_iter().reduce(and))
@@ -323,15 +339,15 @@ mod tests {
323339
#[test]
324340
fn test_make_vortex_predicate_single() {
325341
let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
326-
let result = make_vortex_predicate(&[&col_expr]).unwrap();
342+
let result = make_vortex_predicate(&[col_expr]).unwrap();
327343
assert!(result.is_some());
328344
}
329345

330346
#[test]
331347
fn test_make_vortex_predicate_multiple() {
332348
let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
333349
let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
334-
let result = make_vortex_predicate(&[&col1, &col2]).unwrap();
350+
let result = make_vortex_predicate(&[col1, col2]).unwrap();
335351
assert!(result.is_some());
336352
// Result should be an AND expression combining the two columns
337353
}

vortex-datafusion/src/persistent/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ mod tests {
195195

196196
#[tokio::test]
197197
async fn create_table_ordered_by() -> anyhow::Result<()> {
198-
let dir = TempDir::new().unwrap();
198+
let dir = TempDir::new()?;
199199

200200
let factory: VortexFormatFactory = VortexFormatFactory::new();
201201
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
@@ -238,6 +238,8 @@ mod tests {
238238
let (state, plan) = df.clone().into_parts();
239239
let physical_plan = state.create_physical_plan(&plan).await?;
240240

241+
dbg!(physical_plan.as_ref());
242+
241243
insta::assert_snapshot!(DisplayableExecutionPlan::new(physical_plan.as_ref())
242244
.tree_render().to_string(), @r"
243245
┌───────────────────────────┐
@@ -249,8 +251,9 @@ mod tests {
249251
┌─────────────┴─────────────┐
250252
│ DataSourceExec │
251253
│ -------------------- │
252-
files: 3
254+
│ files: 14
253255
│ format: vortex │
256+
│ predicate: true │
254257
└───────────────────────────┘
255258
");
256259

vortex-datafusion/src/persistent/opener.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,9 @@ pub(crate) struct VortexOpener {
4040
pub object_store: Arc<dyn ObjectStore>,
4141
/// Projection by index of the file's columns
4242
pub projection: Option<Arc<[usize]>>,
43-
/// Filter expression optimized for pushdown into Vortex scan operations.
44-
/// This may be a subset of file_pruning_predicate containing only expressions
45-
/// that Vortex can efficiently evaluate.
46-
pub filter: Option<PhysicalExprRef>,
4743
/// Filter expression used by DataFusion's FilePruner to eliminate files based on
4844
/// statistics and partition values without opening them.
49-
pub file_pruning_predicate: Option<PhysicalExprRef>,
45+
pub predicate: Option<PhysicalExprRef>,
5046
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
5147
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
5248
/// Hive-style partitioning columns
@@ -149,8 +145,7 @@ impl FileOpener for VortexOpener {
149145
let session = self.session.clone();
150146
let object_store = self.object_store.clone();
151147
let projection = self.projection.clone();
152-
let mut filter = self.filter.clone();
153-
let file_pruning_predicate = self.file_pruning_predicate.clone();
148+
let mut predicate = self.predicate.clone();
154149
let expr_adapter_factory = self.expr_adapter_factory.clone();
155150
let partition_fields = self.partition_fields.clone();
156151
let file_cache = self.file_cache.clone();
@@ -178,7 +173,8 @@ impl FileOpener for VortexOpener {
178173
// opening them based on:
179174
// - Partition column values (e.g., date=2024-01-01)
180175
// - File-level statistics (min/max values per column)
181-
let mut file_pruner = file_pruning_predicate
176+
let mut file_pruner = predicate
177+
.clone()
182178
.map(|predicate| {
183179
// Only create pruner if we have dynamic expressions or file statistics
184180
// to work with. Static predicates without stats won't benefit from pruning.
@@ -225,15 +221,16 @@ impl FileOpener for VortexOpener {
225221

226222
// The adapter rewrites the expression to the local file schema, allowing
227223
// for schema evolution and divergence between the table's schema and individual files.
228-
filter = filter
229-
.map(|filter| {
224+
predicate = predicate
225+
.clone()
226+
.map(|expr| {
230227
let logical_file_schema =
231228
compute_logical_file_schema(&physical_file_schema, &logical_schema);
232229

233230
let expr = expr_adapter_factory
234231
.create(logical_file_schema, physical_file_schema.clone())
235232
.with_partition_values(partition_values)
236-
.rewrite(filter)?;
233+
.rewrite(expr)?;
237234

238235
// Expression might now reference columns that don't exist in the file, so we can give it
239236
// another simplification pass.
@@ -294,11 +291,15 @@ impl FileOpener for VortexOpener {
294291
);
295292
}
296293

297-
let filter = filter
294+
let filter = predicate
298295
.and_then(|f| {
299296
let exprs = split_conjunction(&f)
300297
.into_iter()
301-
.filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
298+
.cloned()
299+
.filter(|expr| {
300+
is_dynamic_physical_expr(expr)
301+
|| can_be_pushed_down(expr, &predicate_file_schema)
302+
})
302303
.collect::<Vec<_>>();
303304

304305
make_vortex_predicate(&exprs).transpose()
@@ -521,8 +522,7 @@ mod tests {
521522
session: SESSION.clone(),
522523
object_store: object_store.clone(),
523524
projection: Some([0].into()),
524-
filter: Some(filter),
525-
file_pruning_predicate: None,
525+
predicate: Some(filter),
526526
expr_adapter_factory: expr_adapter_factory.clone(),
527527
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
528528
partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))],
@@ -602,8 +602,7 @@ mod tests {
602602
session: SESSION.clone(),
603603
object_store: object_store.clone(),
604604
projection: Some([0].into()),
605-
filter: Some(filter),
606-
file_pruning_predicate: None,
605+
predicate: Some(filter),
607606
expr_adapter_factory: expr_adapter_factory.clone(),
608607
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
609608
partition_fields: vec![],
@@ -710,11 +709,10 @@ mod tests {
710709
session: SESSION.clone(),
711710
object_store: object_store.clone(),
712711
projection: None,
713-
filter: Some(logical2physical(
712+
predicate: Some(logical2physical(
714713
&col("my_struct").is_not_null(),
715714
&table_schema,
716715
)),
717-
file_pruning_predicate: None,
718716
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
719717
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
720718
partition_fields: vec![],

vortex-datafusion/src/persistent/source.rs

Lines changed: 23 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ use datafusion_physical_expr_adapter::{
1717
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
1818
};
1919
use datafusion_physical_expr_common::physical_expr::fmt_sql;
20-
use datafusion_physical_plan::filter_pushdown::{
21-
FilterPushdownPropagation, PushedDown, PushedDownPredicate,
22-
};
20+
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
2321
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
2422
use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
2523
use object_store::ObjectStore;
@@ -45,10 +43,7 @@ pub struct VortexSource {
4543
pub(crate) file_cache: VortexFileCache,
4644
/// Combined predicate expression containing all filters from DataFusion query planning.
4745
/// Used with FilePruner to skip files based on statistics and partition values.
48-
pub(crate) full_predicate: Option<PhysicalExprRef>,
49-
/// Subset of predicates that can be pushed down into Vortex scan operations.
50-
/// These are expressions that Vortex can efficiently evaluate during scanning.
51-
pub(crate) vortex_predicate: Option<PhysicalExprRef>,
46+
pub(crate) predicate: Option<PhysicalExprRef>,
5247
pub(crate) batch_size: Option<usize>,
5348
pub(crate) projected_statistics: Option<Statistics>,
5449
/// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
@@ -67,8 +62,7 @@ impl VortexSource {
6762
Self {
6863
session,
6964
file_cache,
70-
full_predicate: None,
71-
vortex_predicate: None,
65+
predicate: None,
7266
batch_size: None,
7367
projected_statistics: None,
7468
arrow_file_schema: None,
@@ -140,8 +134,8 @@ impl FileSource for VortexSource {
140134
session: self.session.clone(),
141135
object_store,
142136
projection,
143-
filter: self.vortex_predicate.clone(),
144-
file_pruning_predicate: self.full_predicate.clone(),
137+
138+
predicate: self.predicate.clone(),
145139
expr_adapter_factory,
146140
schema_adapter_factory,
147141
partition_fields: base_config.table_partition_cols.clone(),
@@ -184,7 +178,7 @@ impl FileSource for VortexSource {
184178
}
185179

186180
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
187-
self.vortex_predicate.clone()
181+
self.predicate.clone()
188182
}
189183

190184
fn metrics(&self) -> &ExecutionPlanMetricsSet {
@@ -197,7 +191,7 @@ impl FileSource for VortexSource {
197191
.clone()
198192
.vortex_expect("projected_statistics must be set");
199193

200-
if self.vortex_predicate.is_some() {
194+
if self.predicate.is_some() {
201195
Ok(statistics.to_inexact())
202196
} else {
203197
Ok(statistics)
@@ -211,13 +205,13 @@ impl FileSource for VortexSource {
211205
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
212206
match t {
213207
DisplayFormatType::Default | DisplayFormatType::Verbose => {
214-
if let Some(ref predicate) = self.vortex_predicate {
208+
if let Some(ref predicate) = self.predicate {
215209
write!(f, ", predicate: {predicate}")?;
216210
}
217211
}
218212
// Use TreeRender style key=value formatting to display the predicate
219213
DisplayFormatType::TreeRender => {
220-
if let Some(ref predicate) = self.vortex_predicate {
214+
if let Some(ref predicate) = self.predicate {
221215
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
222216
};
223217
}
@@ -238,57 +232,29 @@ impl FileSource for VortexSource {
238232

239233
let mut source = self.clone();
240234

241-
// Combine new filters with existing predicate for file pruning.
242-
// This full predicate is used by FilePruner to eliminate files.
243-
source.full_predicate = match source.full_predicate {
244-
Some(predicate) => Some(conjunction(
245-
std::iter::once(predicate).chain(filters.clone()),
246-
)),
247-
None => Some(conjunction(filters.clone())),
248-
};
249-
250-
let supported_filters = filters
251-
.into_iter()
235+
let supported = filters
236+
.iter()
252237
.map(|expr| {
253-
if can_be_pushed_down(&expr, schema) {
254-
PushedDownPredicate::supported(expr)
238+
if can_be_pushed_down(expr, schema) {
239+
PushedDown::Yes
255240
} else {
256-
PushedDownPredicate::unsupported(expr)
241+
PushedDown::No
257242
}
258243
})
259244
.collect::<Vec<_>>();
260245

261-
if supported_filters
262-
.iter()
263-
.all(|p| matches!(p.discriminant, PushedDown::No))
264-
{
265-
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
266-
vec![PushedDown::No; supported_filters.len()],
267-
)
268-
.with_updated_node(Arc::new(source) as _));
269-
}
270-
271-
let supported = supported_filters
272-
.iter()
273-
.filter_map(|p| match p.discriminant {
274-
PushedDown::Yes => Some(&p.predicate),
275-
PushedDown::No => None,
276-
})
277-
.cloned();
278-
279-
let predicate = match source.vortex_predicate {
280-
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
281-
None => conjunction(supported),
246+
// Combine new filters with existing predicate
247+
source.predicate = match source.predicate {
248+
Some(predicate) => Some(conjunction(
249+
std::iter::once(predicate).chain(filters.clone()),
250+
)),
251+
None => Some(conjunction(filters.clone())),
282252
};
283253

284-
tracing::debug!(%predicate, "Saving predicate");
285-
286-
source.vortex_predicate = Some(predicate);
287-
288-
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
289-
supported_filters.iter().map(|f| f.discriminant).collect(),
254+
Ok(
255+
FilterPushdownPropagation::with_parent_pushdown_result(supported)
256+
.with_updated_node(Arc::new(source) as _),
290257
)
291-
.with_updated_node(Arc::new(source) as _))
292258
}
293259

294260
fn with_schema_adapter_factory(

0 commit comments

Comments
 (0)