Skip to content

Commit d6ec95f

Browse files
committed
Allow logical optimizer to be run without evaluating now()
1 parent 3aa0ab7 commit d6ec95f

File tree

10 files changed

+221
-81
lines changed

10 files changed

+221
-81
lines changed

datafusion/core/src/execution/session_state.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2063,7 +2063,7 @@ impl datafusion_execution::TaskContextProvider for SessionState {
20632063
}
20642064

20652065
impl OptimizerConfig for SessionState {
2066-
fn query_execution_start_time(&self) -> DateTime<Utc> {
2066+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
20672067
self.execution_props.query_execution_start_time
20682068
}
20692069

@@ -2135,13 +2135,17 @@ impl SimplifyInfo for SessionSimplifyProvider<'_> {
21352135
expr.nullable(self.df_schema)
21362136
}
21372137

2138-
fn execution_props(&self) -> &ExecutionProps {
2139-
self.state.execution_props()
2140-
}
2141-
21422138
fn get_data_type(&self, expr: &Expr) -> datafusion_common::Result<DataType> {
21432139
expr.get_type(self.df_schema)
21442140
}
2141+
2142+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
2143+
self.state.execution_props().query_execution_start_time
2144+
}
2145+
2146+
fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
2147+
self.state.execution_props().config_options.as_ref()
2148+
}
21452149
}
21462150

21472151
#[derive(Debug)]

datafusion/core/tests/expr_api/simplification.rs

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use chrono::{DateTime, TimeZone, Utc};
2626
use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*};
2727
use datafusion_common::ScalarValue;
2828
use datafusion_common::cast::as_int32_array;
29+
use datafusion_common::config::ConfigOptions;
2930
use datafusion_common::{DFSchemaRef, ToDFSchema};
3031
use datafusion_expr::expr::ScalarFunction;
3132
use datafusion_expr::logical_plan::builder::table_scan_with_filters;
@@ -66,13 +67,17 @@ impl SimplifyInfo for MyInfo {
6667
expr.nullable(self.schema.as_ref())
6768
}
6869

69-
fn execution_props(&self) -> &ExecutionProps {
70-
&self.execution_props
71-
}
72-
7370
fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
7471
expr.get_type(self.schema.as_ref())
7572
}
73+
74+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
75+
self.execution_props.query_execution_start_time
76+
}
77+
78+
fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
79+
self.execution_props.config_options.as_ref()
80+
}
7681
}
7782

