Skip to content

Commit ea2e22c

Browse files
authored
Implement nested recursive CTEs (#18956)
It gives a name (the table name) to each `WorkTable`. This way `WorkTableExec` can recognize its own `WorkTable` Note that it doesn't allow multiple occurrences of the same CTE name: it's not possible to implement things like "join with itself" correctly with only the work table. ## Which issue does this PR close? - Closes #18955. ## Rationale for this change Support nested recursive CTEs without co-recursion. This is useful to e.g. implement SPARQL or other graph query languages. ## What changes are included in this PR? ## Are these changes tested? Yes! There is a nested recursive query in the test file ## Are there any user-facing changes? Nested recursive queries are now allowed instead of failing with a "not implemented" error
1 parent 03904e1 commit ea2e22c

File tree

3 files changed

+70
-17
lines changed

3 files changed

+70
-17
lines changed

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl RecursiveQueryExec {
8686
is_distinct: bool,
8787
) -> Result<Self> {
8888
// Each recursive query needs its own work table
89-
let work_table = Arc::new(WorkTable::new());
89+
let work_table = Arc::new(WorkTable::new(name.clone()));
9090
// Use the same work table for both the WorkTableExec and the recursive term
9191
let recursive_term = assign_work_table(recursive_term, &work_table)?;
9292
let cache = Self::compute_properties(static_term.schema());
@@ -380,8 +380,6 @@ fn assign_work_table(
380380
work_table_refs += 1;
381381
Ok(Transformed::yes(new_plan))
382382
}
383-
} else if plan.as_any().is::<RecursiveQueryExec>() {
384-
not_impl_err!("Recursive queries cannot be nested")
385383
} else {
386384
Ok(Transformed::no(plan))
387385
}

datafusion/physical-plan/src/work_table.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,15 @@ impl ReservedBatches {
5858
#[derive(Debug)]
5959
pub struct WorkTable {
6060
batches: Mutex<Option<ReservedBatches>>,
61+
name: String,
6162
}
6263

6364
impl WorkTable {
6465
/// Create a new work table.
65-
pub(super) fn new() -> Self {
66+
pub(super) fn new(name: String) -> Self {
6667
Self {
6768
batches: Mutex::new(None),
69+
name,
6870
}
6971
}
7072

@@ -113,10 +115,10 @@ impl WorkTableExec {
113115
pub fn new(name: String, schema: SchemaRef) -> Self {
114116
let cache = Self::compute_properties(Arc::clone(&schema));
115117
Self {
116-
name,
118+
name: name.clone(),
117119
schema,
118120
metrics: ExecutionPlanMetricsSet::new(),
119-
work_table: Arc::new(WorkTable::new()),
121+
work_table: Arc::new(WorkTable::new(name)),
120122
cache,
121123
}
122124
}
@@ -230,6 +232,10 @@ impl ExecutionPlan for WorkTableExec {
230232
// Down-cast to the expected state type; propagate `None` on failure
231233
let work_table = state.downcast::<WorkTable>().ok()?;
232234

235+
if work_table.name != self.name {
236+
return None; // Different table
237+
}
238+
233239
Some(Arc::new(Self {
234240
name: self.name.clone(),
235241
schema: Arc::clone(&self.schema),
@@ -248,7 +254,7 @@ mod tests {
248254

249255
#[test]
250256
fn test_work_table() {
251-
let work_table = WorkTable::new();
257+
let work_table = WorkTable::new("test".into());
252258
// Can't take from empty work_table
253259
assert!(work_table.take().is_err());
254260

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,25 @@ SELECT * FROM nodes
125125
3
126126
4
127127

128+
# deduplicating recursive CTE with two variables works
129+
query II
130+
WITH RECURSIVE ranges AS (
131+
SELECT min, max from (VALUES (1, 1), (2, 2)) ranges(min, max)
132+
UNION
133+
SELECT min, max + 1 as max
134+
FROM ranges
135+
WHERE max < 4
136+
)
137+
SELECT * FROM ranges
138+
----
139+
1 1
140+
2 2
141+
1 2
142+
2 3
143+
1 3
144+
2 4
145+
1 4
146+
128147
# setup
129148
statement ok
130149
CREATE EXTERNAL TABLE balance STORED as CSV LOCATION '../core/tests/data/recursive_cte/balance.csv' OPTIONS ('format.has_header' 'true');
@@ -647,21 +666,51 @@ ORDER BY
647666
3 1400 1
648667
1 2700 2
649668

650-
#expect error from recursive CTE with nested recursive terms
651-
query error DataFusion error: This feature is not implemented: Recursive queries cannot be nested
669+
#nested recursive ctes
670+
query I
652671
WITH RECURSIVE outer_cte AS (
653672
SELECT 1 as a
654673
UNION ALL (
655-
WITH RECURSIVE nested_cte AS (
674+
WITH RECURSIVE nested_cte AS (
656675
SELECT 1 as a
657676
UNION ALL
658-
SELECT a+2 as a
659-
FROM nested_cte where a < 3
660-
)
661-
SELECT outer_cte.a +2
662-
FROM outer_cte JOIN nested_cte USING(a)
663-
WHERE nested_cte.a < 4
664-
)
677+
SELECT a + 2 as a
678+
FROM nested_cte where a < 3
679+
)
680+
SELECT outer_cte.a + 2 as a
681+
FROM outer_cte JOIN nested_cte USING(a)
682+
WHERE nested_cte.a < 4
683+
)
684+
)
685+
SELECT a FROM outer_cte;
686+
----
687+
1
688+
3
689+
5
690+
691+
# Check that CTE name shadowing is returning an error
692+
query error DataFusion error: Error during planning: WITH query name "outer_cte" specified more than once
693+
WITH RECURSIVE outer_cte AS (
694+
SELECT 1 as a
695+
UNION ALL (
696+
WITH RECURSIVE nested_cte AS (
697+
SELECT 1 as a
698+
UNION ALL (
699+
WITH RECURSIVE outer_cte AS (
700+
SELECT 1 as a
701+
UNION ALL
702+
SELECT a + 2 as a
703+
FROM outer_cte where a < 3
704+
)
705+
SELECT nested_cte.a + outer_cte.a as a
706+
FROM nested_cte JOIN outer_cte USING(a)
707+
WHERE outer_cte_cte.a < 8
708+
)
709+
)
710+
SELECT outer_cte.a + nested_cte.a as a
711+
FROM outer_cte JOIN nested_cte USING(a)
712+
WHERE nested_cte.a < 8
713+
)
665714
)
666715
SELECT a FROM outer_cte;
667716

0 commit comments

Comments
 (0)