Skip to content

Commit 680a52e

Browse files
starocean999BiteTheDDDDt
authored andcommitted
recursive cte fe part
fix some bug in fe fix fe plan update fe code update fe update fe part update fe add ut
1 parent 8b26ec1 commit 680a52e

File tree

49 files changed

+2825
-73
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+2825
-73
lines changed

fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ REAL: 'REAL';
442442
REBALANCE: 'REBALANCE';
443443
RECENT: 'RECENT';
444444
RECOVER: 'RECOVER';
445+
RECURSIVE: 'RECURSIVE';
445446
RECYCLE: 'RECYCLE';
446447
REFRESH: 'REFRESH';
447448
REFERENCES: 'REFERENCES';

fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1232,7 +1232,7 @@ querySpecification
12321232
;
12331233

12341234
cte
1235-
: WITH aliasQuery (COMMA aliasQuery)*
1235+
: WITH RECURSIVE? aliasQuery (COMMA aliasQuery)*
12361236
;
12371237

12381238
aliasQuery
@@ -2151,6 +2151,7 @@ nonReserved
21512151
| RANDOM
21522152
| RECENT
21532153
| RECOVER
2154+
| RECURSIVE
21542155
| RECYCLE
21552156
| REFRESH
21562157
| REPEATABLE

fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package org.apache.doris.analysis;
2222

23+
import org.apache.doris.catalog.RecursiveCteTempTable;
2324
import org.apache.doris.catalog.TableIf;
2425
import org.apache.doris.common.IdGenerator;
2526
import org.apache.doris.thrift.TDescriptorTable;
@@ -100,6 +101,10 @@ public TDescriptorTable toThrift() {
100101
}
101102

102103
for (TableIf tbl : referencedTbls.values()) {
104+
if (tbl instanceof RecursiveCteTempTable) {
105+
// skip recursive cte temp table
106+
continue;
107+
}
103108
result.addToTableDescriptors(tbl.toThrift());
104109
}
105110
thriftDescTable = result;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.catalog;
19+
20+
import org.apache.doris.common.SystemIdGenerator;
21+
22+
import java.util.List;
23+
24+
public class RecursiveCteTempTable extends Table {
25+
public RecursiveCteTempTable(String tableName, List<Column> fullSchema) {
26+
super(SystemIdGenerator.getNextId(), tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema);
27+
}
28+
}

fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,13 @@ default PrimaryKeyConstraint tryGetPrimaryKeyForForeignKeyUnsafe(
319319
default void addForeignConstraint(String name, ImmutableList<String> columns,
320320
TableIf referencedTable, ImmutableList<String> referencedColumns, boolean replay) {
321321
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
322-
ForeignKeyConstraint foreignKeyConstraint =
323-
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
322+
ForeignKeyConstraint foreignKeyConstraint = new ForeignKeyConstraint(name, columns, referencedTable,
323+
referencedColumns);
324324
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
325325
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
326326
foreignKeyConstraint.getReferencedColumnNames());
327-
PrimaryKeyConstraint primaryKeyConstraint =
328-
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
327+
PrimaryKeyConstraint primaryKeyConstraint = tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName,
328+
referencedTable);
329329
primaryKeyConstraint.addForeignTable(this);
330330
constraintMap.put(name, foreignKeyConstraint);
331331
if (!replay) {
@@ -445,10 +445,13 @@ default boolean needReadLockWhenPlan() {
445445
*/
446446
enum TableType {
447447
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE,
448-
@Deprecated ICEBERG, @Deprecated HUDI, JDBC,
448+
@Deprecated
449+
ICEBERG, @Deprecated
450+
HUDI, JDBC,
449451
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
450452
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
451-
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY, DORIS_EXTERNAL_TABLE;
453+
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY, DORIS_EXTERNAL_TABLE,
454+
RECURSIVE_CTE_TEMP_TABLE;
452455

453456
public String toEngineName() {
454457
switch (this) {
@@ -489,6 +492,8 @@ public String toEngineName() {
489492
return "dictionary";
490493
case DORIS_EXTERNAL_TABLE:
491494
return "External_Doris";
495+
case RECURSIVE_CTE_TEMP_TABLE:
496+
return "RecursiveCteTempTable";
492497
default:
493498
return null;
494499
}
@@ -528,6 +533,7 @@ public String toMysqlType() {
528533
case MATERIALIZED_VIEW:
529534
case TRINO_CONNECTOR_EXTERNAL_TABLE:
530535
case DORIS_EXTERNAL_TABLE:
536+
case RECURSIVE_CTE_TEMP_TABLE:
531537
return "BASE TABLE";
532538
default:
533539
return null;
@@ -640,4 +646,3 @@ default Optional<TableValuedFunctionRefInfo> getSysTableFunctionRef(
640646
return Optional.empty();
641647
}
642648
}
643-

fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ public class CascadesContext implements ScheduleContext {
133133
private final boolean isEnableExprTrace;
134134

135135
private int groupExpressionCount = 0;
136+
private Optional<String> currentRecursiveCteName;
137+
private List<Slot> recursiveCteOutputs;
136138

137139
/**
138140
* Constructor of OptimizerContext.
@@ -142,7 +144,8 @@ public class CascadesContext implements ScheduleContext {
142144
*/
143145
private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> currentTree,
144146
StatementContext statementContext, Plan plan, Memo memo,
145-
CTEContext cteContext, PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder) {
147+
CTEContext cteContext, PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder,
148+
Optional<String> currentRecursiveCteName, List<Slot> recursiveCteOutputs) {
146149
this.parent = Objects.requireNonNull(parent, "parent should not null");
147150
this.currentTree = Objects.requireNonNull(currentTree, "currentTree should not null");
148151
this.statementContext = Objects.requireNonNull(statementContext, "statementContext should not null");
@@ -167,6 +170,8 @@ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> curren
167170
this.isEnableExprTrace = false;
168171
}
169172
this.isLeadingDisableJoinReorder = isLeadingDisableJoinReorder;
173+
this.currentRecursiveCteName = currentRecursiveCteName;
174+
this.recursiveCteOutputs = recursiveCteOutputs;
170175
}
171176

172177
/** init a temporary context to rewrite expression */
@@ -181,7 +186,7 @@ public static CascadesContext initTempContext() {
181186
}
182187
return newContext(Optional.empty(), Optional.empty(),
183188
statementContext, DUMMY_PLAN,
184-
new CTEContext(), PhysicalProperties.ANY, false);
189+
new CTEContext(), PhysicalProperties.ANY, false, Optional.empty(), ImmutableList.of());
185190
}
186191

187192
/**
@@ -190,24 +195,25 @@ public static CascadesContext initTempContext() {
190195
public static CascadesContext initContext(StatementContext statementContext,
191196
Plan initPlan, PhysicalProperties requireProperties) {
192197
return newContext(Optional.empty(), Optional.empty(), statementContext,
193-
initPlan, new CTEContext(), requireProperties, false);
198+
initPlan, new CTEContext(), requireProperties, false, Optional.empty(), ImmutableList.of());
194199
}
195200

196201
/**
197202
* use for analyze cte. we must pass CteContext from outer since we need to get right scope of cte
198203
*/
199204
public static CascadesContext newContextWithCteContext(CascadesContext cascadesContext,
200-
Plan initPlan, CTEContext cteContext) {
205+
Plan initPlan, CTEContext cteContext, Optional<String> currentRecursiveCteName,
206+
List<Slot> recursiveCteOutputs) {
201207
return newContext(Optional.of(cascadesContext), Optional.empty(),
202208
cascadesContext.getStatementContext(), initPlan, cteContext, PhysicalProperties.ANY,
203-
cascadesContext.isLeadingDisableJoinReorder
204-
);
209+
cascadesContext.isLeadingDisableJoinReorder, currentRecursiveCteName, recursiveCteOutputs);
205210
}
206211

207212
public static CascadesContext newCurrentTreeContext(CascadesContext context) {
208213
return CascadesContext.newContext(context.getParent(), context.getCurrentTree(), context.getStatementContext(),
209214
context.getRewritePlan(), context.getCteContext(),
210-
context.getCurrentJobContext().getRequiredProperties(), context.isLeadingDisableJoinReorder);
215+
context.getCurrentJobContext().getRequiredProperties(), context.isLeadingDisableJoinReorder,
216+
Optional.empty(), ImmutableList.of());
211217
}
212218

