Skip to content

Commit 3c85227

Browse files
committed
think this does it?
Signed-off-by: Andrew Duffy <[email protected]>
1 parent df70214 commit 3c85227

File tree

5 files changed

+42
-29
lines changed

5 files changed

+42
-29
lines changed

vortex-datafusion/src/convert/exprs.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,22 @@ use datafusion_expr::Operator as DFOperator;
99
use datafusion_functions::core::getfield::GetFieldFunc;
1010
use datafusion_physical_expr::PhysicalExpr;
1111
use datafusion_physical_expr::ScalarFunctionExpr;
12-
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
1312
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
13+
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
1414
use datafusion_physical_plan::expressions as df_expr;
1515
use itertools::Itertools;
1616
use vortex::compute::LikeOptions;
17-
use vortex::dtype::arrow::FromArrowType;
1817
use vortex::dtype::DType;
1918
use vortex::dtype::Nullability;
19+
use vortex::dtype::arrow::FromArrowType;
20+
use vortex::error::VortexResult;
2021
use vortex::error::vortex_bail;
2122
use vortex::error::vortex_err;
22-
use vortex::error::VortexResult;
23+
use vortex::expr::Binary;
24+
use vortex::expr::Expression;
25+
use vortex::expr::Like;
26+
use vortex::expr::Operator;
27+
use vortex::expr::VTableExt;
2328
use vortex::expr::and;
2429
use vortex::expr::cast;
2530
use vortex::expr::get_item;
@@ -28,11 +33,6 @@ use vortex::expr::list_contains;
2833
use vortex::expr::lit;
2934
use vortex::expr::not;
3035
use vortex::expr::root;
31-
use vortex::expr::Binary;
32-
use vortex::expr::Expression;
33-
use vortex::expr::Like;
34-
use vortex::expr::Operator;
35-
use vortex::expr::VTableExt;
3636
use vortex::scalar::Scalar;
3737

