Skip to content

Commit 2ecbd48

Browse files
peter-tothblaginin
authored andcommitted
Add stacker and recursive (apache#13310)
* Add stacker and recursive * add test * fine tune set_expr_to_plan min stack size * format tomls * add recursive annotation to more custom recursive rules * fix comments * fix comment about min stack requirement --------- Co-authored-by: blaginin <[email protected]>
1 parent 3e4aaab commit 2ecbd48

File tree

18 files changed

+109
-13
lines changed

18 files changed

+109
-13
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ pbjson = { version = "0.7.0" }
141141
prost = "0.13.1"
142142
prost-derive = "0.13.1"
143143
rand = "0.8"
144+
recursive = "0.1.1"
144145
regex = "1.8"
145146
rstest = "0.23.0"
146147
serde_json = "1"

datafusion-cli/Cargo.lock

Lines changed: 47 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ object_store = { workspace = true, optional = true }
6363
parquet = { workspace = true, optional = true, default-features = true }
6464
paste = "1.0.15"
6565
pyo3 = { version = "0.22.0", optional = true }
66+
recursive = { workspace = true }
6667
sqlparser = { workspace = true }
6768
tokio = { workspace = true }
6869

datafusion/common/src/tree_node.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! [`TreeNode`] for visiting and rewriting expression and plan trees
1919
20+
use recursive::recursive;
2021
use std::sync::Arc;
2122

2223
use crate::Result;
@@ -123,6 +124,7 @@ pub trait TreeNode: Sized {
123124
/// TreeNodeVisitor::f_up(ChildNode2)
124125
/// TreeNodeVisitor::f_up(ParentNode)
125126
/// ```
127+
#[recursive]
126128
fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
127129
&'n self,
128130
visitor: &mut V,
@@ -172,6 +174,7 @@ pub trait TreeNode: Sized {
172174
/// TreeNodeRewriter::f_up(ChildNode2)
173175
/// TreeNodeRewriter::f_up(ParentNode)
174176
/// ```
177+
#[recursive]
175178
fn rewrite<R: TreeNodeRewriter<Node = Self>>(
176179
self,
177180
rewriter: &mut R,
@@ -194,6 +197,7 @@ pub trait TreeNode: Sized {
194197
&'n self,
195198
mut f: F,
196199
) -> Result<TreeNodeRecursion> {
200+
#[recursive]
197201
fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result<TreeNodeRecursion>>(
198202
node: &'n N,
199203
f: &mut F,
@@ -228,6 +232,7 @@ pub trait TreeNode: Sized {
228232
self,
229233
mut f: F,
230234
) -> Result<Transformed<Self>> {
235+
#[recursive]
231236
fn transform_down_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
232237
node: N,
233238
f: &mut F,
@@ -251,6 +256,7 @@ pub trait TreeNode: Sized {
251256
self,
252257
mut f: F,
253258
) -> Result<Transformed<Self>> {
259+
#[recursive]
254260
fn transform_up_impl<N: TreeNode, F: FnMut(N) -> Result<Transformed<N>>>(
255261
node: N,
256262
f: &mut F,
@@ -365,6 +371,7 @@ pub trait TreeNode: Sized {
365371
mut f_down: FD,
366372
mut f_up: FU,
367373
) -> Result<Transformed<Self>> {
374+
#[recursive]
368375
fn transform_down_up_impl<
369376
N: TreeNode,
370377
FD: FnMut(N) -> Result<Transformed<N>>,
@@ -2079,4 +2086,17 @@ pub(crate) mod tests {
20792086

20802087
Ok(())
20812088
}
2089+
2090+
#[test]
2091+
fn test_large_tree() {
2092+
let mut item = TestTreeNode::new_leaf("initial".to_string());
2093+
for i in 0..3000 {
2094+
item = TestTreeNode::new(vec![item], format!("parent-{}", i));
2095+
}
2096+
2097+
let mut visitor =
2098+
TestVisitor::new(Box::new(visit_continue), Box::new(visit_continue));
2099+
2100+
item.visit(&mut visitor).unwrap();
2101+
}
20822102
}

datafusion/expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ datafusion-functions-window-common = { workspace = true }
5050
datafusion-physical-expr-common = { workspace = true }
5151
indexmap = { workspace = true }
5252
paste = "^1.0"
53+
recursive = { workspace = true }
5354
serde_json = { workspace = true }
5455
sqlparser = { workspace = true }
5556
strum = { version = "0.26.1", features = ["derive"] }

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate::{
4242
LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort, Subquery,
4343
SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window,
4444
};
45+
use recursive::recursive;
4546
use std::ops::Deref;
4647
use std::sync::Arc;
4748

@@ -745,6 +746,7 @@ impl LogicalPlan {
745746

746747
/// Visits a plan similarly to [`Self::visit`], including subqueries that
747748
/// may appear in expressions such as `IN (SELECT ...)`.
749+
#[recursive]
748750
pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
749751
&self,
750752
visitor: &mut V,
@@ -761,6 +763,7 @@ impl LogicalPlan {
761763
/// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`,
762764
/// including subqueries that may appear in expressions such as `IN (SELECT
763765
/// ...)`.
766+
#[recursive]
764767
pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
765768
self,
766769
rewriter: &mut R,
@@ -779,6 +782,7 @@ impl LogicalPlan {
779782
&self,
780783
mut f: F,
781784
) -> Result<TreeNodeRecursion> {
785+
#[recursive]
782786
fn apply_with_subqueries_impl<
783787
F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
784788
>(
@@ -814,6 +818,7 @@ impl LogicalPlan {
814818
self,
815819
mut f: F,
816820
) -> Result<Transformed<Self>> {
821+
#[recursive]
817822
fn transform_down_with_subqueries_impl<
818823
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
819824
>(
@@ -839,6 +844,7 @@ impl LogicalPlan {
839844
self,
840845
mut f: F,
841846
) -> Result<Transformed<Self>> {
847+
#[recursive]
842848
fn transform_up_with_subqueries_impl<
843849
F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
844850
>(
@@ -866,6 +872,7 @@ impl LogicalPlan {
866872
mut f_down: FD,
867873
mut f_up: FU,
868874
) -> Result<Transformed<Self>> {
875+
#[recursive]
869876
fn transform_down_up_with_subqueries_impl<
870877
FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
871878
FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,

datafusion/optimizer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ indexmap = { workspace = true }
4747
itertools = { workspace = true }
4848
log = { workspace = true }
4949
paste = "1.0.14"
50+
recursive = { workspace = true }
5051
regex = { workspace = true }
5152
regex-syntax = "0.8.0"
5253

datafusion/optimizer/src/analyzer/subquery.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::analyzer::check_plan;
1919
use crate::utils::collect_subquery_cols;
20+
use recursive::recursive;
2021

2122
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
2223
use datafusion_common::{plan_err, Result};
@@ -128,6 +129,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> {
128129
}
129130

130131
// Recursively check the unsupported outer references in the sub query plan.
132+
#[recursive]
131133
fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> {
132134
if !can_contain_outer_ref && inner_plan.contains_outer_reference() {
133135
return plan_err!("Accessing outer reference columns is not allowed in the plan");

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::fmt::Debug;
2222
use std::sync::Arc;
2323

2424
use crate::{OptimizerConfig, OptimizerRule};
25+
use recursive::recursive;
2526

2627
use crate::optimizer::ApplyOrder;
2728
use crate::utils::NamePreserver;
@@ -531,6 +532,7 @@ impl OptimizerRule for CommonSubexprEliminate {
531532
None
532533
}
533534

535+
#[recursive]
534536
fn rewrite(
535537
&self,
536538
plan: LogicalPlan,

datafusion/optimizer/src/eliminate_cross_join.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
// under the License.
1717

1818
//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
19-
use std::sync::Arc;
20-
2119
use crate::{OptimizerConfig, OptimizerRule};
20+
use recursive::recursive;
21+
use std::sync::Arc;
2222

2323
use crate::join_key_set::JoinKeySet;
2424
use datafusion_common::tree_node::{Transformed, TreeNode};
@@ -80,6 +80,7 @@ impl OptimizerRule for EliminateCrossJoin {
8080
true
8181
}
8282

83+
#[recursive]
8384
fn rewrite(
8485
&self,
8586
plan: LogicalPlan,

0 commit comments

Comments
 (0)