213219
/**
@@ -216,14 +222,17 @@ public static CascadesContext newCurrentTreeContext(CascadesContext context) {
216222
public static CascadesContext newSubtreeContext(Optional<CTEId> subtree, CascadesContext context,
217223
Plan plan, PhysicalProperties requireProperties) {
218224
return CascadesContext.newContext(Optional.of(context), subtree, context.getStatementContext(),
219-
plan, context.getCteContext(), requireProperties, context.isLeadingDisableJoinReorder);
225+
plan, context.getCteContext(), requireProperties, context.isLeadingDisableJoinReorder, Optional.empty(),
226+
ImmutableList.of());
220227
}
221228

222229
private static CascadesContext newContext(Optional<CascadesContext> parent, Optional<CTEId> subtree,
223230
StatementContext statementContext, Plan initPlan, CTEContext cteContext,
224-
PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder) {
231+
PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder,
232+
Optional<String> currentRecursiveCteName, List<Slot> recursiveCteOutputs) {
225233
return new CascadesContext(parent, subtree, statementContext, initPlan, null,
226-
cteContext, requireProperties, isLeadingDisableJoinReorder);
234+
cteContext, requireProperties, isLeadingDisableJoinReorder, currentRecursiveCteName,
235+
recursiveCteOutputs);
227236
}
228237

229238
public CascadesContext getRoot() {
@@ -250,6 +259,18 @@ public synchronized boolean isTimeout() {
250259
return isTimeout;
251260
}
252261

262+
public Optional<String> getCurrentRecursiveCteName() {
263+
return currentRecursiveCteName;
264+
}
265+
266+
public List<Slot> getRecursiveCteOutputs() {
267+
return recursiveCteOutputs;
268+
}
269+
270+
public boolean isAnalyzingRecursiveCteAnchorChild() {
271+
return currentRecursiveCteName.isPresent() && recursiveCteOutputs.isEmpty();
272+
}
273+
253274
/**
254275
* Init memo with plan
255276
*/

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@
153153
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
154154
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
155155
import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
156+
import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
157+
import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteRecursiveChild;
158+
import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
156159
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
157160
import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
158161
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@@ -202,6 +205,8 @@
202205
import org.apache.doris.planner.PartitionSortNode;
203206
import org.apache.doris.planner.PlanFragment;
204207
import org.apache.doris.planner.PlanNode;
208+
import org.apache.doris.planner.RecursiveCteNode;
209+
import org.apache.doris.planner.RecursiveCteScanNode;
205210
import org.apache.doris.planner.RepeatNode;
206211
import org.apache.doris.planner.ResultFileSink;
207212
import org.apache.doris.planner.ResultSink;
@@ -1038,6 +1043,28 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
10381043
return planFragment;
10391044
}
10401045

