Skip to content

Commit db20cad

Browse files
committed
example of stateless execution
1 parent e5e7636 commit db20cad

File tree

29 files changed

+450
-18
lines changed

29 files changed

+450
-18
lines changed

datafusion/execution/src/cache/cache_unit.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ mod tests {
111111
use datafusion_common::Statistics;
112112
use datafusion_common::stats::Precision;
113113
use datafusion_expr::ColumnarValue;
114-
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
114+
use datafusion_physical_expr_common::physical_expr::{
115+
ExprExecutionContext, PhysicalExpr,
116+
};
115117
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
116118
use object_store::ObjectMeta;
117119
use object_store::path::Path;
@@ -224,6 +226,13 @@ mod tests {
224226
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225227
write!(f, "MockExpr")
226228
}
229+
230+
fn execute(
231+
self: Arc<Self>,
232+
_context: &ExprExecutionContext,
233+
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
234+
Ok(self)
235+
}
227236
}
228237

229238
fn ordering() -> LexOrdering {

datafusion/ffi/src/physical_expr/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion_expr::interval_arithmetic::Interval;
3636
use datafusion_expr::sort_properties::ExprProperties;
3737
use datafusion_expr::statistics::Distribution;
3838
use datafusion_physical_expr::PhysicalExpr;
39-
use datafusion_physical_expr_common::physical_expr::fmt_sql;
39+
use datafusion_physical_expr_common::physical_expr::{ExprExecutionContext, fmt_sql};
4040

4141
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
4242
use crate::expr::columnar_value::FFI_ColumnarValue;
@@ -705,6 +705,13 @@ impl PhysicalExpr for ForeignPhysicalExpr {
705705
fn is_volatile_node(&self) -> bool {
706706
unsafe { (self.expr.is_volatile_node)(&self.expr) }
707707
}
708+
709+
fn execute(
710+
self: Arc<Self>,
711+
_context: &ExprExecutionContext,
712+
) -> Result<Arc<dyn PhysicalExpr>> {
713+
Ok(self)
714+
}
708715
}
709716

710717
impl Eq for ForeignPhysicalExpr {}

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,21 @@ use itertools::izip;
4343
/// Shared [`PhysicalExpr`].
4444
pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
4545

46+
/// Describes execution context for the particular expression.
47+
pub struct ExprExecutionContext {
48+
/// External parameters.
49+
pub external_params: Arc<[ScalarValue]>,
50+
}
51+
52+
impl ExprExecutionContext {
53+
/// Make a new [`ExprExecutionContext`].
54+
pub fn new(external_param: impl Into<Arc<[ScalarValue]>>) -> Self {
55+
Self {
56+
external_params: external_param.into(),
57+
}
58+
}
59+
}
60+
4661
/// [`PhysicalExpr`]s represent expressions such as `A + 1` or `CAST(c1 AS int)`.
4762
///
4863
/// `PhysicalExpr` knows its type, nullability and can be evaluated directly on
@@ -430,6 +445,15 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
430445
fn is_volatile_node(&self) -> bool {
431446
false
432447
}
448+
449+
/// Make this expression executable. The most expressions are executable and do not
450+
/// require an additional work so this method could return `self`. However, there are
451+
/// expressions that should be transformed prior to execution, e.g. placeholder that
452+
/// should be resolved into scalar.
453+
fn execute(
454+
self: Arc<Self>,
455+
context: &ExprExecutionContext,
456+
) -> Result<Arc<dyn PhysicalExpr>>;
433457
}
434458

435459
#[deprecated(
@@ -662,7 +686,7 @@ pub fn is_volatile(expr: &Arc<dyn PhysicalExpr>) -> bool {
662686

663687
#[cfg(test)]
664688
mod test {
665-
use crate::physical_expr::PhysicalExpr;
689+
use crate::physical_expr::{ExprExecutionContext, PhysicalExpr};
666690
use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch};
667691
use arrow::datatypes::{DataType, Schema};
668692
use datafusion_expr_common::columnar_value::ColumnarValue;
@@ -707,6 +731,13 @@ mod test {
707731
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
708732
f.write_str("TestExpr")
709733
}
734+
735+
fn execute(
736+
self: Arc<Self>,
737+
_context: &ExprExecutionContext,
738+
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
739+
Ok(self)
740+
}
710741
}
711742

712743
impl Display for TestExpr {

datafusion/physical-expr/src/async_scalar_function.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use datafusion_common::{internal_err, not_impl_err};
2525
use datafusion_expr::ScalarFunctionArgs;
2626
use datafusion_expr::async_udf::AsyncScalarUDF;
2727
use datafusion_expr_common::columnar_value::ColumnarValue;
28-
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
28+
use datafusion_physical_expr_common::physical_expr::{
29+
ExprExecutionContext, PhysicalExpr,
30+
};
2931
use std::any::Any;
3032
use std::fmt::Display;
3133
use std::hash::{Hash, Hasher};
@@ -247,4 +249,11 @@ impl PhysicalExpr for AsyncFuncExpr {
247249
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248250
write!(f, "{}", self.func)
249251
}
252+
253+
fn execute(
254+
self: Arc<Self>,
255+
_context: &ExprExecutionContext,
256+
) -> Result<Arc<dyn PhysicalExpr>> {
257+
Ok(self)
258+
}
250259
}

