Skip to content

Commit dfd2ac3

Browse files
committed
cteDataStores -> cteQueries
1 parent c15b3fd commit dfd2ac3

File tree

8 files changed

+52
-54
lines changed

8 files changed

+52
-54
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public class MPPQueryContext implements IAuditEntity {
117117
private boolean userQuery = false;
118118

119119
private final Map<NodeRef<Table>, Long> cteMaterializationCosts = new HashMap<>();
120-
private Map<NodeRef<Table>, CteDataStore> cteDataStores = new HashMap<>();
120+
private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>();
121121
// table -> (maxLineLength, 'explain' or 'explain analyze' result)
122122
// Max line length of each CTE should be remembered because we need to standardize
123123
// the output format of main query and CTE query.
@@ -493,20 +493,24 @@ public Map<NodeRef<Table>, Long> getCteMaterializationCosts() {
493493
return cteMaterializationCosts;
494494
}
495495

496-
public void addCteDataStore(Table table, CteDataStore dataStore) {
497-
cteDataStores.put(NodeRef.of(table), dataStore);
496+
public void addCteQuery(Table table, Query query) {
497+
cteQueries.put(NodeRef.of(table), query);
498498
}
499499

500-
public Map<NodeRef<Table>, CteDataStore> getCteDataStores() {
501-
return cteDataStores;
500+
public Map<NodeRef<Table>, Query> getCteQueries() {
501+
return cteQueries;
502502
}
503503

504504
public CteDataStore getCteDataStore(Table table) {
505-
return cteDataStores.get(NodeRef.of(table));
505+
Query query = cteQueries.get(NodeRef.of(table));
506+
if (query == null) {
507+
return null;
508+
}
509+
return query.getCteDataStore();
506510
}
507511

508-
public void setCteDataStores(Map<NodeRef<Table>, CteDataStore> cteDataStores) {
509-
this.cteDataStores = cteDataStores;
512+
public void setCteQueries(Map<NodeRef<Table>, Query> cteQueries) {
513+
this.cteQueries = cteQueries;
510514
}
511515

512516
public Map<Query, List<Identifier>> getSubQueryTables() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion;
9494
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter;
9595
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement;
96+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
9697
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare;
9798
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion;
9899
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement;
@@ -143,7 +144,6 @@
143144
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
144145
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
145146
import org.apache.iotdb.db.utils.SetThreadName;
146-
import org.apache.iotdb.db.utils.cte.CteDataStore;
147147

148148
import org.apache.thrift.TBase;
149149
import org.apache.tsfile.utils.Accountable;
@@ -418,7 +418,7 @@ public ExecutionResult executeForTableModel(
418418
SessionInfo session,
419419
String sql,
420420
Metadata metadata,
421-
Map<NodeRef<Table>, CteDataStore> cteDataStoreMap,
421+
Map<NodeRef<Table>, Query> cteQueries,
422422
ExplainType explainType,
423423
long timeOut,
424424
boolean userQuery) {
@@ -429,7 +429,7 @@ public ExecutionResult executeForTableModel(
429429
userQuery,
430430
((queryContext, startTime) -> {
431431
queryContext.setSubquery(true);
432-
queryContext.setCteDataStores(cteDataStoreMap);
432+
queryContext.setCteQueries(cteQueries);
433433
queryContext.setExplainType(explainType);
434434
return createQueryExecutionForTableModel(
435435
statement,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,8 @@ public Analysis analyze(Statement statement) {
8282
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup);
8383
// Register CTE passed by parent query.
8484
context
85-
.getCteDataStores()
86-
.forEach(
87-
(tableRef, dataStore) ->
88-
analysis.registerNamedQuery(tableRef.getNode(), dataStore.getQuery()));
85+
.getCteQueries()
86+
.forEach((tableRef, query) -> analysis.registerNamedQuery(tableRef.getNode(), query));
8987

9088
Statement innerStatement =
9189
rewrittenStatement instanceof PipeEnriched

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CteMaterializer.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,26 +82,31 @@ public void materializeCTE(Analysis analysis, MPPQueryContext context) {
8282
.forEach(
8383
(tableRef, query) -> {
8484
Table table = tableRef.getNode();
85-
if (query.isMaterialized() && !query.isDone()) {
86-
CteDataStore dataStore =
87-
fetchCteQueryResult(context, table, query, analysis.getWith());
88-
if (dataStore == null) {
89-
// CTE query execution failed. Use inline instead of materialization
90-
// in the outer query
91-
query.setDone(false);
92-
return;
85+
if (query.isMaterialized()) {
86+
if (!query.isDone()) {
87+
CteDataStore dataStore =
88+
fetchCteQueryResult(context, table, query, analysis.getWith());
89+
if (dataStore == null) {
90+
// CTE query execution failed. Use inline instead of materialization
91+
// in the outer query
92+
query.setCteDataStore(null);
93+
return;
94+
}
95+
query.setCteDataStore(dataStore);
9396
}
94-
95-
context.addCteDataStore(table, dataStore);
96-
query.setDone(true);
97+
context.addCteQuery(table, query);
9798
}
9899
});
99100
}
100101

101102
public void cleanUpCTE(MPPQueryContext context) {
102-
Map<NodeRef<Table>, CteDataStore> cteDataStores = context.getCteDataStores();
103-
cteDataStores.values().forEach(CteDataStore::clear);
104-
cteDataStores.clear();
103+
Map<NodeRef<Table>, Query> cteQueries = context.getCteQueries();
104+
cteQueries.values().stream()
105+
.map(Query::getCteDataStore)
106+
.filter(java.util.Objects::nonNull)
107+
.distinct()
108+
.forEach(CteDataStore::clear);
109+
cteQueries.clear();
105110
}
106111

107112
public CteDataStore fetchCteQueryResult(
@@ -145,7 +150,7 @@ public CteDataStore fetchCteQueryResult(
145150
sessionManager.getSessionInfoOfTableModel(sessionManager.getCurrSession()),
146151
String.format("Materialize query for CTE '%s'", table.getName()),
147152
LocalExecutionPlanner.getInstance().metadata,
148-
context.getCteDataStores(),
153+
context.getCteQueries(),
149154
context.getExplainType(),
150155
context.getTimeOut(),
151156
false);
@@ -160,8 +165,7 @@ public CteDataStore fetchCteQueryResult(
160165
TableSchema tableSchema = getTableSchema(datasetHeader, table.getName().toString());
161166

162167
cteDataStore =
163-
new CteDataStore(
164-
query, tableSchema, datasetHeader.getColumnIndex2TsBlockColumnIndexList());
168+
new CteDataStore(tableSchema, datasetHeader.getColumnIndex2TsBlockColumnIndexList());
165169
while (execution.hasNextResult()) {
166170
final Optional<TsBlock> tsBlock;
167171
try {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/PredicateWithUncorrelatedScalarSubqueryReconstructor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public Optional<Literal> fetchUncorrelatedSubqueryResultForPredicate(
146146
.getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession()),
147147
"Try to Fetch Uncorrelated Scalar Subquery Result for Predicate",
148148
LocalExecutionPlanner.getInstance().metadata,
149-
context.getCteDataStores(),
149+
context.getCteQueries(),
150150
ExplainType.NONE,
151151
context.getTimeOut(),
152152
false);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Query.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
2121

22+
import org.apache.iotdb.db.utils.cte.CteDataStore;
23+
2224
import com.google.common.collect.ImmutableList;
2325

2426
import java.util.List;
@@ -39,8 +41,7 @@ public class Query extends Statement {
3941
private final Optional<Node> limit;
4042
// whether this query needs materialization
4143
private boolean materialized = false;
42-
// whether materialization is done
43-
private boolean done = false;
44+
private CteDataStore cteDataStore = null;
4445

4546
public Query(
4647
Optional<With> with,
@@ -112,11 +113,15 @@ public void setMaterialized(boolean materialized) {
112113
}
113114

114115
public boolean isDone() {
115-
return done;
116+
return cteDataStore != null;
117+
}
118+
119+
public void setCteDataStore(CteDataStore cteDataStore) {
120+
this.cteDataStore = cteDataStore;
116121
}
117122

118-
public void setDone(boolean done) {
119-
this.done = done;
123+
public CteDataStore getCteDataStore() {
124+
return this.cteDataStore;
120125
}
121126

122127
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/CteDataStore.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.db.conf.IoTDBConfig;
2525
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2626
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
27-
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
2827

2928
import org.apache.tsfile.read.common.block.TsBlock;
3029

@@ -33,7 +32,6 @@
3332
import java.util.concurrent.atomic.AtomicInteger;
3433

3534
public class CteDataStore {
36-
private final Query query;
3735
private final TableSchema tableSchema;
3836
private final List<Integer> columnIndex2TsBlockColumnIndexList;
3937

@@ -44,9 +42,7 @@ public class CteDataStore {
4442
// reference count by CteScanOperator
4543
private final AtomicInteger count;
4644

47-
public CteDataStore(
48-
Query query, TableSchema tableSchema, List<Integer> columnIndex2TsBlockColumnIndexList) {
49-
this.query = query;
45+
public CteDataStore(TableSchema tableSchema, List<Integer> columnIndex2TsBlockColumnIndexList) {
5046
this.tableSchema = tableSchema;
5147
this.columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList;
5248
this.cachedData = new ArrayList<>();
@@ -87,10 +83,6 @@ public TableSchema getTableSchema() {
8783
return tableSchema;
8884
}
8985

90-
public Query getQuery() {
91-
return query;
92-
}
93-
9486
public List<Integer> getColumnIndex2TsBlockColumnIndexList() {
9587
return columnIndex2TsBlockColumnIndexList;
9688
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
2727
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
2828
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
29-
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
3029
import org.apache.iotdb.db.utils.cte.CteDataStore;
3130

3231
import com.google.common.collect.ImmutableList;
@@ -72,14 +71,11 @@ public void setUp() {
7271
// Create a simple table schema for testing
7372
TableSchema tableSchema = createTestTableSchema();
7473

75-
// Create mock query
76-
Query mockQuery = mock(Query.class);
77-
7874
// Create column index mapping
7975
List<Integer> columnIndex2TsBlockColumnIndexList = Arrays.asList(0, 1, 2);
8076

8177
// Initialize CteDataStore
82-
cteDataStore = new CteDataStore(mockQuery, tableSchema, columnIndex2TsBlockColumnIndexList);
78+
cteDataStore = new CteDataStore(tableSchema, columnIndex2TsBlockColumnIndexList);
8379

8480
// Add test data to the data store
8581
List<TsBlock> testData = createTestTsBlocks();
@@ -105,9 +101,8 @@ public void testConstructor() throws Exception {
105101
@Test
106102
public void testEmptyDataStore() throws Exception {
107103
// Create empty data store
108-
Query mockQuery = mock(Query.class);
109104
TableSchema tableSchema = createTestTableSchema();
110-
CteDataStore emptyDataStore = new CteDataStore(mockQuery, tableSchema, Arrays.asList(0, 1, 2));
105+
CteDataStore emptyDataStore = new CteDataStore(tableSchema, Arrays.asList(0, 1, 2));
111106

112107
cteScanOperator = new CteScanOperator(operatorContext, planNodeId, emptyDataStore);
113108
// Should not have data

0 commit comments

Comments
 (0)