Skip to content

Commit 7f84a21

Browse files
committed
Merge branch 'master' of https://github.com/apache/iotdb into internalSession
2 parents 2f8b33c + 1bf79ca commit 7f84a21

File tree

19 files changed

+444
-43
lines changed

19 files changed

+444
-43
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testCurrentQueries() {
8383
statement.execute("set configuration \"query_cost_stat_window\"='1'");
8484

8585
// 1. query current_queries table
86-
String sql = "SELECT * FROM current_queries";
86+
String sql = "SELECT * FROM current_queries WHERE state='RUNNING'";
8787
ResultSet resultSet = statement.executeQuery(sql);
8888
ResultSetMetaData metaData = resultSet.getMetaData();
8989
Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount());
@@ -116,7 +116,7 @@ public void testCurrentQueries() {
116116
Assert.assertEquals(61, rowNum);
117117

118118
// 3. requery current_queries table
119-
sql = "SELECT * FROM current_queries";
119+
sql = "SELECT * FROM current_queries WHERE state='FINISHED'";
120120
resultSet = statement.executeQuery(sql);
121121
metaData = resultSet.getMetaData();
122122
Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount());
@@ -128,13 +128,14 @@ public void testCurrentQueries() {
128128
}
129129
rowNum++;
130130
}
131-
// three rows in the result, 2 FINISHED and 1 RUNNING
132-
Assert.assertEquals(3, rowNum);
131+
// two rows in the result, 2 FINISHED
132+
Assert.assertEquals(2, rowNum);
133133
Assert.assertEquals(2, finishedQueries);
134134
resultSet.close();
135135