datafusion/physical-expr/src/expressions/binary.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use datafusion_expr::statistics::{
4242
use datafusion_expr::{ColumnarValue, Operator};
4343
use datafusion_physical_expr_common::datum::{apply, apply_cmp};
4444

45+
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
4546
use kernels::{
4647
bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
4748
bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
@@ -609,6 +610,13 @@ impl PhysicalExpr for BinaryExpr {
609610
write!(f, " {} ", self.op)?;
610611
write_child(f, self.right.as_ref(), precedence)
611612
}
613+
614+
fn execute(
615+
self: Arc<Self>,
616+
_context: &ExprExecutionContext,
617+
) -> Result<Arc<dyn PhysicalExpr>> {
618+
Ok(self)
619+
}
612620
}
613621

614622
/// Casts dictionary array to result type for binary numerical operators. Such operators

datafusion/physical-expr/src/expressions/case.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion_common::{
3333
internal_datafusion_err, internal_err,
3434
};
3535
use datafusion_expr::ColumnarValue;
36+
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
3637
use indexmap::{IndexMap, IndexSet};
3738
use std::borrow::Cow;
3839
use std::hash::Hash;
@@ -1342,6 +1343,13 @@ impl PhysicalExpr for CaseExpr {
13421343
}
13431344
write!(f, "END")
13441345
}
1346+
1347+
fn execute(
1348+
self: Arc<Self>,
1349+
_context: &ExprExecutionContext,
1350+
) -> Result<Arc<dyn PhysicalExpr>> {
1351+
Ok(self)
1352+
}
13451353
}
13461354

13471355
/// Attempts to const evaluate the given `predicate`.

datafusion/physical-expr/src/expressions/cast.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion_common::{Result, not_impl_err};
3131
use datafusion_expr_common::columnar_value::ColumnarValue;
3232
use datafusion_expr_common::interval_arithmetic::Interval;
3333
use datafusion_expr_common::sort_properties::ExprProperties;
34+
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
3435

3536
const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
3637
safe: false,
@@ -237,6 +238,13 @@ impl PhysicalExpr for CastExpr {
237238

238239
write!(f, ")")
239240
}
241+
242+
fn execute(
243+
self: Arc<Self>,
244+
_context: &ExprExecutionContext,
245+
) -> Result<Arc<dyn PhysicalExpr>> {
246+
Ok(self)
247+
}
240248
}
241249

242250
/// Return a PhysicalExpression representing `expr` casted to

datafusion/physical-expr/src/expressions/cast_column.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion_common::{
2727
Result, ScalarValue, format::DEFAULT_CAST_OPTIONS, nested_struct::cast_column,
2828
};
2929
use datafusion_expr_common::columnar_value::ColumnarValue;
30+
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
3031
use std::{
3132
any::Any,
3233
fmt::{self, Display},
@@ -180,6 +181,13 @@ impl PhysicalExpr for CastColumnExpr {
180181
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181182
Display::fmt(self, f)
182183
}
184+
185+
fn execute(
186+
self: Arc<Self>,
187+
_context: &ExprExecutionContext,
188+
) -> Result<Arc<dyn PhysicalExpr>> {
189+
Ok(self)
190+
}
183191
}
184192

185193
#[cfg(test)]

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use arrow::{
3030
use datafusion_common::tree_node::{Transformed, TreeNode};
3131
use datafusion_common::{Result, internal_err, plan_err};
3232
use datafusion_expr::ColumnarValue;
33+
use datafusion_physical_expr_common::physical_expr::ExprExecutionContext;
3334

3435
/// Represents the column at a given index in a RecordBatch
3536
///
@@ -146,6 +147,13 @@ impl PhysicalExpr for Column {
146147
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147148
write!(f, "{}", self.name)
148149
}
150+
151+
fn execute(
152+
self: Arc<Self>,
153+
_context: &ExprExecutionContext,
154+
) -> Result<Arc<dyn PhysicalExpr>> {
155+
Ok(self)
156+
}
149157
}
150158

151159
impl Column {

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion_common::{
2626
tree_node::{Transformed, TransformedResult, TreeNode},
2727
};
2828
use datafusion_expr::ColumnarValue;
29-
use datafusion_physical_expr_common::physical_expr::DynHash;
29+
use datafusion_physical_expr_common::physical_expr::{DynHash, ExprExecutionContext};
3030

3131
/// State of a dynamic filter, tracking both updates and completion.
3232
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -445,6 +445,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
445445
// Return the current generation of the expression.
446446
self.inner.read().generation
447447
}
448+
449+
fn execute(
450+
self: Arc<Self>,
451+
_context: &ExprExecutionContext,
452+
) -> Result<Arc<dyn PhysicalExpr>> {
453+
Ok(self)
454+
}
448455
}
449456

450457
#[cfg(test)]

0 commit comments

Comments
 (0)