1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -43,6 +43,7 @@ vortex-utils = { workspace = true, features = ["dashmap"] }
[dev-dependencies]
anyhow = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
insta = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
249 changes: 171 additions & 78 deletions vortex-datafusion/src/convert/exprs.rs
@@ -229,7 +229,10 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) ->
can_be_pushed_down(in_list.expr(), schema)
&& in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
can_scalar_fn_be_pushed_down(scalar_fn, schema)
// Only get_field expressions should be pushed down. Note that the GetFieldFunc call is
// guaranteed to be well-formed, because the DataFusion planner checks that for us
// before we even get to the DataSource.
ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
} else {
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
false
@@ -276,69 +279,37 @@ fn supported_data_types(dt: &DataType) -> bool {
is_supported
}

/// Checks if a GetField scalar function can be pushed down.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool {
let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
else {
// Only get_field pushdown is supported.
return false;
};

let args = get_field_fn.args();
if args.len() != 2 {
tracing::debug!(
"Expected 2 arguments for GetField, not pushing down {} arguments",
args.len()
);
return false;
}
let source_expr = &args[0];
let field_name_expr = &args[1];
let Some(field_name) = field_name_expr
.as_any()
.downcast_ref::<df_expr::Literal>()
.and_then(|lit| lit.value().try_as_str().flatten())
else {
return false;
};

let Ok(source_dt) = source_expr.data_type(schema) else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type for GetField, not pushing down"
);
return false;
};
let DataType::Struct(fields) = source_dt else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type as struct for GetField, not pushing down"
);
return false;
};
fields.find(field_name).is_some()
}

#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit as ArrowTimeUnit};
use arrow_schema::{
DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit as ArrowTimeUnit,
};
use datafusion::functions::core::getfield::GetFieldFunc;
use datafusion_common::ScalarValue;
use datafusion::logical_expr::{ColumnarValue, Signature};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{Operator as DFOperator, ScalarUDF};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_datasource::file::FileSource;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
Expr, Operator as DFOperator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Volatility, col,
};
use datafusion_functions::expr_fn::get_field;
use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
use datafusion_physical_plan::expressions as df_expr;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use insta::assert_snapshot;
use rstest::rstest;
use vortex::VortexSessionDefault;
use vortex::expr::{Expression, Operator};
use vortex::session::VortexSession;

use super::*;
use crate::VortexSource;
use crate::persistent::cache::VortexFileCache;

#[rstest::fixture]
fn test_schema() -> Schema {
@@ -510,7 +481,8 @@ mod tests {
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false
)]
#[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), false)]
#[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()
), false)]
// Dictionary types - should be supported if value type is supported
#[case::dict_utf8(
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
@@ -652,33 +624,154 @@ mod tests {
"#);
}

#[rstest]
#[case::valid_field("field1", true)]
#[case::missing_field("nonexistent_field", false)]
fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) {
let struct_fields = Fields::from(vec![
Field::new("field1", DataType::Utf8, true),
Field::new("field2", DataType::Int32, true),
]);
let schema = Schema::new(vec![Field::new(
"my_struct",
DataType::Struct(struct_fields),
true,
)]);
#[test]
fn test_pushdown_nested_filter() {
// schema:
// a: struct
// |- one: i32
// b: struct
// |- two: i32
let mut test_schema = SchemaBuilder::new();
test_schema.push(Field::new_struct(
"a",
vec![Field::new("one", DataType::Int32, false)],
false,
));
test_schema.push(Field::new_struct(
"b",
vec![Field::new("two", DataType::Int32, false)],
false,
));

let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc<dyn PhysicalExpr>;
let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
field_name.to_string(),
)))) as Arc<dyn PhysicalExpr>;
let test_schema = Arc::new(test_schema.finish());
// Make sure filter is pushed down
let filter = get_field(col("b"), "two").eq(datafusion_expr::lit(10i32));

let get_field_expr = Arc::new(ScalarFunctionExpr::new(
"get_field",
Arc::new(ScalarUDF::from(GetFieldFunc::new())),
vec![struct_col, field_name_lit],
Arc::new(Field::new(field_name, DataType::Utf8, true)),
Arc::new(ConfigOptions::new()),
)) as Arc<dyn PhysicalExpr>;
let df_schema = test_schema.clone().to_dfschema().unwrap();

let physical_filter =
create_physical_expr(&filter, &df_schema, &ExecutionProps::default()).unwrap();

let source = vortex_source(&test_schema);

let prop = source
.try_pushdown_filters(vec![physical_filter], &ConfigOptions::default())
.unwrap();
assert!(matches!(prop.filters[0], PushedDown::Yes));
}

#[test]
fn test_pushdown_deeply_nested_filter() {
// schema:
// a: struct
// |- b: struct
// |- c: i32
let mut schema = SchemaBuilder::new();

let a = Field::new_struct(
"a",
vec![Field::new_struct(
"b",
vec![Field::new("c", DataType::Int32, false)],
false,
)],
false,
);
schema.push(a);

let schema = Arc::new(schema.finish());
let df_schema = schema.clone().to_dfschema().unwrap();

let source = vortex_source(&schema);

let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32));

let physical_filter =
create_physical_expr(&deep_filter, &df_schema, &ExecutionProps::default()).unwrap();

let prop = source
.try_pushdown_filters(vec![physical_filter], &ConfigOptions::default())
.unwrap();
assert!(matches!(prop.filters[0], PushedDown::Yes));
}

#[test]
fn test_unknown_scalar_function() {
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct UnknownImpl {
signature: Signature,
}

impl ScalarUDFImpl for UnknownImpl {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"unknown"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Int32)
}

fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(1))))
}
}

// schema:
// a: struct
// |- b: struct
// |- c: i32
let mut schema = SchemaBuilder::new();

let a = Field::new_struct(
"a",
vec![Field::new_struct(
"b",
vec![Field::new("c", DataType::Int32, false)],
false,
)],
false,
);
schema.push(a);

let schema = Arc::new(schema.finish());
let df_schema = schema.clone().to_dfschema().unwrap();

let source = vortex_source(&schema);

let unknown_func = Expr::ScalarFunction(ScalarFunction {
func: Arc::new(ScalarUDF::new_from_impl(UnknownImpl {
signature: Signature::nullary(Volatility::Immutable),
})),
args: vec![],
});

// An arbitrary ScalarFunction that we can't push down
let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32));

let physical_filter =
create_physical_expr(&deep_filter, &df_schema, &ExecutionProps::default()).unwrap();

let prop = source
.try_pushdown_filters(vec![physical_filter], &ConfigOptions::default())
.unwrap();
assert!(matches!(prop.filters[0], PushedDown::No));
}

fn vortex_source(schema: &SchemaRef) -> Arc<dyn FileSource> {
let session = VortexSession::default();
let cache = VortexFileCache::new(1024, 1024, session.clone());

assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected);
Arc::new(VortexSource::new(session, cache)).with_schema(schema.clone())
}
}
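Reviewer note (not part of the diff): the simplified check can also be exercised against `can_be_pushed_down` directly, without going through `try_pushdown_filters`. A minimal sketch, assuming the nested `a.b.c` schema built with `SchemaBuilder` in `test_pushdown_deeply_nested_filter` above and its `df_schema`:

    // Sketch: build the same nested filter as the test above and feed it straight
    // into can_be_pushed_down. `schema` / `df_schema` are assumed to be the
    // SchemaBuilder-produced Arc<Schema> and its DFSchema from that test.
    let filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32));
    let physical = create_physical_expr(&filter, &df_schema, &ExecutionProps::default()).unwrap();
    // For the get_field call itself, only the GetFieldFunc identity is inspected;
    // field existence was already validated by the DataFusion planner.
    assert!(can_be_pushed_down(&physical, &schema));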
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/mod.rs
@@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Persistent implementation of a Vortex table provider.
mod cache;
pub(crate) mod cache;
mod format;
pub mod metrics;
mod opener;
4 changes: 2 additions & 2 deletions vortex-datafusion/src/persistent/opener.rs
@@ -586,12 +586,12 @@ mod tests {

let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let file1_path = "/path/file1.vortex";
let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?;
let data_size1 = write_arrow_to_vortex(object_store.clone(), file1_path, batch1).await?;
let file1 = PartitionedFile::new(file1_path.to_string(), data_size1);

let file2_path = "/path/file2.vortex";
let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)])).unwrap();
let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)]))?;
let data_size2 = write_arrow_to_vortex(object_store.clone(), file2_path, batch2).await?;
let file2 = PartitionedFile::new(file1_path.to_string(), data_size1);

47 changes: 20 additions & 27 deletions vortex-datafusion/src/persistent/source.rs
@@ -230,6 +230,11 @@ impl FileSource for VortexSource {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
// NOTE(aduffy): we never report to DF that we "pushed down" the filter, as this can play
// oddly with schema evolution. We always want DataFusion to insert a FilterExec node
// above us, so that any data we don't successfully filter still gets post-filtered. However,
// we do capture any filters that we believe we can push down, which lets us prune data
// before we read it into memory.
if filters.is_empty() {
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![],
@@ -254,46 +259,34 @@
};

let supported_filters = filters
.clone()
.into_iter()
.map(|expr| {
if can_be_pushed_down(&expr, schema) {
PushedDownPredicate::supported(expr)
} else {
PushedDownPredicate::unsupported(expr)
}
})
.filter(|expr| can_be_pushed_down(&expr, schema))
.collect::<Vec<_>>();

if supported_filters
.iter()
.all(|p| matches!(p.discriminant, PushedDown::No))
{
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; supported_filters.len()],
)
if supported_filters.is_empty() {
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
PushedDown::No;
filters.len()
])
.with_updated_node(Arc::new(source) as _));
}

let supported = supported_filters
.iter()
.filter_map(|p| match p.discriminant {
PushedDown::Yes => Some(&p.predicate),
PushedDown::No => None,
})
.cloned();

// We might need to append to the predicate multiple times.
let predicate = match source.vortex_predicate {
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
None => conjunction(supported),
Some(predicate) => conjunction(std::iter::once(predicate).chain(supported_filters)),
None => conjunction(supported_filters),
};

tracing::debug!(%predicate, "Saving predicate");

source.vortex_predicate = Some(predicate);

Ok(FilterPushdownPropagation::with_parent_pushdown_result(
supported_filters.iter().map(|f| f.discriminant).collect(),
)
// Report no pushdown, but update the set of filters we optimistically try to apply at scan time.
Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
PushedDown::No;
filters.len()
])
.with_updated_node(Arc::new(source) as _))
}
