Skip to content

Commit 4c21331

Browse files
authored
Sampling queries in each DN
1 parent 8144e5c commit 4c21331

File tree

30 files changed

+248
-53
lines changed

30 files changed

+248
-53
lines changed

iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,21 @@
177177
<level>INFO</level>
178178
</filter>
179179
</appender>
180+
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SAMPLED_QUERIES">
181+
<file>${IOTDB_HOME}/logs/log_datanode_sampled_queries.log</file>
182+
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
183+
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-sampled-queries-%d{yyyyMMdd}.log.gz</fileNamePattern>
184+
<maxHistory>30</maxHistory>
185+
</rollingPolicy>
186+
<append>true</append>
187+
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
188+
<pattern>%d %m %n</pattern>
189+
<charset>utf-8</charset>
190+
</encoder>
191+
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
192+
<level>INFO</level>
193+
</filter>
194+
</appender>
180195
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="COMPACTION">
181196
<file>${IOTDB_HOME}/logs/log_datanode_compaction.log</file>
182197
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
@@ -245,6 +260,9 @@
245260
<logger level="info" name="SLOW_SQL">
246261
<appender-ref ref="SLOW_SQL"/>
247262
</logger>
263+
<logger level="info" name="SAMPLED_QUERIES">
264+
<appender-ref ref="SAMPLED_QUERIES"/>
265+
</logger>
248266
<logger level="info" name="QUERY_FREQUENCY">
249267
<appender-ref ref="QUERY_FREQUENCY"/>
250268
</logger>

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2882,6 +2882,28 @@ public synchronized void loadHotModifiedProps(TrimProperties properties)
28822882
BinaryAllocator.getInstance().close(true);
28832883
}
28842884

2885+
// update query_sample_throughput_bytes_per_sec
2886+
String querySamplingRateLimitNumber =
2887+
Optional.ofNullable(
2888+
properties.getProperty(
2889+
"query_sample_throughput_bytes_per_sec",
2890+
ConfigurationFileUtils.getConfigurationDefaultValue(
2891+
"query_sample_throughput_bytes_per_sec")))
2892+
.map(String::trim)
2893+
.orElse(
2894+
ConfigurationFileUtils.getConfigurationDefaultValue(
2895+
"query_sample_throughput_bytes_per_sec"));
2896+
if (querySamplingRateLimitNumber != null) {
2897+
try {
2898+
int rateLimit = Integer.parseInt(querySamplingRateLimitNumber);
2899+
commonDescriptor.getConfig().setQuerySamplingRateLimit(rateLimit);
2900+
} catch (Exception e) {
2901+
LOGGER.warn(
2902+
"Failed to parse query_sample_throughput_bytes_per_sec {} to integer",
2903+
querySamplingRateLimitNumber);
2904+
}
2905+
}
2906+
28852907
// update trusted_uri_pattern
28862908
String trustedUriPattern =
28872909
Optional.ofNullable(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ private boolean registerDatabase(
145145
"",
146146
partitionFetcher,
147147
schemaFetcher,
148-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
148+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
149+
false);
149150
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
150151
&& result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
151152
&& result.status.code != TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void load() throws PipeException {
6868
"",
6969
PARTITION_FETCHER,
7070
SCHEMA_FETCHER,
71-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
71+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
72+
false);
7273
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
7374
LOGGER.error("Delete {} error, statement: {}.", deletion, statement);
7475
LOGGER.error("Delete result status : {}.", result.status);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public void load() {
7070
"",
7171
PARTITION_FETCHER,
7272
SCHEMA_FETCHER,
73-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
73+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
74+
false);
7475
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
7576
LOGGER.error("Load TsFile {} error, statement: {}.", tsFile.getPath(), statement);
7677
LOGGER.error("Load TsFile result status : {}.", result.status);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,8 @@ private TSStatus executeStatementForTreeModel(final Statement statement) {
888888
"",
889889
ClusterPartitionFetcher.getInstance(),
890890
ClusterSchemaFetcher.getInstance(),
891-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
891+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
892+
false)
892893
.status;
893894
}
894895

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ public void onPublish(InterceptPublishMessage msg) {
169169
"",
170170
partitionFetcher,
171171
schemaFetcher,
172-
config.getQueryTimeoutThreshold());
172+
config.getQueryTimeoutThreshold(),
173+
false);
173174
tsStatus = result.status;
174175
}
175176
} catch (Exception e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public Response variables(SQL sql, SecurityContext securityContext) {
117117
sql.getSql(),
118118
partitionFetcher,
119119
schemaFetcher,
120-
config.getQueryTimeoutThreshold());
120+
config.getQueryTimeoutThreshold(),
121+
true);
121122
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
122123
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
123124
return Response.ok()
@@ -184,7 +185,8 @@ public Response expression(ExpressionRequest expressionRequest, SecurityContext
184185
sql,
185186
partitionFetcher,
186187
schemaFetcher,
187-
config.getQueryTimeoutThreshold());
188+
config.getQueryTimeoutThreshold(),
189+
true);
188190
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
189191
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
190192
return Response.ok()
@@ -247,7 +249,8 @@ public Response node(List<String> requestBody, SecurityContext securityContext)
247249
sql,
248250
partitionFetcher,
249251
schemaFetcher,
250-
config.getQueryTimeoutThreshold());
252+
config.getQueryTimeoutThreshold(),
253+
true);
251254
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
252255
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
253256
return Response.ok()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public Response executeNonQueryStatement(SQL sql, SecurityContext securityContex
115115
sql.getSql(),
116116
partitionFetcher,
117117
schemaFetcher,
118-
config.getQueryTimeoutThreshold());
118+
config.getQueryTimeoutThreshold(),
119+
false);
119120
finish = true;
120121
return Response.ok()
121122
.entity(
@@ -190,7 +191,8 @@ public Response executeQueryStatement(SQL sql, SecurityContext securityContext)
190191
sql.getSql(),
191192
partitionFetcher,
192193
schemaFetcher,
193-
config.getQueryTimeoutThreshold());
194+
config.getQueryTimeoutThreshold(),
195+
true);
194196
finish = true;
195197
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
196198
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -266,7 +268,8 @@ public Response insertTablet(
266268
"",
267269
partitionFetcher,
268270
schemaFetcher,
269-
config.getQueryTimeoutThreshold());
271+
config.getQueryTimeoutThreshold(),
272+
false);
270273

271274
return Response.ok()
272275
.entity(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public Response variables(SQL sql, SecurityContext securityContext) {
117117
sql.getSql(),
118118
partitionFetcher,
119119
schemaFetcher,
120-
config.getQueryTimeoutThreshold());
120+
config.getQueryTimeoutThreshold(),
121+
true);
121122
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
122123
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
123124
return Response.ok()
@@ -184,7 +185,8 @@ public Response expression(ExpressionRequest expressionRequest, SecurityContext
184185
sql,
185186
partitionFetcher,
186187
schemaFetcher,
187-
config.getQueryTimeoutThreshold());
188+
config.getQueryTimeoutThreshold(),
189+
true);
188190
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
189191
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
190192
return Response.ok()
@@ -247,7 +249,8 @@ public Response node(List<String> requestBody, SecurityContext securityContext)
247249
sql,
248250
partitionFetcher,
249251
schemaFetcher,
250-
config.getQueryTimeoutThreshold());
252+
config.getQueryTimeoutThreshold(),
253+
true);
251254
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
252255
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
253256
return Response.ok()

0 commit comments

Comments
 (0)