Skip to content

Commit 2710cb3

Browse files
authored
Add state predicate push down for system table current_queries (#16904)
1 parent b71401a commit 2710cb3

File tree

8 files changed

+259
-21
lines changed

8 files changed

+259
-21
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testCurrentQueries() {
8383
statement.execute("set configuration \"query_cost_stat_window\"='1'");
8484

8585
// 1. query current_queries table
86-
String sql = "SELECT * FROM current_queries";
86+
String sql = "SELECT * FROM current_queries WHERE state='RUNNING'";
8787
ResultSet resultSet = statement.executeQuery(sql);
8888
ResultSetMetaData metaData = resultSet.getMetaData();
8989
Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount());
@@ -116,7 +116,7 @@ public void testCurrentQueries() {
116116
Assert.assertEquals(61, rowNum);
117117

118118
// 3. requery current_queries table
119-
sql = "SELECT * FROM current_queries";
119+
sql = "SELECT * FROM current_queries WHERE state='FINISHED'";
120120
resultSet = statement.executeQuery(sql);
121121
metaData = resultSet.getMetaData();
122122
Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount());
@@ -128,13 +128,14 @@ public void testCurrentQueries() {
128128
}
129129
rowNum++;
130130
}
131-
// three rows in the result, 2 FINISHED and 1 RUNNING
132-
Assert.assertEquals(3, rowNum);
131+
// two rows in the result, 2 FINISHED
132+
Assert.assertEquals(2, rowNum);
133133
Assert.assertEquals(2, finishedQueries);
134134
resultSet.close();
135135