136136
// 4. test the expired QueryInfo was evicted
137137
Thread.sleep(61_001);
138+
sql = "SELECT * FROM current_queries";
138139
resultSet = statement.executeQuery(sql);
139140
rowNum = 0;
140141
while (resultSet.next()) {

integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,12 @@ public void testTreeViewTable() throws Exception {
10981098
} catch (final SQLException e) {
10991099
assertEquals("701: The system view does not support show create.", e.getMessage());
11001100
}
1101+
try {
1102+
statement.execute("show create table information_schema.tables");
1103+
fail();
1104+
} catch (final SQLException e) {
1105+
assertEquals("701: The system view does not support show create.", e.getMessage());
1106+
}
11011107
try {
11021108
statement.execute("create or replace view a () as root.b.**");
11031109
fail();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,15 @@
6969
import org.apache.iotdb.db.protocol.session.SessionManager;
7070
import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
7171
import org.apache.iotdb.db.queryengine.common.QueryId;
72+
import org.apache.iotdb.db.queryengine.execution.QueryState;
7273
import org.apache.iotdb.db.queryengine.plan.Coordinator;
7374
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
7475
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask;
7576
import org.apache.iotdb.db.queryengine.plan.relational.function.TableBuiltinTableFunction;
7677
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
78+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
79+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
80+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
7781
import org.apache.iotdb.db.queryengine.plan.relational.sql.util.ReservedIdentifiers;
7882
import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlKeywords;
7983
import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils;
@@ -130,7 +134,10 @@ public class InformationSchemaContentSupplierFactory {
130134
private InformationSchemaContentSupplierFactory() {}
131135

132136
public static Iterator<TsBlock> getSupplier(
133-
final String tableName, final List<TSDataType> dataTypes, final UserEntity userEntity) {
137+
final String tableName,
138+
final List<TSDataType> dataTypes,
139+
Expression predicate,
140+
final UserEntity userEntity) {
134141
try {
135142
switch (tableName) {
136143
case InformationSchema.QUERIES:
@@ -168,7 +175,7 @@ public static Iterator<TsBlock> getSupplier(
168175
case InformationSchema.CONNECTIONS:
169176
return new ConnectionsSupplier(dataTypes, userEntity);
170177
case InformationSchema.CURRENT_QUERIES:
171-
return new CurrentQueriesSupplier(dataTypes, userEntity);
178+
return new CurrentQueriesSupplier(dataTypes, predicate, userEntity);
172179
case InformationSchema.QUERIES_COSTS_HISTOGRAM:
173180
return new QueriesCostsHistogramSupplier(dataTypes, userEntity);
174181
default:
@@ -1188,9 +1195,24 @@ private static class CurrentQueriesSupplier extends TsBlockSupplier {
11881195
private int nextConsumedIndex;
11891196
private List<Coordinator.StatedQueriesInfo> queriesInfo;
11901197

1191-
private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
1198+
private CurrentQueriesSupplier(
1199+
final List<TSDataType> dataTypes, Expression predicate, final UserEntity userEntity) {
11921200
super(dataTypes);
1193-
queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
1201+
1202+
if (predicate == null) {
1203+
queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
1204+
} else if (QueryState.RUNNING
1205+
.toString()
1206+
.equals(((StringLiteral) ((ComparisonExpression) predicate).getRight()).getValue())) {
1207+
queriesInfo = Coordinator.getInstance().getRunningQueriesInfos();
1208+
} else if (QueryState.FINISHED
1209+
.toString()
1210+
.equals(((StringLiteral) ((ComparisonExpression) predicate).getRight()).getValue())) {
1211+
queriesInfo = Coordinator.getInstance().getFinishedQueriesInfos();
1212+
} else {
1213+
queriesInfo = Collections.emptyList();
1214+
}
1215+
11941216
try {
11951217
accessControl.checkUserGlobalSysPrivilege(userEntity);
11961218
} catch (final AccessDeniedException e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,39 @@ private void clearExpiredQueriesInfoTask() {
865865
}
866866
}
867867

868+
public List<StatedQueriesInfo> getRunningQueriesInfos() {
869+
long currentTime = System.currentTimeMillis();
870+
return getAllQueryExecutions().stream()
871+
.map(
872+
queryExecution ->
873+
new StatedQueriesInfo(
874+
QueryState.RUNNING,
875+
queryExecution.getQueryId(),
876+
queryExecution.getStartExecutionTime(),
877+
DEFAULT_END_TIME,
878+
(currentTime - queryExecution.getStartExecutionTime()) / 1000,
879+
queryExecution.getExecuteSQL().orElse("UNKNOWN"),
880+
queryExecution.getUser(),
881+
queryExecution.getClientHostname()))
882+
.collect(Collectors.toList());
883+
}
884+
885+
public List<StatedQueriesInfo> getFinishedQueriesInfos() {
886+
long currentTime = System.currentTimeMillis();
887+
List<StatedQueriesInfo> result = new ArrayList<>();
888+
Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
889+
long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000L;
890+
while (historyQueriesIterator.hasNext()) {
891+
QueryInfo queryInfo = historyQueriesIterator.next();
892+
if (queryInfo.endTime < needRecordTime) {
893+
// out of time window, ignore it
894+
} else {
895+
result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
896+
}
897+
}
898+
return result;
899+
}
900+
868901
public List<StatedQueriesInfo> getCurrentQueriesInfo() {
869902
List<IQueryExecution> runningQueries = getAllQueryExecutions();
870903
Set<String> runningQueryIdSet =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,7 +1634,8 @@ public Statement visitSelectStatement(IoTDBSqlParser.SelectStatementContext ctx)
16341634
queryStatement.setOrderByComponent(
16351635
parseOrderByClause(
16361636
ctx.orderByClause(),
1637-
ImmutableSet.of(OrderByKey.TIME, OrderByKey.DEVICE, OrderByKey.TIMESERIES)));
1637+
ImmutableSet.of(OrderByKey.TIME, OrderByKey.DEVICE, OrderByKey.TIMESERIES),
1638+
true));
16381639
}
16391640

16401641
// parse FILL
@@ -1937,7 +1938,9 @@ private HavingCondition parseHavingClause(IoTDBSqlParser.HavingClauseContext ctx
19371938
// ---- Order By Clause
19381939
// all SortKeys should be contained by limitSet
19391940
private OrderByComponent parseOrderByClause(
1940-
IoTDBSqlParser.OrderByClauseContext ctx, ImmutableSet<String> limitSet) {
1941+
IoTDBSqlParser.OrderByClauseContext ctx,
1942+
ImmutableSet<String> limitSet,
1943+
boolean allowExpression) {
19411944
OrderByComponent orderByComponent = new OrderByComponent();
19421945
Set<String> sortKeySet = new HashSet<>();
19431946
for (IoTDBSqlParser.OrderByAttributeClauseContext orderByAttributeClauseContext :
@@ -1946,7 +1949,8 @@ private OrderByComponent parseOrderByClause(
19461949
if (orderByComponent.isUnique()) {
19471950
break;
19481951
}
1949-
SortItem sortItem = parseOrderByAttributeClause(orderByAttributeClauseContext, limitSet);
1952+
SortItem sortItem =
1953+
parseOrderByAttributeClause(orderByAttributeClauseContext, limitSet, allowExpression);
19501954

19511955
String sortKey = sortItem.getSortKey();
19521956
if (sortKeySet.contains(sortKey)) {
@@ -1965,7 +1969,9 @@ private OrderByComponent parseOrderByClause(
19651969
}
19661970

19671971
private SortItem parseOrderByAttributeClause(
1968-
IoTDBSqlParser.OrderByAttributeClauseContext ctx, ImmutableSet<String> limitSet) {
1972+
IoTDBSqlParser.OrderByAttributeClauseContext ctx,
1973+
ImmutableSet<String> limitSet,
1974+
boolean allowExpression) {
19691975
if (ctx.sortKey() != null) {
19701976
String sortKey = ctx.sortKey().getText().toUpperCase();
19711977
if (!limitSet.contains(sortKey)) {
@@ -1974,6 +1980,11 @@ private SortItem parseOrderByAttributeClause(
19741980
}
19751981
return new SortItem(sortKey, ctx.DESC() != null ? Ordering.DESC : Ordering.ASC);
19761982
} else {
1983+
if (!allowExpression) {
1984+
throw new SemanticException(
1985+
"ORDER BY expression is not supported for current statement, supported sort key: "
1986+
+ limitSet.toString());
1987+
}
19771988
Expression sortExpression = parseExpression(ctx.expression(), true);
19781989
return new SortItem(
19791990
sortExpression,
@@ -3719,7 +3730,8 @@ public Statement visitShowQueries(IoTDBSqlParser.ShowQueriesContext ctx) {
37193730
OrderByKey.QUERYID,
37203731
OrderByKey.DATANODEID,
37213732
OrderByKey.ELAPSEDTIME,
3722-
OrderByKey.STATEMENT)));
3733+
OrderByKey.STATEMENT),
3734+
false));
37233735
}
37243736

37253737
// parse LIMIT & OFFSET

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,7 @@ public Operator visitInformationSchemaTableScan(
12941294
getSupplier(
12951295
node.getQualifiedObjectName().getObjectName(),
12961296
dataTypes,
1297+
node.getPushDownPredicate(),
12971298
context
12981299
.getDriverContext()
12991300
.getFragmentInstanceContext()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,10 +567,12 @@ private PlanNode processOneChildNode(PlanNode node, NodeGroupContext context) {
567567
PlanNode newNode = node.clone();
568568
PlanNode child = visit(node.getChildren().get(0), context);
569569
newNode.addChild(child);
570-
TRegionReplicaSet dataRegion = context.getNodeDistribution(child.getPlanNodeId()).getRegion();
570+
NodeDistribution nodeDistribution = context.getNodeDistribution(child.getPlanNodeId());
571571
context.putNodeDistribution(
572572
newNode.getPlanNodeId(),
573-
new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
573+
new NodeDistribution(
574+
NodeDistributionType.SAME_WITH_ALL_CHILDREN,
575+
nodeDistribution == null ? null : nodeDistribution.getRegion()));
574576
return newNode;
575577
}
576578

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/ShowQueriesNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
105105
}
106106

107107
public static ShowQueriesNode deserialize(ByteBuffer byteBuffer) {
108-
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
109108
String allowedUsername = ReadWriteIOUtils.readString(byteBuffer);
109+
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
110110
return new ShowQueriesNode(planNodeId, null, allowedUsername);
111111
}
112112

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2424
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
2525
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
26+
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2627
import org.apache.iotdb.commons.partition.DataPartition;
2728
import org.apache.iotdb.commons.partition.SchemaPartition;
2829
import org.apache.iotdb.commons.schema.table.TsTable;
@@ -100,6 +101,7 @@
100101
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
101102
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
102103
import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
104+
import org.apache.iotdb.rpc.TSStatusCode;
103105

104106
import com.google.common.collect.ImmutableList;
105107
import com.google.common.collect.ImmutableListMultimap;
@@ -1004,7 +1006,11 @@ public List<PlanNode> visitInformationSchemaTableScan(
10041006
List<TDataNodeLocation> dataNodeLocations =
10051007
dataNodeLocationSupplier.getDataNodeLocations(
10061008
node.getQualifiedObjectName().getObjectName());
1007-
checkArgument(!dataNodeLocations.isEmpty(), "DataNodeLocations shouldn't be empty");
1009+
if (dataNodeLocations.isEmpty()) {
1010+
throw new IoTDBRuntimeException(
1011+
"No available dataNodes, may be the cluster is closing",
1012+
TSStatusCode.NO_AVAILABLE_REPLICA.getStatusCode());
1013+
}
10081014

10091015
List<PlanNode> resultTableScanNodeList = new ArrayList<>();
10101016
dataNodeLocations.forEach(

0 commit comments

Comments
 (0)