Skip to content

Commit 4478ec5

Browse files
committed
code review
1 parent 25529fa commit 4478ec5

File tree

16 files changed

+92
-47
lines changed

16 files changed

+92
-47
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2199,6 +2199,24 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
21992199
Long.parseLong(
22002200
properties.getProperty(
22012201
"max_object_file_size_in_byte", String.valueOf(conf.getMaxObjectSizeInByte()))));
2202+
2203+
// The buffer for cte materialization.
2204+
long cteBufferSizeInBytes =
2205+
Long.parseLong(
2206+
properties.getProperty(
2207+
"cte_buffer_size_in_bytes", Long.toString(conf.getCteBufferSize())));
2208+
if (cteBufferSizeInBytes > 0) {
2209+
conf.setCteBufferSize(cteBufferSizeInBytes);
2210+
}
2211+
// max number of rows for cte materialization
2212+
int maxRowsInCteBuffer =
2213+
Integer.parseInt(
2214+
properties.getProperty(
2215+
"max_rows_in_cte_buffer", Integer.toString(conf.getMaxRowsInCteBuffer())));
2216+
if (maxRowsInCteBuffer > 0) {
2217+
conf.setMaxRowsInCteBuffer(maxRowsInCteBuffer);
2218+
}
2219+
22022220
} catch (Exception e) {
22032221
if (e instanceof InterruptedException) {
22042222
Thread.currentThread().interrupt();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,10 @@ public enum ExplainType {
137137
// planning
138138
private final Map<NodeRef<Table>, Long> cteMaterializationCosts = new HashMap<>();
139139

140-
// Never materialize CTE in a subquery.
141-
private boolean subquery = false;
140+
// Indicates whether this query context is for a sub-query triggered by the main query.
141+
// Sub-queries are independent queries spawned from the main query (e.g., CTE sub-queries).
142+
// When true, CTE materialization is skipped as it's handled by the main query context.
143+
private boolean innerTriggeredQuery = false;
142144

143145
// Tables in the subquery
144146
private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new HashMap<>();
@@ -210,7 +212,7 @@ public void releaseMemoryForSchemaTree() {
210212
}
211213

212214
public void prepareForRetry() {
213-
if (!isSubquery()) {
215+
if (!isInnerTriggeredQuery()) {
214216
cleanUpCte();
215217
}
216218
this.initResultNodeContext();
@@ -223,6 +225,7 @@ private void cleanUpCte() {
223225
cteMaterializationCosts.clear();
224226
subQueryTables.clear();
225227
}
228+
226229
private void initResultNodeContext() {
227230
this.resultNodeContext = new ResultNodeContext(queryId);
228231
}
@@ -500,12 +503,12 @@ public void setUserQuery(boolean userQuery) {
500503
this.userQuery = userQuery;
501504
}
502505

503-
public boolean isSubquery() {
504-
return subquery;
506+
public boolean isInnerTriggeredQuery() {
507+
return innerTriggeredQuery;
505508
}
506509

507-
public void setSubquery(boolean subquery) {
508-
this.subquery = subquery;
510+
public void setInnerTriggeredQuery(boolean innerTriggeredQuery) {
511+
this.innerTriggeredQuery = innerTriggeredQuery;
509512
}
510513

511514
public void addCteMaterializationCost(Table table, long cost) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public class FragmentInstanceExecution {
5959
// It will be set to null while this FI is FINISHED
6060
private List<IDriver> drivers;
6161

62+
// Indicates whether this fragment instance should be ignored for statistics collection.
63+
// This is true when the fragment instance contains ExplainAnalyzeOperator, which is
64+
// a virtual fragment used for EXPLAIN ANALYZE and should not be included in query statistics.
65+
boolean shouldIgnoreForStatistics;
66+
6267
// It will be set to null while this FI is FINISHED
6368
private ISink sink;
6469

@@ -110,6 +115,7 @@ private FragmentInstanceExecution(
110115
this.stateMachine = stateMachine;
111116
this.timeoutInMs = timeoutInMs;
112117
this.exchangeManager = exchangeManager;
118+
this.shouldIgnoreForStatistics = shouldIgnoreForStatistics();
113119
}
114120

115121
public FragmentInstanceState getInstanceState() {
@@ -141,7 +147,7 @@ public FragmentInstanceStateMachine getStateMachine() {
141147
}
142148

143149
// Check if this fragment instance should be ignored for statistics
144-
// (i.e., it contains ExplainAnalyzeOperator only)
150+
// (i.e., it contains ExplainAnalyzeOperator)
145151
private boolean shouldIgnoreForStatistics() {
146152
if (drivers == null || drivers.isEmpty()) {
147153
return false;
@@ -166,7 +172,7 @@ private boolean fillFragmentInstanceStatistics(
166172
statistics.setState(getInstanceState().toString());
167173
// Previously we ignore statistics when current data region is instance of
168174
// VirtualDataRegion. Now data region of a CteScanNode is also virtual.
169-
if (shouldIgnoreForStatistics()) {
175+
if (shouldIgnoreForStatistics) {
170176
// We don't need to output the region having ExplainAnalyzeOperator only.
171177
return false;
172178
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.db.utils.cte.CteDataStore;
3131
import org.apache.iotdb.db.utils.cte.MemoryReader;
3232

33+
import org.apache.tsfile.common.conf.TSFileDescriptor;
3334
import org.apache.tsfile.read.common.block.TsBlock;
3435
import org.apache.tsfile.utils.RamUsageEstimator;
3536
import org.slf4j.Logger;
@@ -40,6 +41,9 @@ public class CteScanOperator implements SourceOperator {
4041
private static final long INSTANCE_SIZE =
4142
RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
4243

44+
private final long maxReturnSize =
45+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
46+
4347
private final OperatorContext operatorContext;
4448
private final PlanNodeId sourceId;
4549

@@ -79,7 +83,7 @@ public void close() throws Exception {
7983
dataReader.close();
8084
}
8185
} catch (Exception e) {
82-
LOGGER.error("Fail to close fileChannel", e);
86+
LOGGER.error("Fail to close CteDataReader", e);
8387
}
8488
}
8589

@@ -95,8 +99,7 @@ public long calculateMaxPeekMemory() {
9599

96100
@Override
97101
public long calculateMaxReturnSize() {
98-
// The returned object is a reference to TsBlock in CteDataReader
99-
return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
102+
return maxReturnSize;
100103
}
101104

102105
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ public ExecutionResult executeForTableModel(
427427
sql,
428428
userQuery,
429429
((queryContext, startTime) -> {
430-
queryContext.setSubquery(true);
430+
queryContext.setInnerTriggeredQuery(true);
431431
queryContext.setCteQueries(cteQueries);
432432
queryContext.setExplainType(explainType);
433433
return createQueryExecutionForTableModel(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableFunctionInvocation;
7171
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
7272
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
73+
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
7374
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
7475

7576
import com.google.common.collect.ArrayListMultimap;
@@ -124,6 +125,8 @@ public class Analysis implements IAnalysis {
124125

125126
private final Map<NodeRef<Table>, Query> namedQueries = new LinkedHashMap<>();
126127

128+
// WITH clause stored during analyze phase. Required for constant folding and CTE materialization
129+
// subqueries, which cannot directly access the WITH clause
127130
private With with;
128131

129132
// map expandable query to the node being the inner recursive reference
@@ -255,6 +258,11 @@ public class Analysis implements IAnalysis {
255258

256259
private boolean isQuery = false;
257260

261+
// SqlParser is needed during query planning phase for executing uncorrelated scalar subqueries
262+
// in advance (predicate folding). The planner needs to parse and execute these subqueries
263+
// independently to utilize predicate pushdown optimization.
264+
private SqlParser sqlParser;
265+
258266
public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters) {
259267
this.root = root;
260268
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));
@@ -277,6 +285,14 @@ public void setUpdateType(String updateType) {
277285
this.updateType = updateType;
278286
}
279287

288+
public SqlParser getSqlParser() {
289+
return sqlParser;
290+
}
291+
292+
public void setSqlParser(SqlParser sqlParser) {
293+
this.sqlParser = sqlParser;
294+
}
295+
280296
public Query getNamedQuery(Table table) {
281297
return namedQueries.get(NodeRef.of(table));
282298
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public Analysis analyze(Statement statement) {
8080
warningCollector);
8181

8282
Analysis analysis = new Analysis(rewrittenStatement, parameterLookup);
83+
analysis.setSqlParser(statementAnalyzerFactory.getSqlParser());
8384
// Register CTE passed by parent query.
8485
context
8586
.getCteQueries()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Scope.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ public class Scope {
5454
private final RelationId relationId;
5555
private final RelationType relation;
5656
private final Map<String, WithQuery> namedQueries;
57-
private final List<Identifier> tables;
57+
58+
// Tables to access for the current relation. For CTE materialization and constant folding
59+
// subqueries, non-materialized CTEs in tables must be identified, and their definitions
60+
// attached to the subquery context.
61+
private List<Identifier> tables;
5862

5963
public static Scope create() {
6064
return builder().build();
@@ -83,12 +87,8 @@ public void addTable(Table table) {
8387
tables.add(new Identifier(table.getName().getSuffix()));
8488
}
8589

86-
public void addTables(List<Identifier> tables) {
87-
this.tables.addAll(tables);
88-
}
89-
90-
public Scope copy() {
91-
return builder().like(this).build();
90+
public void setTables(List<Identifier> tables) {
91+
this.tables = tables;
9292
}
9393

9494
public List<Identifier> getTables() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3630,15 +3630,13 @@ protected Scope visitJoin(Join node, Optional<Scope> scope) {
36303630

36313631
joinConditionCheck(criteria);
36323632

3633-
Optional<Scope> leftScope = scope.map(Scope::copy);
3634-
Scope left = process(node.getLeft(), leftScope);
3635-
Optional<Scope> rightScope = scope.map(Scope::copy);
3636-
Scope right = process(node.getRight(), rightScope);
3633+
// remember current tables in the scope
3634+
List<Identifier> tables = new ArrayList<>();
3635+
scope.ifPresent(s -> tables.addAll(s.getTables()));
36373636

3638-
if (scope.isPresent()) {
3639-
leftScope.ifPresent(l -> scope.get().addTables(l.getTables()));
3640-
rightScope.ifPresent(l -> scope.get().addTables(l.getTables()));
3641-
}
3637+
Scope left = process(node.getLeft(), scope);
3638+
scope.ifPresent(s -> s.setTables(tables));
3639+
Scope right = process(node.getRight(), scope);
36423640

36433641
if (criteria instanceof JoinUsing) {
36443642
return analyzeJoinUsing(node, ((JoinUsing) criteria).getColumns(), scope, left, right);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzerFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,8 @@ public static StatementAnalyzerFactory createTestingStatementAnalyzerFactory(
7777
public AccessControl getAccessControl() {
7878
return accessControl;
7979
}
80+
81+
public SqlParser getSqlParser() {
82+
return sqlParser;
83+
}
8084
}

0 commit comments

Comments
 (0)