136136
// 4. test the expired QueryInfo was evicted
137137
Thread.sleep(61_001);
138+
sql = "SELECT * FROM current_queries";
138139
resultSet = statement.executeQuery(sql);
139140
rowNum = 0;
140141
while (resultSet.next()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,15 @@
6969
import org.apache.iotdb.db.protocol.session.SessionManager;
7070
import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
7171
import org.apache.iotdb.db.queryengine.common.QueryId;
72+
import org.apache.iotdb.db.queryengine.execution.QueryState;
7273
import org.apache.iotdb.db.queryengine.plan.Coordinator;
7374
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
7475
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask;
7576
import org.apache.iotdb.db.queryengine.plan.relational.function.TableBuiltinTableFunction;
7677
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
78+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
79+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
80+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
7781
import org.apache.iotdb.db.queryengine.plan.relational.sql.util.ReservedIdentifiers;
7882
import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlKeywords;
7983
import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils;
@@ -130,7 +134,10 @@ public class InformationSchemaContentSupplierFactory {
130134
private InformationSchemaContentSupplierFactory() {}
131135

132136
public static Iterator<TsBlock> getSupplier(
133-
final String tableName, final List<TSDataType> dataTypes, final UserEntity userEntity) {
137+
final String tableName,
138+
final List<TSDataType> dataTypes,
139+
Expression predicate,
140+
final UserEntity userEntity) {
134141
try {
135142
switch (tableName) {
136143
case InformationSchema.QUERIES:
@@ -168,7 +175,7 @@ public static Iterator<TsBlock> getSupplier(
168175
case InformationSchema.CONNECTIONS:
169176
return new ConnectionsSupplier(dataTypes, userEntity);
170177
case InformationSchema.CURRENT_QUERIES:
171-
return new CurrentQueriesSupplier(dataTypes, userEntity);
178+
return new CurrentQueriesSupplier(dataTypes, predicate, userEntity);
172179
case InformationSchema.QUERIES_COSTS_HISTOGRAM:
173180
return new QueriesCostsHistogramSupplier(dataTypes, userEntity);
174181
default:
@@ -1188,9 +1195,24 @@ private static class CurrentQueriesSupplier extends TsBlockSupplier {
11881195
private int nextConsumedIndex;
11891196
private List<Coordinator.StatedQueriesInfo> queriesInfo;
11901197

1191-
private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
1198+
private CurrentQueriesSupplier(
1199+
final List<TSDataType> dataTypes, Expression predicate, final UserEntity userEntity) {
11921200
super(dataTypes);
1193-
queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
1201+
1202+
if (predicate == null) {
1203+
queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
1204+
} else if (QueryState.RUNNING
1205+
.toString()
1206+
.equals(((StringLiteral) ((ComparisonExpression) predicate).getRight()).getValue())) {
1207+
queriesInfo = Coordinator.getInstance().getRunningQueriesInfos();
1208+
} else if (QueryState.FINISHED
1209+
.toString()
1210+
.equals(((StringLiteral) ((ComparisonExpression) predicate).getRight()).getValue())) {
1211+
queriesInfo = Coordinator.getInstance().getFinishedQueriesInfos();
1212+
} else {
1213+
queriesInfo = Collections.emptyList();
1214+
}
1215+
11941216
try {
11951217
accessControl.checkUserGlobalSysPrivilege(userEntity);
11961218
} catch (final AccessDeniedException e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,39 @@ private void clearExpiredQueriesInfoTask() {
865865
}
866866
}
867867

868+
public List<StatedQueriesInfo> getRunningQueriesInfos() {
869+
long currentTime = System.currentTimeMillis();
870+
return getAllQueryExecutions().stream()
871+
.map(
872+
queryExecution ->
873+
new StatedQueriesInfo(
874+
QueryState.RUNNING,
875+
queryExecution.getQueryId(),
876+
queryExecution.getStartExecutionTime(),
877+
DEFAULT_END_TIME,
878+
(currentTime - queryExecution.getStartExecutionTime()) / 1000,
879+
queryExecution.getExecuteSQL().orElse("UNKNOWN"),
880+
queryExecution.getUser(),
881+
queryExecution.getClientHostname()))
882+
.collect(Collectors.toList());
883+
}
884+
885+
public List<StatedQueriesInfo> getFinishedQueriesInfos() {
886+
long currentTime = System.currentTimeMillis();
887+
List<StatedQueriesInfo> result = new ArrayList<>();
888+
Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
889+
long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000L;
890+
while (historyQueriesIterator.hasNext()) {
891+
QueryInfo queryInfo = historyQueriesIterator.next();
892+
if (queryInfo.endTime < needRecordTime) {
893+
// out of time window, ignore it
894+
} else {
895+
result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
896+
}
897+
}
898+
return result;
899+
}
900+
868901
public List<StatedQueriesInfo> getCurrentQueriesInfo() {
869902
List<IQueryExecution> runningQueries = getAllQueryExecutions();
870903
Set<String> runningQueryIdSet =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,7 @@ public Operator visitInformationSchemaTableScan(
12941294
getSupplier(
12951295
node.getQualifiedObjectName().getObjectName(),
12961296
dataTypes,
1297+
node.getPushDownPredicate(),
12971298
context
12981299
.getDriverContext()
12991300
.getFragmentInstanceContext()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
5757
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
5858
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
59+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
5960
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
6061
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
6162
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
@@ -68,6 +69,7 @@
6869
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
6970
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
7071
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
72+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
7173
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
7274
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
7375

@@ -99,6 +101,8 @@
99101
import static com.google.common.base.Preconditions.checkState;
100102
import static com.google.common.collect.ImmutableMap.toImmutableMap;
101103
import static java.util.Objects.requireNonNull;
104+
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_TABLE_MODEL;
105+
import static org.apache.iotdb.commons.schema.table.InformationSchema.CURRENT_QUERIES;
102106
import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
103107
import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.FIELD;
104108
import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
@@ -440,6 +444,102 @@ && extractUnique(conjunct).contains(node.getGroupIdSymbol().get())) {
440444
return output;
441445
}
442446

447+
@Override
448+
public PlanNode visitInformationSchemaTableScan(
449+
InformationSchemaTableScanNode node, RewriteContext context) {
450+
// no predicate or table is not current_queries, just return dierectly
451+
if (TRUE_LITERAL.equals(context.inheritedPredicate)) {
452+
return node;
453+
}
454+
455+
// push-down for CURRENT_QUERIES
456+
if (CURRENT_QUERIES.equals(node.getQualifiedObjectName().getObjectName())) {
457+
SplitExpression splitExpression = splitCurrentQueriesPredicate(context.inheritedPredicate);
458+
// exist expressions can push down to scan operator
459+
if (!splitExpression.getExpressionsCanPushDown().isEmpty()) {
460+
List<Expression> expressions = splitExpression.getExpressionsCanPushDown();
461+
checkState(expressions.size() == 1, "Unexpected number of expressions in table scan");
462+
node.setPushDownPredicate(expressions.get(0));
463+
}
464+
465+
// exist expressions cannot push down
466+
if (!splitExpression.getExpressionsCannotPushDown().isEmpty()) {
467+
List<Expression> expressions = splitExpression.getExpressionsCannotPushDown();
468+
return new FilterNode(
469+
queryId.genPlanNodeId(),
470+
node,
471+
expressions.size() == 1
472+
? expressions.get(0)
473+
: new LogicalExpression(LogicalExpression.Operator.AND, expressions));
474+
}
475+
return node;
476+
}
477+
478+
FilterNode filterNode =
479+
new FilterNode(queryId.genPlanNodeId(), node, context.inheritedPredicate);
480+
context.inheritedPredicate = TRUE_LITERAL;
481+
return filterNode;
482+
}
483+
484+
private SplitExpression splitCurrentQueriesPredicate(Expression predicate) {
485+
List<Expression> expressionsCanPushDown = new ArrayList<>();
486+
List<Expression> expressionsCannotPushDown = new ArrayList<>();
487+
488+
if (predicate instanceof LogicalExpression
489+
&& ((LogicalExpression) predicate).getOperator() == LogicalExpression.Operator.AND) {
490+
491+
// predicate like state = 'xxx' can be push down
492+
// Note: the optimizer CanonicalizeExpressionRewriter will ensure the predicate like 'xxx' =
493+
// state will be canonicalized to state = 'xxx'
494+
boolean hasExpressionPushDown = false;
495+
for (Expression expression : ((LogicalExpression) predicate).getTerms()) {
496+
if (isStateComparedWithConstant(expression) && !hasExpressionPushDown) {
497+
// if there are more than one state = 'xxx' terms, only add first to push-down candidate
498+
expressionsCanPushDown.add(expression);
499+
hasExpressionPushDown = true;
500+
} else {
501+
expressionsCannotPushDown.add(expression);
502+
}
503+
}
504+
505+
return new SplitExpression(
506+
Collections.emptyList(), expressionsCanPushDown, expressionsCannotPushDown, null);
507+
}
508+
509+
if (isStateComparedWithConstant(predicate)) {
510+
expressionsCanPushDown.add(predicate);
511+
} else {
512+
expressionsCannotPushDown.add(predicate);
513+
}
514+
515+
return new SplitExpression(
516+
Collections.emptyList(), expressionsCanPushDown, expressionsCannotPushDown, null);
517+
}
518+
519+
private boolean isStateComparedWithConstant(Expression expression) {
520+
if (!(expression instanceof ComparisonExpression)) {
521+
return false;
522+
}
523+
524+
ComparisonExpression comparisonExpression = (ComparisonExpression) expression;
525+
526+
if (ComparisonExpression.Operator.EQUAL != comparisonExpression.getOperator()) {
527+
return false;
528+
}
529+
530+
if (!(comparisonExpression.getLeft() instanceof SymbolReference)
531+
|| !STATE_TABLE_MODEL.equals(
532+
((SymbolReference) comparisonExpression.getLeft()).getName())) {
533+
return false;
534+
}
535+
536+
if (!(comparisonExpression.getRight() instanceof StringLiteral)) {
537+
return false;
538+
}
539+
540+
return true;
541+
}
542+
443543
@Override
444544
public PlanNode visitDeviceTableScan(
445545
DeviceTableScanNode tableScanNode, RewriteContext context) {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/ColumnReference.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
2626
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
2727
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
28-
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
28+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
2929

3030
import java.util.Map;
3131
import java.util.Optional;
@@ -49,10 +49,10 @@ public Optional<Symbol> getAssignedSymbol(
4949
String actualTableName;
5050
Map<Symbol, ColumnSchema> assignments;
5151

52-
if (node instanceof DeviceTableScanNode) {
53-
DeviceTableScanNode deviceTableScanNode = (DeviceTableScanNode) node;
54-
actualTableName = deviceTableScanNode.getQualifiedObjectName().toString();
55-
assignments = deviceTableScanNode.getAssignments();
52+
if (node instanceof TableScanNode) {
53+
TableScanNode tableScanNode = (TableScanNode) node;
54+
actualTableName = tableScanNode.getQualifiedObjectName().toString();
55+
assignments = tableScanNode.getAssignments();
5656
}
5757
/*else if (node instanceof IndexSourceNode indexSourceNode) {
5858
tableHandle = indexSourceNode.getTableHandle();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,18 @@ public static PlanMatchPattern infoSchemaTableScan(
149149

150150
public static PlanMatchPattern infoSchemaTableScan(
151151
String expectedTableName, Optional<Integer> dataNodeId, List<String> outputSymbols) {
152-
return node(InformationSchemaTableScanNode.class)
153-
.with(
154-
new InformationSchemaTableScanMatcher(
155-
expectedTableName,
156-
Optional.empty(),
157-
outputSymbols,
158-
Collections.emptySet(),
159-
dataNodeId));
152+
PlanMatchPattern pattern =
153+
node(InformationSchemaTableScanNode.class)
154+
.with(
155+
new InformationSchemaTableScanMatcher(
156+
expectedTableName,
157+
Optional.empty(),
158+
outputSymbols,
159+
Collections.emptySet(),
160+
dataNodeId));
161+
outputSymbols.forEach(
162+
symbol -> pattern.withAlias(symbol, new ColumnReference(expectedTableName, symbol)));
163+
return pattern;
160164
}
161165

162166
public static PlanMatchPattern treeDeviceViewTableScan(

0 commit comments

Comments
 (0)