Skip to content

Commit acad8b6

Browse files
committed
add IT and modify according review
Signed-off-by: Weihao Li <[email protected]>
1 parent 1688403 commit acad8b6

File tree

7 files changed

+255
-29
lines changed

7 files changed

+255
-29
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.query.recent.informationschema;
21+
22+
import org.apache.iotdb.commons.conf.CommonDescriptor;
23+
import org.apache.iotdb.db.queryengine.execution.QueryState;
24+
import org.apache.iotdb.it.env.EnvFactory;
25+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
26+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
27+
import org.apache.iotdb.itbase.env.BaseEnv;
28+
29+
import org.junit.AfterClass;
30+
import org.junit.Assert;
31+
import org.junit.BeforeClass;
32+
import org.junit.Test;
33+
import org.junit.experimental.categories.Category;
34+
import org.junit.runner.RunWith;
35+
36+
import java.sql.Connection;
37+
import java.sql.ResultSet;
38+
import java.sql.ResultSetMetaData;
39+
import java.sql.SQLException;
40+
import java.sql.Statement;
41+
42+
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.END_TIME_TABLE_MODEL;
43+
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.NUMS;
44+
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL;
45+
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_TABLE_MODEL;
46+
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL;
47+
import static org.apache.iotdb.commons.schema.table.InformationSchema.getSchemaTables;
48+
import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
49+
import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
50+
import static org.junit.Assert.fail;
51+
52+
@RunWith(IoTDBTestRunner.class)
53+
@Category({TableLocalStandaloneIT.class})
54+
// This IT will run at least 60s, so we only run it in 1C1D
55+
public class IoTDBCurrentQueriesIT {
56+
private static final int CURRENT_QUERIES_COLUMN_NUM =
57+
getSchemaTables().get("current_queries").getColumnNum();
58+
private static final int QUERIES_COSTS_HISTOGRAM_COLUMN_NUM =
59+
getSchemaTables().get("queries_costs_histogram").getColumnNum();
60+
private static final String ADMIN_NAME =
61+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
62+
private static final String ADMIN_PWD =
63+
CommonDescriptor.getInstance().getConfig().getAdminPassword();
64+
65+
@BeforeClass
66+
public static void setUp() throws Exception {
67+
EnvFactory.getEnv().getConfig().getDataNodeConfig().setQueryCostStatWindow(1);
68+
EnvFactory.getEnv().initClusterEnvironment();
69+
createUser("test", "test123123456");
70+
}
71+
72+
@AfterClass
73+
public static void tearDown() throws Exception {
74+
EnvFactory.getEnv().cleanClusterEnvironment();
75+
}
76+
77+
@Test
78+
public void testCurrentQueries() {
79+
try {
80+
Connection connection =
81+
EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD, BaseEnv.TABLE_SQL_DIALECT);
82+
Statement statement = connection.createStatement();
83+
statement.execute("USE information_schema");
84+
85+
// 1. query current_queries table
86+
String sql = "SELECT * FROM current_queries";
87+
ResultSet resultSet = statement.executeQuery(sql);
88+
ResultSetMetaData metaData = resultSet.getMetaData();
89+
Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount());
90+
int rowNum = 0;
91+
while (resultSet.next()) {
92+
Assert.assertEquals(QueryState.RUNNING.name(), resultSet.getString(STATE_TABLE_MODEL));
93+
Assert.assertEquals(null, resultSet.getString(END_TIME_TABLE_MODEL));
94+
Assert.assertEquals(sql, resultSet.getString(STATEMENT_TABLE_MODEL));
95+
Assert.assertEquals(ADMIN_NAME, resultSet.getString(USER_TABLE_MODEL));
96+
rowNum++;
97+
}
98+
Assert.assertEquals(1, rowNum);
99+
resultSet.close();
100+
101+
// 2. query queries_costs_histogram table
102+
sql = "SELECT * FROM queries_costs_histogram";
103+
resultSet = statement.executeQuery(sql);
104+
metaData = resultSet.getMetaData();
105+
Assert.assertEquals(QUERIES_COSTS_HISTOGRAM_COLUMN_NUM, metaData.getColumnCount());
106+
rowNum = 0;
107+
int queriesCount = 0;
108+
while (resultSet.next()) {
109+
int nums = resultSet.getInt(NUMS);
110+
if (nums > 0) {
111+
queriesCount++;
112+
}
113+
rowNum++;
114+
}
115+
Assert.assertEquals(1, queriesCount);
116+
Assert.assertEquals(61, rowNum);
117+
118+
// 3. requery current_queries table
119+
sql = "SELECT * FROM current_queries";
120+
resultSet = statement.executeQuery(sql);
121+
metaData = resultSet.getMetaData();
122+
Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount());
123+
rowNum = 0;
124+
int finishedQueries = 0;
125+
while (resultSet.next()) {
126+
if (QueryState.FINISHED.name().equals(resultSet.getString(STATE_TABLE_MODEL))) {
127+
finishedQueries++;
128+
}
129+
rowNum++;
130+
}
131+
// three rows in the result, 2 FINISHED and 1 RUNNING
132+
Assert.assertEquals(3, rowNum);
133+
Assert.assertEquals(2, finishedQueries);
134+
resultSet.close();
135+
136+
// 4. test the expired QueryInfo was evicted
137+
Thread.sleep(61_001);
138+
resultSet = statement.executeQuery(sql);
139+
rowNum = 0;
140+
while (resultSet.next()) {
141+
rowNum++;
142+
}
143+
// one row in the result, current query
144+
Assert.assertEquals(1, rowNum);
145+
resultSet.close();
146+
147+
sql = "SELECT * FROM queries_costs_histogram";
148+
resultSet = statement.executeQuery(sql);
149+
queriesCount = 0;
150+
while (resultSet.next()) {
151+
int nums = resultSet.getInt(NUMS);
152+
if (nums > 0) {
153+
queriesCount++;
154+
}
155+
}
156+
// the last current_queries table query was recorded, others are evicted
157+
Assert.assertEquals(1, queriesCount);
158+
} catch (Exception e) {
159+
fail(e.getMessage());
160+
}
161+
162+
// 5. test privilege
163+
testPrivilege();
164+
}
165+
166+
private void testPrivilege() {
167+
// 1. test current_queries table
168+
try (Connection connection =
169+
EnvFactory.getEnv().getConnection("test", "test123123456", TABLE_SQL_DIALECT);
170+
Statement statement = connection.createStatement()) {
171+
String sql = "SELECT * FROM information_schema.current_queries";
172+
173+
// another user executes a query
174+
try (Connection connection2 =
175+
EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD, BaseEnv.TABLE_SQL_DIALECT)) {
176+
ResultSet resultSet = connection2.createStatement().executeQuery(sql);
177+
resultSet.close();
178+
} catch (Exception e) {
179+
fail(e.getMessage());
180+
}
181+
182+
// current user query current_queries table
183+
ResultSet resultSet = statement.executeQuery(sql);
184+
int rowNum = 0;
185+
while (resultSet.next()) {
186+
rowNum++;
187+
}
188+
// only current query in the result
189+
Assert.assertEquals(1, rowNum);
190+
} catch (SQLException e) {
191+
fail(e.getMessage());
192+
}
193+
194+
// 2. test queries_costs_histogram table
195+
try (Connection connection =
196+
EnvFactory.getEnv().getConnection("test", "test123123456", TABLE_SQL_DIALECT);
197+
Statement statement = connection.createStatement()) {
198+
statement.executeQuery("SELECT * FROM information_schema.queries_costs_histogram");
199+
} catch (SQLException e) {
200+
Assert.assertEquals(
201+
"803: Access Denied: No permissions for this operation, please add privilege SYSTEM",
202+
e.getMessage());
203+
}
204+
}
205+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,15 +230,16 @@ public Response executeFastLastQueryStatement(
230230
} else {
231231
IClientSession clientSession = SESSION_MANAGER.getCurrSession();
232232

233+
Supplier<String> contentOfQuerySupplier = new FastLastQueryContentSupplier(prefixPathList);
233234
COORDINATOR.recordCurrentQueries(
234235
null,
235236
startTime / 1_000_000,
236237
endTime / 1_000_000,
237238
costTime,
238-
restFastLastQueryReq(prefixPathList),
239+
contentOfQuerySupplier,
239240
clientSession.getUsername(),
240241
clientSession.getClientAddress());
241-
recordQueries(() -> costTime, new FastLastQueryContentSupplier(prefixPathList), t);
242+
recordQueries(() -> costTime, contentOfQuerySupplier, t);
242243
}
243244
}
244245
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,7 +1063,7 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath(
10631063
startTime / 1_000_000,
10641064
endTime / 1_000_000,
10651065
costTime,
1066-
statement,
1066+
() -> statement,
10671067
clientSession.getUsername(),
10681068
clientSession.getClientAddress());
10691069
recordQueries(() -> costTime, () -> statement, null);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.queryengine.common;
2121

