Skip to content

Commit 12cdfd3

Browse files
committed
fixup
Signed-off-by: Andrew Duffy <[email protected]>
1 parent 4264c02 commit 12cdfd3

File tree

2 files changed

+68
-51
lines changed

2 files changed

+68
-51
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,8 @@ mod tests {
435435
use datafusion_datasource::file::FileSource;
436436
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
437437
use datafusion_execution::object_store::ObjectStoreUrl;
438+
use datafusion_physical_expr::utils::conjunction;
439+
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
438440
use datafusion_physical_plan::filter_pushdown::PushedDown;
439441
use futures::pin_mut;
440442
use insta::assert_snapshot;
@@ -454,6 +456,30 @@ mod tests {
454456
use super::*;
455457
use crate::VortexSource;
456458

459+
fn check_pushdown_result(
460+
filters: Vec<PhysicalExprRef>,
461+
expected_pushed_filters: impl IntoIterator<Item = PhysicalExprRef>,
462+
pushdown_result: &FilterPushdownPropagation<Arc<dyn FileSource>>,
463+
) {
464+
assert_eq!(filters.len(), pushdown_result.filters.len());
465+
466+
for filter in &pushdown_result.filters {
467+
assert!(matches!(filter, PushedDown::No));
468+
}
469+
470+
let updated_src = pushdown_result
471+
.updated_node
472+
.as_ref()
473+
.expect("try_pushdown_filters for VortexSource should always return updated node");
474+
let vortex_src = updated_src
475+
.as_any()
476+
.downcast_ref::<VortexSource>()
477+
.expect("downcast to VortexSource");
478+
479+
let expected = conjunction(expected_pushed_filters);
480+
assert_eq!(Some(expected), vortex_src.pushed_predicate);
481+
}
482+
457483
/// Fixtures used for integration testing the FileSource and FileOpener
458484
struct TestFixtures {
459485
object_store: Arc<dyn ObjectStore>,
@@ -563,6 +589,8 @@ mod tests {
563589
let push_filters =
564590
source.try_pushdown_filters(vec![filter_partition_col], &ConfigOptions::default())?;
565591

592+
let source = push_filters.updated_node.unwrap();
593+
566594
assert!(matches!(push_filters.filters[0], PushedDown::No));
567595

568596
let base_config = FileScanConfigBuilder::new(
@@ -620,9 +648,11 @@ mod tests {
620648
let filter = col("a").lt(lit(100_i32));
621649
let filter = logical2physical(&filter, table_schema.as_ref());
622650
let pushdown_result =
623-
source.try_pushdown_filters(vec![filter], &ConfigOptions::default())?;
624-
// filter should've succeeded pushing
625-
assert!(matches!(pushdown_result.filters[0], PushedDown::Yes));
651+
source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?;
652+
653+
check_pushdown_result(vec![filter.clone()], vec![filter.clone()], &pushdown_result);
654+
655+
let source = pushdown_result.updated_node.unwrap();
626656

627657
let base_config = FileScanConfigBuilder::new(
628658
ObjectStoreUrl::parse("s3://in-memory")?,
@@ -785,10 +815,11 @@ mod tests {
785815

786816
let filter = logical2physical(&col("my_struct").is_not_null(), &table_schema);
787817
let pushdown_result =
788-
source.try_pushdown_filters(vec![filter], &ConfigOptions::default())?;
818+
source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?;
819+
820+
check_pushdown_result(vec![filter.clone()], vec![filter], &pushdown_result);
789821

790-
// The filter should not have been pushed
791-
assert!(matches!(pushdown_result.filters[0], PushedDown::Yes));
822+
let source = pushdown_result.updated_node.unwrap();
792823

793824
let base_config = FileScanConfigBuilder::new(
794825
ObjectStoreUrl::parse("s3://in-memory")?,

vortex-datafusion/src/persistent/source.rs

Lines changed: 31 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use datafusion_physical_plan::DisplayFormatType;
2424
use datafusion_physical_plan::PhysicalExpr;
2525
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
2626
use datafusion_physical_plan::filter_pushdown::PushedDown;
27-
use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
2827
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
2928
use object_store::ObjectStore;
3029
use object_store::path::Path;
@@ -52,11 +51,11 @@ pub struct VortexSource {
5251
pub(crate) full_predicate: Option<PhysicalExprRef>,
5352
/// Subset of predicates that can be pushed down into Vortex scan operations.
5453
/// These are expressions that Vortex can efficiently evaluate during scanning.
55-
pub(crate) vortex_predicate: Option<PhysicalExprRef>,
54+
pub(crate) pushed_predicate: Option<PhysicalExprRef>,
5655
pub(crate) batch_size: Option<usize>,
5756
pub(crate) projected_statistics: Option<Statistics>,
5857
/// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
59-
pub(crate) arrow_file_schema: Option<SchemaRef>,
58+
pub(crate) table_schema: Option<SchemaRef>,
6059
pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
6160
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
6261
_unused_df_metrics: ExecutionPlanMetricsSet,
@@ -72,10 +71,10 @@ impl VortexSource {
7271
session,
7372
file_cache,
7473
full_predicate: None,
75-
vortex_predicate: None,
74+
pushed_predicate: None,
7675
batch_size: None,
7776
projected_statistics: None,
78-
arrow_file_schema: None,
77+
table_schema: None,
7978
schema_adapter_factory: None,
8079
expr_adapter_factory: None,
8180
_unused_df_metrics: Default::default(),
@@ -144,7 +143,7 @@ impl FileSource for VortexSource {
144143
session: self.session.clone(),
145144
object_store,
146145
projection,
147-
filter: self.vortex_predicate.clone(),
146+
filter: self.pushed_predicate.clone(),
148147
file_pruning_predicate: self.full_predicate.clone(),
149148
expr_adapter_factory,
150149
schema_adapter_factory,
@@ -173,7 +172,7 @@ impl FileSource for VortexSource {
173172

174173
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
175174
let mut source = self.clone();
176-
source.arrow_file_schema = Some(schema);
175+
source.table_schema = Some(schema);
177176
Arc::new(source)
178177
}
179178

@@ -188,7 +187,7 @@ impl FileSource for VortexSource {
188187
}
189188

190189
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
191-
self.vortex_predicate.clone()
190+
self.pushed_predicate.clone()
192191
}
193192

194193
fn metrics(&self) -> &ExecutionPlanMetricsSet {
@@ -201,7 +200,7 @@ impl FileSource for VortexSource {
201200
.clone()
202201
.vortex_expect("projected_statistics must be set");
203202

204-
if self.vortex_predicate.is_some() {
203+
if self.pushed_predicate.is_some() {
205204
Ok(statistics.to_inexact())
206205
} else {
207206
Ok(statistics)
@@ -215,13 +214,13 @@ impl FileSource for VortexSource {
215214
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
216215
match t {
217216
DisplayFormatType::Default | DisplayFormatType::Verbose => {
218-
if let Some(ref predicate) = self.vortex_predicate {
217+
if let Some(ref predicate) = self.pushed_predicate {
219218
write!(f, ", predicate: {predicate}")?;
220219
}
221220
}
222221
// Use TreeRender style key=value formatting to display the predicate
223222
DisplayFormatType::TreeRender => {
224-
if let Some(ref predicate) = self.vortex_predicate {
223+
if let Some(ref predicate) = self.pushed_predicate {
225224
writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
226225
};
227226
}
@@ -234,13 +233,16 @@ impl FileSource for VortexSource {
234233
filters: Vec<Arc<dyn PhysicalExpr>>,
235234
_config: &ConfigOptions,
236235
) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
236+
let num_filters = filters.len();
237+
237238
if filters.is_empty() {
238239
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
239240
vec![],
240241
));
241242
}
242243

243-
let Some(schema) = self.arrow_file_schema.as_ref() else {
244+
// only try and push filters if we know the schema
245+
let Some(schema) = self.table_schema.as_ref() else {
244246
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
245247
vec![PushedDown::No; filters.len()],
246248
));
@@ -257,47 +259,31 @@ impl FileSource for VortexSource {
257259
None => Some(conjunction(filters.clone())),
258260
};
259261

262+
// Update the predicate with any pushed filters
260263
let supported_filters = filters
261264
.into_iter()
262-
.map(|expr| {
263-
if can_be_pushed_down(&expr, schema) {
264-
PushedDownPredicate::supported(expr)
265-
} else {
266-
PushedDownPredicate::unsupported(expr)
267-
}
268-
})
265+
.filter(|expr| can_be_pushed_down(&expr, schema))
269266
.collect::<Vec<_>>();
270267

271-
if supported_filters
272-
.iter()
273-
.all(|p| matches!(p.discriminant, PushedDown::No))
274-
{
275-
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
276-
vec![PushedDown::No; supported_filters.len()],
277-
)
278-
.with_updated_node(Arc::new(source) as _));
279-
}
280-
281-
let supported = supported_filters
282-
.iter()
283-
.filter_map(|p| match p.discriminant {
284-
PushedDown::Yes => Some(&p.predicate),
285-
PushedDown::No => None,
286-
})
287-
.cloned();
288-
289-
let predicate = match source.vortex_predicate {
290-
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
291-
None => conjunction(supported),
268+
let predicate = match source.pushed_predicate {
269+
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported_filters)),
270+
None => conjunction(supported_filters),
292271
};
293272

294-
tracing::debug!(%predicate, "Saving predicate");
273+
tracing::debug!(%predicate, "updating predicate with new filters");
295274

296-
source.vortex_predicate = Some(predicate);
275+
source.pushed_predicate = Some(predicate);
297276

298-
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
299-
supported_filters.iter().map(|f| f.discriminant).collect(),
300-
)
277+
// NOTE: we always report no pushdown to DataFusion, which forces it to postfilter our
278+
// results. Due to schema evolution and schema adapters/expression adapters, we can't
279+
// guarantee that filters over missing columns can be executed directly in Vortex.
280+
//
281+
// But, we still return the updated source node so that the filters are used for
282+
// zone map pruning.
283+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
284+
PushedDown::No;
285+
num_filters
286+
])
301287
.with_updated_node(Arc::new(source) as _))
302288
}
303289

0 commit comments

Comments
 (0)