7883
impl From<DFSchemaRef> for MyInfo {
@@ -523,6 +528,51 @@ fn multiple_now() -> Result<()> {
523528
Ok(())
524529
}
525530

531+
/// Test that `now()` is simplified to a literal when execution start time is set,
532+
/// but remains as an expression when no execution start time is available.
533+
#[test]
534+
fn now_simplification_with_and_without_start_time() {
535+
let schema = expr_test_schema();
536+
537+
// Case 1: With execution start time set, now() should be simplified to a literal
538+
{
539+
let start_time = Utc::now();
540+
let info = MyInfo {
541+
schema: Arc::clone(&schema),
542+
execution_props: ExecutionProps::new()
543+
.with_query_execution_start_time(start_time),
544+
};
545+
let simplifier = ExprSimplifier::new(info);
546+
let simplified = simplifier.simplify(now()).expect("simplify should succeed");
547+
548+
// Should be a literal timestamp
549+
match simplified {
550+
Expr::Literal(ScalarValue::TimestampNanosecond(Some(ts), _), _) => {
551+
assert_eq!(ts, start_time.timestamp_nanos_opt().unwrap());
552+
}
553+
other => panic!("Expected timestamp literal, got: {other:?}"),
554+
}
555+
}
556+
557+
// Case 2: Without execution start time, now() should remain as a function call
558+
{
559+
let info = MyInfo {
560+
schema: Arc::clone(&schema),
561+
execution_props: ExecutionProps::new(), // No start time set
562+
};
563+
let simplifier = ExprSimplifier::new(info);
564+
let simplified = simplifier.simplify(now()).expect("simplify should succeed");
565+
566+
// Should remain as a scalar function (not simplified)
567+
match simplified {
568+
Expr::ScalarFunction(func) => {
569+
assert_eq!(func.name(), "now");
570+
}
571+
other => panic!("Expected now() to remain unsimplified, got: {other:?}"),
572+
}
573+
}
574+
}
575+
526576
// ------------------------------
527577
// --- Simplifier tests -----
528578
// ------------------------------
@@ -566,7 +616,8 @@ fn test_simplify_with_cycle_count(
566616
) {
567617
let info: MyInfo = MyInfo {
568618
schema: expr_test_schema(),
569-
execution_props: ExecutionProps::new(),
619+
execution_props: ExecutionProps::new()
620+
.with_query_execution_start_time(Utc::now()),
570621
};
571622
let simplifier = ExprSimplifier::new(info);
572623
let (simplified_expr, count) = simplifier

datafusion/expr/src/execution_props.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::var_provider::{VarProvider, VarType};
19-
use chrono::{DateTime, TimeZone, Utc};
19+
use chrono::{DateTime, Utc};
2020
use datafusion_common::HashMap;
2121
use datafusion_common::alias::AliasGenerator;
2222
use datafusion_common::config::ConfigOptions;
@@ -33,7 +33,9 @@ use std::sync::Arc;
3333
/// done so during predicate pruning and expression simplification
3434
#[derive(Clone, Debug)]
3535
pub struct ExecutionProps {
36-
pub query_execution_start_time: DateTime<Utc>,
36+
/// The time at which the query execution started. If `None`,
37+
/// functions like `now()` will not be simplified during optimization.
38+
pub query_execution_start_time: Option<DateTime<Utc>>,
3739
/// Alias generator used by subquery optimizer rules
3840
pub alias_generator: Arc<AliasGenerator>,
3941
/// Snapshot of config options when the query started
@@ -52,9 +54,7 @@ impl ExecutionProps {
5254
/// Creates a new execution props
5355
pub fn new() -> Self {
5456
ExecutionProps {
55-
// Set this to a fixed sentinel to make it obvious if this is
56-
// not being updated / propagated correctly
57-
query_execution_start_time: Utc.timestamp_nanos(0),
57+
query_execution_start_time: None,
5858
alias_generator: Arc::new(AliasGenerator::new()),
5959
config_options: None,
6060
var_providers: None,
@@ -66,7 +66,7 @@ impl ExecutionProps {
6666
mut self,
6767
query_execution_start_time: DateTime<Utc>,
6868
) -> Self {
69-
self.query_execution_start_time = query_execution_start_time;
69+
self.query_execution_start_time = Some(query_execution_start_time);
7070
self
7171
}
7272

@@ -79,7 +79,7 @@ impl ExecutionProps {
7979
/// Marks the execution of query started timestamp.
8080
/// This also instantiates a new alias generator.
8181
pub fn mark_start_execution(&mut self, config_options: Arc<ConfigOptions>) -> &Self {
82-
self.query_execution_start_time = Utc::now();
82+
self.query_execution_start_time = Some(Utc::now());
8383
self.alias_generator = Arc::new(AliasGenerator::new());
8484
self.config_options = Some(config_options);
8585
&*self
@@ -126,7 +126,7 @@ mod test {
126126
fn debug() {
127127
let props = ExecutionProps::new();
128128
assert_eq!(
129-
"ExecutionProps { query_execution_start_time: 1970-01-01T00:00:00Z, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None }",
129+
"ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None }",
130130
format!("{props:?}")
131131
);
132132
}

datafusion/expr/src/simplify.rs

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717

1818
//! Structs and traits to provide the information needed for expression simplification.
1919
20+
use std::sync::Arc;
21+
2022
use arrow::datatypes::DataType;
23+
use chrono::{DateTime, Utc};
24+
use datafusion_common::config::ConfigOptions;
2125
use datafusion_common::{DFSchemaRef, Result, internal_datafusion_err};
2226

2327
use crate::{Expr, ExprSchemable, execution_props::ExecutionProps};
@@ -35,43 +39,85 @@ pub trait SimplifyInfo {
3539
/// Returns true of this expr is nullable (could possibly be NULL)
3640
fn nullable(&self, expr: &Expr) -> Result<bool>;
3741

38-
/// Returns details needed for partial expression evaluation
39-
fn execution_props(&self) -> &ExecutionProps;
40-
4142
/// Returns data type of this expr needed for determining optimized int type of a value
4243
fn get_data_type(&self, expr: &Expr) -> Result<DataType>;
44+
45+
/// Returns the time at which the query execution started.
46+
/// If `None`, time-dependent functions like `now()` will not be simplified.
47+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>>;
48+
49+
/// Returns the configuration options for the session.
50+
fn config_options(&self) -> Option<&Arc<ConfigOptions>>;
4351
}
4452

45-
/// Provides simplification information based on DFSchema and
46-
/// [`ExecutionProps`]. This is the default implementation used by DataFusion
53+
/// Provides simplification information based on schema, query execution time,
54+
/// and configuration options.
4755
///
4856
/// # Example
4957
/// See the `simplify_demo` in the [`expr_api` example]
5058
///
5159
/// [`expr_api` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/query_planning/expr_api.rs
5260
#[derive(Debug, Clone)]
53-
pub struct SimplifyContext<'a> {
61+
pub struct SimplifyContext {
5462
schema: Option<DFSchemaRef>,
55-
props: &'a ExecutionProps,
63+
query_execution_start_time: Option<DateTime<Utc>>,
64+
config_options: Option<Arc<ConfigOptions>>,
5665
}
5766

58-
impl<'a> SimplifyContext<'a> {
59-
/// Create a new SimplifyContext
60-
pub fn new(props: &'a ExecutionProps) -> Self {
67+
impl SimplifyContext {
68+
/// Create a new SimplifyContext from [`ExecutionProps`].
69+
///
70+
/// This constructor extracts `query_execution_start_time` and `config_options`
71+
/// from the provided `ExecutionProps`.
72+
pub fn new(props: &ExecutionProps) -> Self {
73+
Self {
74+
schema: None,
75+
query_execution_start_time: props.query_execution_start_time,
76+
config_options: props.config_options.clone(),
77+
}
78+
}
79+
80+
/// Create a new empty SimplifyContext.
81+
///
82+
/// This is useful when you don't need time-dependent simplification
83+
/// (like `now()`) or config-based simplification.
84+
pub fn new_empty() -> Self {
6185
Self {
6286
schema: None,
63-
props,
87+
query_execution_start_time: None,
88+
config_options: None,
6489
}
6590
}
6691

92+
/// Set the query execution start time
93+
pub fn with_query_execution_start_time(
94+
mut self,
95+
query_execution_start_time: Option<DateTime<Utc>>,
96+
) -> Self {
97+
self.query_execution_start_time = query_execution_start_time;
98+
self
99+
}
100+
101+
/// Set the configuration options
102+
pub fn with_config_options(mut self, config_options: Arc<ConfigOptions>) -> Self {
103+
self.config_options = Some(config_options);
104+
self
105+
}
106+
67107
/// Register a [`DFSchemaRef`] with this context
68108
pub fn with_schema(mut self, schema: DFSchemaRef) -> Self {
69109
self.schema = Some(schema);
70110
self
71111
}
72112
}
73113

74-
impl SimplifyInfo for SimplifyContext<'_> {
114+
impl Default for SimplifyContext {
115+
fn default() -> Self {
116+
Self::new_empty()
117+
}
118+
}
119+
120+
impl SimplifyInfo for SimplifyContext {
75121
/// Returns true if this Expr has boolean type
76122
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
77123
if let Some(schema) = &self.schema
@@ -99,8 +145,12 @@ impl SimplifyInfo for SimplifyContext<'_> {
99145
expr.get_type(schema)
100146
}
101147

102-
fn execution_props(&self) -> &ExecutionProps {
103-
self.props
148+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
149+
self.query_execution_start_time
150+
}
151+
152+
fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
153+
self.config_options.as_ref()
104154
}
105155
}
106156

datafusion/functions/src/datetime/current_date.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,15 @@ impl ScalarUDFImpl for CurrentDateFunc {
9999

100100
fn simplify(
101101
&self,
102-
_args: Vec<Expr>,
102+
args: Vec<Expr>,
103103
info: &dyn SimplifyInfo,
104104
) -> Result<ExprSimplifyResult> {
105-
let now_ts = info.execution_props().query_execution_start_time;
105+
let Some(now_ts) = info.query_execution_start_time() else {
106+
return Ok(ExprSimplifyResult::Original(args));
107+
};
106108

107109
// Get timezone from config and convert to local time
108110
let days = info
109-
.execution_props()
110111
.config_options()
111112
.and_then(|config| {
112113
config

datafusion/functions/src/datetime/current_time.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,15 @@ impl ScalarUDFImpl for CurrentTimeFunc {
9595

9696
fn simplify(
9797
&self,
98-
_args: Vec<Expr>,
98+
args: Vec<Expr>,
9999
info: &dyn SimplifyInfo,
100100
) -> Result<ExprSimplifyResult> {
101-
let now_ts = info.execution_props().query_execution_start_time;
101+
let Some(now_ts) = info.query_execution_start_time() else {
102+
return Ok(ExprSimplifyResult::Original(args));
103+
};
102104

103105
// Try to get timezone from config and convert to local time
104106
let nano = info
105-
.execution_props()
106107
.config_options()
107108
.and_then(|config| {
108109
config
@@ -145,6 +146,7 @@ mod tests {
145146
use super::*;
146147
use arrow::datatypes::{DataType, TimeUnit::Nanosecond};
147148
use chrono::{DateTime, Utc};
149+
use datafusion_common::config::ConfigOptions;
148150
use datafusion_common::{Result, ScalarValue};
149151
use datafusion_expr::execution_props::ExecutionProps;
150152
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
@@ -163,17 +165,21 @@ mod tests {
163165
Ok(true)
164166
}
165167

166-
fn execution_props(&self) -> &ExecutionProps {
167-
&self.execution_props
168-
}
169-
170168
fn get_data_type(&self, _expr: &Expr) -> Result<DataType> {
171169
Ok(Time64(Nanosecond))
172170
}
171+
172+
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
173+
self.execution_props.query_execution_start_time
174+
}
175+
176+
fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
177+
self.execution_props.config_options.as_ref()
178+
}
173179
}
174180

175181
fn set_session_timezone_env(tz: &str, start_time: DateTime<Utc>) -> MockSimplifyInfo {
176-
let mut config = datafusion_common::config::ConfigOptions::default();
182+
let mut config = ConfigOptions::default();
177183
config.execution.time_zone = if tz.is_empty() {
178184
None
179185
} else {

0 commit comments

Comments
 (0)