22+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2223
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
2324

2425
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -37,6 +38,8 @@ public class QueryId {
3738

3839
public static final QueryId MOCK_QUERY_ID = QueryId.valueOf("mock_query_id");
3940

41+
private static final int DATANODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
42+
4043
private final String id;
4144

4245
private int nextPlanNodeIndex;
@@ -67,13 +70,8 @@ public String getId() {
6770
return id;
6871
}
6972

70-
public int getDataNodeId() {
71-
return getDataNodeId(id);
72-
}
73-
74-
public static int getDataNodeId(String queryId) {
75-
String[] splits = queryId.split("_");
76-
return Integer.parseInt(splits[splits.length - 1]);
73+
public static int getDataNodeId() {
74+
return DATANODE_ID;
7775
}
7876

7977
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ protected void constructLine() {
210210
columnBuilders[1].writeLong(
211211
TimestampPrecisionUtils.convertToCurrPrecision(
212212
queryExecution.getStartExecutionTime(), TimeUnit.MILLISECONDS));
213-
columnBuilders[2].writeInt(QueryId.getDataNodeId(queryExecution.getQueryId()));
213+
columnBuilders[2].writeInt(QueryId.getDataNodeId());
214214
columnBuilders[3].writeFloat(
215215
(float) (currTime - queryExecution.getStartExecutionTime()) / 1000);
216216
columnBuilders[4].writeBinary(
@@ -1185,7 +1185,7 @@ public boolean hasNext() {
11851185
}
11861186

11871187
private static class CurrentQueriesSupplier extends TsBlockSupplier {
1188-
protected int nextConsumedIndex;
1188+
private int nextConsumedIndex;
11891189
private List<Coordinator.StatedQueriesInfo> queriesInfo;
11901190

11911191
private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
@@ -1216,7 +1216,7 @@ protected void constructLine() {
12161216
TimestampPrecisionUtils.convertToCurrPrecision(
12171217
queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
12181218
}
1219-
columnBuilders[4].writeInt(QueryId.getDataNodeId(queryInfo.getQueryId()));
1219+
columnBuilders[4].writeInt(QueryId.getDataNodeId());
12201220
columnBuilders[5].writeFloat(queryInfo.getCostTime());
12211221
columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
12221222
columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
@@ -1232,7 +1232,7 @@ public boolean hasNext() {
12321232
}
12331233

12341234
private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
1235-
protected int nextConsumedIndex;
1235+
private int nextConsumedIndex;
12361236
private static final Binary[] BUCKETS =
12371237
new Binary[] {
12381238
BytesUtils.valueOf("[0,1)"),
@@ -1297,7 +1297,7 @@ private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
12971297
BytesUtils.valueOf("[59,60)"),
12981298
BytesUtils.valueOf("60+")
12991299
};
1300-
private int[] currentQueriesCostHistogram;
1300+
private final int[] currentQueriesCostHistogram;
13011301

13021302
private QueriesCostsHistogramSupplier(
13031303
final List<TSDataType> dataTypes, final UserEntity userEntity) {

0 commit comments

Comments
 (0)