3838
use crate::convert::FromDataFusion;
@@ -301,8 +301,8 @@ mod tests {
301301
use arrow_schema::Schema;
302302
use arrow_schema::TimeUnit as ArrowTimeUnit;
303303
use datafusion::functions::core::getfield::GetFieldFunc;
304-
use datafusion_common::config::ConfigOptions;
305304
use datafusion_common::ScalarValue;
305+
use datafusion_common::config::ConfigOptions;
306306
use datafusion_expr::Operator as DFOperator;
307307
use datafusion_expr::ScalarUDF;
308308
use datafusion_physical_expr::PhysicalExpr;

vortex-datafusion/src/persistent/adapter.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,30 @@
2424
//!
2525
//! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private)
2626
27-
use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
27+
use std::fmt::Debug;
28+
use std::sync::Arc;
29+
30+
use arrow_schema::DataType;
31+
use arrow_schema::Field;
32+
use arrow_schema::FieldRef;
33+
use arrow_schema::Schema;
34+
use arrow_schema::SchemaRef;
2835
use datafusion_common::ScalarValue;
2936
use datafusion_common::arrow::compute::can_cast_types;
37+
use datafusion_common::exec_err;
3038
use datafusion_common::nested_struct::validate_struct_compatibility;
31-
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
32-
use datafusion_common::{exec_err, plan_err};
39+
use datafusion_common::plan_err;
40+
use datafusion_common::tree_node::Transformed;
41+
use datafusion_common::tree_node::TransformedResult;
42+
use datafusion_common::tree_node::TreeNode;
3343
use datafusion_functions::core::getfield::GetFieldFunc;
34-
use datafusion_physical_expr::expressions::{CastExpr, Column};
35-
use datafusion_physical_expr::{ScalarFunctionExpr, expressions};
36-
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
44+
use datafusion_physical_expr::ScalarFunctionExpr;
45+
use datafusion_physical_expr::expressions;
46+
use datafusion_physical_expr::expressions::CastExpr;
47+
use datafusion_physical_expr::expressions::Column;
48+
use datafusion_physical_expr_adapter::PhysicalExprAdapter;
49+
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
3750
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
38-
use std::fmt::Debug;
39-
use std::sync::Arc;
4051

4152
#[derive(Debug, Clone)]
4253
pub struct DefaultPhysicalExprAdapterFactory;

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
//! Persistent implementation of a Vortex table provider.
5+
pub(crate) mod adapter;
56
mod cache;
67
mod format;
78
pub mod metrics;
89
mod opener;
910
mod sink;
1011
mod source;
11-
pub(crate) mod adapter;
1212

1313
pub use format::VortexFormat;
1414
pub use format::VortexFormatFactory;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::ops::Range;
5+
use std::sync::Arc;
6+
use std::sync::LazyLock;
57
use std::sync::Weak;
6-
use std::sync::{Arc, LazyLock};
78

89
use arrow_schema::ArrowError;
910
use arrow_schema::Field;
@@ -16,7 +17,8 @@ use datafusion_datasource::PartitionedFile;
1617
use datafusion_datasource::file_meta::FileMeta;
1718
use datafusion_datasource::file_stream::FileOpenFuture;
1819
use datafusion_datasource::file_stream::FileOpener;
19-
use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
20+
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
21+
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
2022
use datafusion_physical_expr::PhysicalExprRef;
2123
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
2224
use datafusion_physical_expr::split_conjunction;
@@ -180,28 +182,26 @@ impl FileOpener for VortexOpener {
180182
// for schema evolution and divergence between the table's schema and individual files.
181183
filter = filter
182184
.map(|filter| {
183-
// Rewrite the filter to cast into the logical file schema field types
185+
// Rewrite the filter to properly handle values in the table schema
184186
let expr = expr_adapter_factory
185187
.create(table_schema.clone(), physical_file_schema.clone())
186188
.with_partition_values(partition_values)
187-
.rewrite(filter)
188-
.expect("rewrite");
189+
.rewrite(filter)?;
189190

190191
// Expression might now reference columns that don't exist in the file, so we can give it
191192
// another simplification pass.
192-
PhysicalExprSimplifier::new(table_schema.as_ref()).simplify(expr)
193+
PhysicalExprSimplifier::new(physical_file_schema.as_ref())
194+
.simplify(expr.clone())
193195
})
194196
.transpose()?;
195197

196-
let predicate_file_schema = physical_file_schema;
197-
198198
let (schema_mapping, adapted_projections) =
199-
schema_adapter.map_schema(&predicate_file_schema)?;
199+
schema_adapter.map_schema(&physical_file_schema)?;
200200

201201
// We use the field names from pushdown expression instead.
202202
let field_names: Vec<FieldName> = adapted_projections
203203
.into_iter()
204-
.map(|index| FieldName::from(predicate_file_schema.field(index).name().as_str()))
204+
.map(|index| FieldName::from(physical_file_schema.field(index).name().as_str()))
205205
.collect();
206206
let projection_expr = select(field_names, root());
207207

@@ -247,7 +247,7 @@ impl FileOpener for VortexOpener {
247247
.and_then(|f| {
248248
let exprs = split_conjunction(&f)
249249
.into_iter()
250-
.filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
250+
.filter(|expr| can_be_pushed_down(expr, &physical_file_schema))
251251
.collect::<Vec<_>>();
252252

253253
make_vortex_predicate(&exprs).transpose()

vortex-datafusion/tests/schema_evolution.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,13 @@ async fn test_filter_with_schema_evolution() {
142142
record_batch(
143143
&table_schema,
144144
vec![
145+
// a
145146
Arc::new(StringViewArray::from(vec![
146147
Some("one"),
147148
Some("two"),
148149
Some("three"),
149150
])) as ArrowArrayRef,
151+
// b
150152
Arc::new(StringViewArray::from(vec![
151153
Option::<&str>::None,
152154
None,

0 commit comments

Comments
 (0)