1046+
@Override
1047+
public PlanFragment visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan recursiveCteScan,
1048+
PlanTranslatorContext context) {
1049+
TableIf table = recursiveCteScan.getTable();
1050+
List<Slot> slots = ImmutableList.copyOf(recursiveCteScan.getOutput());
1051+
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
1052+
1053+
RecursiveCteScanNode scanNode = new RecursiveCteScanNode(table != null ? table.getName() : "",
1054+
context.nextPlanNodeId(), tupleDescriptor);
1055+
scanNode.setNereidsId(recursiveCteScan.getId());
1056+
context.getNereidsIdToPlanNodeIdMap().put(recursiveCteScan.getId(), scanNode.getId());
1057+
Utils.execWithUncheckedException(scanNode::initScanRangeLocations);
1058+
1059+
translateRuntimeFilter(recursiveCteScan, scanNode, context);
1060+
1061+
context.addScanNode(scanNode, recursiveCteScan);
1062+
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, recursiveCteScan);
1063+
context.addPlanFragment(planFragment);
1064+
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), recursiveCteScan);
1065+
return planFragment;
1066+
}
1067+
10411068
private List<Expr> translateToExprs(List<Expression> expressions, PlanTranslatorContext context) {
10421069
List<Expr> exprs = Lists.newArrayListWithCapacity(expressions.size());
10431070
for (Expression expression : expressions) {
@@ -2166,8 +2193,10 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
21662193
if (inputPlanNode instanceof OlapScanNode) {
21672194
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
21682195
}
2169-
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
2170-
requiredByProjectSlotIdSet, context);
2196+
if (!(inputPlanNode instanceof RecursiveCteScanNode)) {
2197+
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
2198+
requiredByProjectSlotIdSet, context);
2199+
}
21712200
} else {
21722201
if (project.child() instanceof PhysicalDeferMaterializeTopN) {
21732202
inputFragment.setOutputExprs(allProjectionExprs);
@@ -2180,6 +2209,80 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
21802209
return inputFragment;
21812210
}
21822211

2212+
@Override
2213+
public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) {
2214+
List<PlanFragment> childrenFragments = new ArrayList<>();
2215+
for (Plan plan : recursiveCte.children()) {
2216+
childrenFragments.add(plan.accept(this, context));
2217+
}
2218+
2219+
TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context);
2220+
List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots());
2221+
2222+
RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
2223+
recursiveCte.getCteName(), recursiveCte.isUnionAll());
2224+
List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
2225+
recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
2226+
recursiveCteNode.setNereidsId(recursiveCte.getId());
2227+
List<List<Expression>> resultExpressionLists = Lists.newArrayList();
2228+
context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId());
2229+
for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) {
2230+
resultExpressionLists.add(new ArrayList<>(regularChildrenOutput));
2231+
}
2232+
2233+
for (PlanFragment childFragment : childrenFragments) {
2234+
recursiveCteNode.addChild(childFragment.getPlanRoot());
2235+
}
2236+
2237+
List<List<Expr>> materializedResultExprLists = Lists.newArrayList();
2238+
for (int i = 0; i < resultExpressionLists.size(); ++i) {
2239+
List<Expression> resultExpressionList = resultExpressionLists.get(i);
2240+
List<Expr> exprList = Lists.newArrayList();
2241+
Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size());
2242+
for (int j = 0; j < resultExpressionList.size(); ++j) {
2243+
if (outputSlotDescs.get(j).isMaterialized()) {
2244+
exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context));
2245+
// TODO: reconsider this, we may change nullable info in previous nereids rules not here.
2246+
outputSlotDescs.get(j)
2247+
.setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable());
2248+
}
2249+
}
2250+
materializedResultExprLists.add(exprList);
2251+
}
2252+
recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists);
2253+
Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size()
2254+
== recursiveCteNode.getChildren().size());
2255+
2256+
PlanFragment recursiveCteFragment;
2257+
if (childrenFragments.isEmpty()) {
2258+
recursiveCteFragment = createPlanFragment(recursiveCteNode,
2259+
DataPartition.UNPARTITIONED, recursiveCte);
2260+
context.addPlanFragment(recursiveCteFragment);
2261+
} else {
2262+
int childrenSize = childrenFragments.size();
2263+
recursiveCteFragment = childrenFragments.get(childrenSize - 1);
2264+
for (int i = childrenSize - 2; i >= 0; i--) {
2265+
context.mergePlanFragment(childrenFragments.get(i), recursiveCteFragment);
2266+
for (PlanFragment child : childrenFragments.get(i).getChildren()) {
2267+
recursiveCteFragment.addChild(child);
2268+
}
2269+
}
2270+
setPlanRoot(recursiveCteFragment, recursiveCteNode, recursiveCte);
2271+
}
2272+
2273+
recursiveCteFragment.updateDataPartition(DataPartition.UNPARTITIONED);
2274+
recursiveCteFragment.setOutputPartition(DataPartition.UNPARTITIONED);
2275+
2276+
return recursiveCteFragment;
2277+
}
2278+
2279+
@Override
2280+
public PlanFragment visitPhysicalRecursiveCteRecursiveChild(
2281+
PhysicalRecursiveCteRecursiveChild<? extends Plan> recursiveChild,
2282+
PlanTranslatorContext context) {
2283+
return recursiveChild.child().accept(this, context);
2284+
}
2285+
21832286
/**
21842287
* Returns a new fragment with a UnionNode as its root. The data partition of the
21852288
* returned fragment and how the data of the child fragments is consumed depends on the

0 commit comments

Comments
 (0)