Skip to content

Commit 35ff4ab

Browse files
adriangb and claude
authored
Allow logical optimizer to be run without evaluating now() & refactor SimplifyInfo (#19505)
In trying to fix #19418 I kept getting turned around about what was needed where. The `SimplifyInfo` trait made it extra hard to understand. I ended up realizing that the main reason for the trait to exist was tests. Removing the trait and adding a builder-style API to `SimplifyContext` made it IMO more ergonomic for tests and other call sites, easier to track the code (no trait opaqueness) and clearer what simplification capabilities are available at each site. This got rid of e.g. some places where we were calling `ExecutionProps::new()` just to pass that into `SimplifyContext`, which in turn would hand out references to the default query time, a default `ConfigOptions`, etc.; or in `datafusion/core/src/execution/session_state.rs` where we did `let dummy_schema = DFSchema::empty()`.

This let me solve several problems:
- Can store optimized logical plans for prepared statements
- Users can optionally run an optimizer pass on logical plans without evaluating time functions

Compared to #19426 this avoids adding a config option and is actually fewer lines of code (negative diff). Fixes #19418, closes #19426 (replaces it).

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 142f597 commit 35ff4ab

File tree

44 files changed

+524
-483
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+524
-483
lines changed

datafusion-examples/examples/builtin_functions/function_factory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::error::Result;
2424
use datafusion::execution::context::{
2525
FunctionFactory, RegisterFunction, SessionContext, SessionState,
2626
};
27-
use datafusion::logical_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
27+
use datafusion::logical_expr::simplify::{ExprSimplifyResult, SimplifyContext};
2828
use datafusion::logical_expr::sort_properties::{ExprProperties, SortProperties};
2929
use datafusion::logical_expr::{
3030
ColumnarValue, CreateFunction, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
@@ -145,7 +145,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
145145
fn simplify(
146146
&self,
147147
args: Vec<Expr>,
148-
_info: &dyn SimplifyInfo,
148+
_info: &SimplifyContext,
149149
) -> Result<ExprSimplifyResult> {
150150
let replacement = Self::replacement(&self.expr, &args)?;
151151

datafusion-examples/examples/query_planning/expr_api.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,9 @@ fn simplify_demo() -> Result<()> {
175175
// the ExecutionProps carries information needed to simplify
176176
// expressions, such as the current time (to evaluate `now()`
177177
// correctly)
178-
let props = ExecutionProps::new();
179-
let context = SimplifyContext::new(&props).with_schema(schema);
178+
let context = SimplifyContext::default()
179+
.with_schema(schema)
180+
.with_current_time();
180181
let simplifier = ExprSimplifier::new(context);
181182

182183
// And then call the simplify_expr function:
@@ -191,7 +192,9 @@ fn simplify_demo() -> Result<()> {
191192

192193
// here are some other examples of what DataFusion is capable of
193194
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
194-
let context = SimplifyContext::new(&props).with_schema(schema.clone());
195+
let context = SimplifyContext::default()
196+
.with_schema(Arc::clone(&schema))
197+
.with_current_time();
195198
let simplifier = ExprSimplifier::new(context);
196199

197200
// basic arithmetic simplification
@@ -551,7 +554,9 @@ fn type_coercion_demo() -> Result<()> {
551554
assert!(physical_expr.evaluate(&batch).is_ok());
552555

553556
// 2. Type coercion with `ExprSimplifier::coerce`.
554-
let context = SimplifyContext::new(&props).with_schema(Arc::new(df_schema.clone()));
557+
let context = SimplifyContext::default()
558+
.with_schema(Arc::new(df_schema.clone()))
559+
.with_current_time();
555560
let simplifier = ExprSimplifier::new(context);
556561
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
557562
let physical_expr = datafusion::physical_expr::create_physical_expr(

datafusion-examples/examples/udf/advanced_udaf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion::logical_expr::{
3434
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, Signature,
3535
expr::AggregateFunction,
3636
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
37-
simplify::SimplifyInfo,
37+
simplify::SimplifyContext,
3838
};
3939
use datafusion::prelude::*;
4040

@@ -421,7 +421,7 @@ impl AggregateUDFImpl for SimplifiedGeoMeanUdaf {
421421

422422
/// Optionally replaces a UDAF with another expression during query optimization.
423423
fn simplify(&self) -> Option<AggregateFunctionSimplification> {
424-
let simplify = |aggregate_function: AggregateFunction, _: &dyn SimplifyInfo| {
424+
let simplify = |aggregate_function: AggregateFunction, _: &SimplifyContext| {
425425
// Replaces the UDAF with `GeoMeanUdaf` as a placeholder example to demonstrate the `simplify` method.
426426
// In real-world scenarios, you might create UDFs from built-in expressions.
427427
Ok(Expr::AggregateFunction(AggregateFunction::new_udf(

datafusion-examples/examples/udf/advanced_udwf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
3232
use datafusion::logical_expr::function::{
3333
PartitionEvaluatorArgs, WindowFunctionSimplification, WindowUDFFieldArgs,
3434
};
35-
use datafusion::logical_expr::simplify::SimplifyInfo;
35+
use datafusion::logical_expr::simplify::SimplifyContext;
3636
use datafusion::logical_expr::{
3737
Expr, LimitEffect, PartitionEvaluator, Signature, WindowFrame,
3838
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
@@ -198,7 +198,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
198198
/// this function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg`
199199
/// default implementation will not be called (left as `todo!()`)
200200
fn simplify(&self) -> Option<WindowFunctionSimplification> {
201-
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
201+
let simplify = |window_function: WindowFunction, _: &SimplifyContext| {
202202
Ok(Expr::from(WindowFunction {
203203
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
204204
params: WindowFunctionParams {

datafusion-examples/examples/udf/simple_udtf.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use datafusion::common::{ScalarValue, plan_err};
3232
use datafusion::datasource::TableProvider;
3333
use datafusion::datasource::memory::MemorySourceConfig;
3434
use datafusion::error::Result;
35-
use datafusion::execution::context::ExecutionProps;
3635
use datafusion::logical_expr::simplify::SimplifyContext;
3736
use datafusion::logical_expr::{Expr, TableType};
3837
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
@@ -145,8 +144,7 @@ impl TableFunctionImpl for LocalCsvTableFunc {
145144
.get(1)
146145
.map(|expr| {
147146
// try to simplify the expression, so 1+2 becomes 3, for example
148-
let execution_props = ExecutionProps::new();
149-
let info = SimplifyContext::new(&execution_props);
147+
let info = SimplifyContext::default();
150148
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;
151149

152150
if let Expr::Literal(ScalarValue::Int64(Some(limit)), _) = expr {

datafusion/common/src/dfschema.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,12 @@ impl TryFrom<SchemaRef> for DFSchema {
11341134
}
11351135
}
11361136

1137+
impl From<DFSchema> for SchemaRef {
1138+
fn from(dfschema: DFSchema) -> Self {
1139+
Arc::clone(&dfschema.inner)
1140+
}
1141+
}
1142+
11371143
// Hashing refers to a subset of fields considered in PartialEq.
11381144
impl Hash for DFSchema {
11391145
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {

datafusion/core/src/execution/context/mod.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ use datafusion_expr::{
9393
logical_plan::{DdlStatement, Statement},
9494
planner::ExprPlanner,
9595
};
96-
use datafusion_optimizer::Analyzer;
9796
use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
9897
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
98+
use datafusion_optimizer::{Analyzer, OptimizerContext};
9999
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
100100
use datafusion_session::SessionStore;
101101

@@ -749,12 +749,19 @@ impl SessionContext {
749749
);
750750
}
751751
}
752-
// Store the unoptimized plan into the session state. Although storing the
753-
// optimized plan or the physical plan would be more efficient, doing so is
754-
// not currently feasible. This is because `now()` would be optimized to a
755-
// constant value, causing each EXECUTE to yield the same result, which is
756-
// incorrect behavior.
757-
self.state.write().store_prepared(name, fields, input)?;
752+
// Optimize the plan without evaluating expressions like now()
753+
let optimizer_context = OptimizerContext::new_with_config_options(
754+
Arc::clone(self.state().config().options()),
755+
)
756+
.without_query_execution_start_time();
757+
let plan = self.state().optimizer().optimize(
758+
Arc::unwrap_or_clone(input),
759+
&optimizer_context,
760+
|_1, _2| {},
761+
)?;
762+
self.state
763+
.write()
764+
.store_prepared(name, fields, Arc::new(plan))?;
758765
self.return_empty_dataframe()
759766
}
760767
LogicalPlan::Statement(Statement::Execute(execute)) => {
@@ -1394,7 +1401,12 @@ impl SessionContext {
13941401
})?;
13951402

13961403
let state = self.state.read();
1397-
let context = SimplifyContext::new(state.execution_props());
1404+
let context = SimplifyContext::default()
1405+
.with_schema(Arc::clone(prepared.plan.schema()))
1406+
.with_config_options(Arc::clone(state.config_options()))
1407+
.with_query_execution_start_time(
1408+
state.execution_props().query_execution_start_time,
1409+
);
13981410
let simplifier = ExprSimplifier::new(context);
13991411

14001412
// Only allow literals as parameters for now.

datafusion/core/src/execution/session_state.rs

Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,8 @@ use datafusion_expr::planner::ExprPlanner;
5757
#[cfg(feature = "sql")]
5858
use datafusion_expr::planner::{RelationPlanner, TypePlanner};
5959
use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry};
60-
use datafusion_expr::simplify::SimplifyInfo;
61-
use datafusion_expr::{
62-
AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, WindowUDF,
63-
};
60+
use datafusion_expr::simplify::SimplifyContext;
61+
use datafusion_expr::{AggregateUDF, Explain, Expr, LogicalPlan, ScalarUDF, WindowUDF};
6462
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
6563
use datafusion_optimizer::{
6664
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
@@ -744,13 +742,18 @@ impl SessionState {
744742
expr: Expr,
745743
df_schema: &DFSchema,
746744
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
747-
let simplifier =
748-
ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
745+
let config_options = self.config_options();
746+
let simplify_context = SimplifyContext::default()
747+
.with_schema(Arc::new(df_schema.clone()))
748+
.with_config_options(Arc::clone(config_options))
749+
.with_query_execution_start_time(
750+
self.execution_props().query_execution_start_time,
751+
);
752+
let simplifier = ExprSimplifier::new(simplify_context);
749753
// apply type coercion here to ensure types match
750754
let mut expr = simplifier.coerce(expr, df_schema)?;
751755

752756
// rewrite Exprs to functions if necessary
753-
let config_options = self.config_options();
754757
for rewrite in self.analyzer.function_rewrites() {
755758
expr = expr
756759
.transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
@@ -1834,9 +1837,12 @@ impl ContextProvider for SessionContextProvider<'_> {
18341837
.get(name)
18351838
.cloned()
18361839
.ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
1837-
let dummy_schema = DFSchema::empty();
1838-
let simplifier =
1839-
ExprSimplifier::new(SessionSimplifyProvider::new(self.state, &dummy_schema));
1840+
let simplify_context = SimplifyContext::default()
1841+
.with_config_options(Arc::clone(self.state.config_options()))
1842+
.with_query_execution_start_time(
1843+
self.state.execution_props().query_execution_start_time,
1844+
);
1845+
let simplifier = ExprSimplifier::new(simplify_context);
18401846
let args = args
18411847
.into_iter()
18421848
.map(|arg| simplifier.simplify(arg))
@@ -2063,7 +2069,7 @@ impl datafusion_execution::TaskContextProvider for SessionState {
20632069
}
20642070

20652071
impl OptimizerConfig for SessionState {
2066-
fn query_execution_start_time(&self) -> DateTime<Utc> {
2072+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
20672073
self.execution_props.query_execution_start_time
20682074
}
20692075

@@ -2115,35 +2121,6 @@ impl QueryPlanner for DefaultQueryPlanner {
21152121
}
21162122
}
21172123

2118-
struct SessionSimplifyProvider<'a> {
2119-
state: &'a SessionState,
2120-
df_schema: &'a DFSchema,
2121-
}
2122-
2123-
impl<'a> SessionSimplifyProvider<'a> {
2124-
fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
2125-
Self { state, df_schema }
2126-
}
2127-
}
2128-
2129-
impl SimplifyInfo for SessionSimplifyProvider<'_> {
2130-
fn is_boolean_type(&self, expr: &Expr) -> datafusion_common::Result<bool> {
2131-
Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
2132-
}
2133-
2134-
fn nullable(&self, expr: &Expr) -> datafusion_common::Result<bool> {
2135-
expr.nullable(self.df_schema)
2136-
}
2137-
2138-
fn execution_props(&self) -> &ExecutionProps {
2139-
self.state.execution_props()
2140-
}
2141-
2142-
fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
2143-
expr.get_type(self.df_schema)
2144-
}
2145-
}
2146-
21472124
#[derive(Debug)]
21482125
pub(crate) struct PreparedPlan {
21492126
/// Data types of the parameters

datafusion/core/src/test_util/parquet.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ impl TestParquetFile {
166166
let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;
167167

168168
// run coercion on the filters to coerce types etc.
169-
let props = ExecutionProps::new();
170-
let context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema));
169+
let context = SimplifyContext::default().with_schema(Arc::clone(&df_schema));
171170
if let Some(filter) = maybe_filter {
172171
let simplifier = ExprSimplifier::new(context);
173172
let filter = simplifier.coerce(filter, &df_schema).unwrap();

datafusion/core/tests/expr_api/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use arrow::util::pretty::{pretty_format_batches, pretty_format_columns};
2424
use datafusion::prelude::*;
2525
use datafusion_common::{DFSchema, ScalarValue};
2626
use datafusion_expr::ExprFunctionExt;
27-
use datafusion_expr::execution_props::ExecutionProps;
2827
use datafusion_expr::expr::NullTreatment;
2928
use datafusion_expr::simplify::SimplifyContext;
3029
use datafusion_functions::core::expr_ext::FieldAccessor;
@@ -422,9 +421,7 @@ fn create_simplified_expr_test(expr: Expr, expected_expr: &str) {
422421
let df_schema = DFSchema::try_from(batch.schema()).unwrap();
423422

424423
// Simplify the expression first
425-
let props = ExecutionProps::new();
426-
let simplify_context =
427-
SimplifyContext::new(&props).with_schema(df_schema.clone().into());
424+
let simplify_context = SimplifyContext::default().with_schema(Arc::new(df_schema));
428425
let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
429426
let simplified = simplifier.simplify(expr).unwrap();
430427
create_expr_test(simplified, expected_expr);

0 commit comments

Comments
 (0)