Skip to content

Commit 07a1593

Browse files
committed
[experiment] force DF to do postfiltering
Signed-off-by: Andrew Duffy <[email protected]>
1 parent a12e5c7 commit 07a1593

File tree

2 files changed

+22
-29
lines changed

2 files changed

+22
-29
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -586,12 +586,12 @@ mod tests {
586586

587587
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
588588
let file1_path = "/path/file1.vortex";
589-
let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
589+
let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?;
590590
let data_size1 = write_arrow_to_vortex(object_store.clone(), file1_path, batch1).await?;
591591
let file1 = PartitionedFile::new(file1_path.to_string(), data_size1);
592592

593593
let file2_path = "/path/file2.vortex";
594-
let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)])).unwrap();
594+
let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)]))?;
595595
let data_size2 = write_arrow_to_vortex(object_store.clone(), file2_path, batch2).await?;
596596
let file2 = PartitionedFile::new(file1_path.to_string(), data_size1);
597597

vortex-datafusion/src/persistent/source.rs

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ impl FileSource for VortexSource {
230230
filters: Vec<Arc<dyn PhysicalExpr>>,
231231
_config: &ConfigOptions,
232232
) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
233+
// NOTE(aduffy): we never report to DF that we "pushed down" the filter, as this can play
234+
// oddly with schema evolution. We always want DataFusion to insert a FilterExec node
235+
// above us, so that any data we don't successfully filter does get postfiltered. We do
236+
// capture any filters that we believe we can pushdown however. This lets us prune data
237+
// before we read it into memory.
233238
if filters.is_empty() {
234239
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
235240
vec![],
@@ -254,46 +259,34 @@ impl FileSource for VortexSource {
254259
};
255260

256261
let supported_filters = filters
262+
.clone()
257263
.into_iter()
258-
.map(|expr| {
259-
if can_be_pushed_down(&expr, schema) {
260-
PushedDownPredicate::supported(expr)
261-
} else {
262-
PushedDownPredicate::unsupported(expr)
263-
}
264-
})
264+
.filter(|expr| can_be_pushed_down(&expr, schema))
265265
.collect::<Vec<_>>();
266266

267-
if supported_filters
268-
.iter()
269-
.all(|p| matches!(p.discriminant, PushedDown::No))
270-
{
271-
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
272-
vec![PushedDown::No; supported_filters.len()],
273-
)
267+
if supported_filters.is_empty() {
268+
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
269+
PushedDown::No;
270+
filters.len()
271+
])
274272
.with_updated_node(Arc::new(source) as _));
275273
}
276274

277-
let supported = supported_filters
278-
.iter()
279-
.filter_map(|p| match p.discriminant {
280-
PushedDown::Yes => Some(&p.predicate),
281-
PushedDown::No => None,
282-
})
283-
.cloned();
284-
275+
// We might need to append to the predicate multiple times.
285276
let predicate = match source.vortex_predicate {
286-
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
287-
None => conjunction(supported),
277+
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported_filters)),
278+
None => conjunction(supported_filters),
288279
};
289280

290281
tracing::debug!(%predicate, "Saving predicate");
291282

292283
source.vortex_predicate = Some(predicate);
293284

294-
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
295-
supported_filters.iter().map(|f| f.discriminant).collect(),
296-
)
285+
// Report no pushdown, but update the set of filters we try and optimistically apply at scan
286+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
287+
PushedDown::No;
288+
filters.len()
289+
])
297290
.with_updated_node(Arc::new(source) as _))
298291
}
299292

0 commit comments

Comments
 (0)