Skip to content

Commit 8b18a87

Browse files
authored
Pipe: Unbinded the default meter for remaining insertion event count for hybrid degrading & Changed some default values of related properties (apache#15615)
1 parent db7729a commit 8b18a87

File tree

6 files changed

+45
-21
lines changed

6 files changed

+45
-21
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/overview/PipeConfigNodeRemainingTimeOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.apache.iotdb.confignode.manager.pipe.metric.overview;
2121

22-
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
22+
import org.apache.iotdb.commons.enums.PipeRateAverage;
2323
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2424
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
2525
import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
@@ -56,7 +56,7 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
5656
* @return The estimated remaining time
5757
*/
5858
double getRemainingTime() {
59-
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
59+
final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
6060
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
6161

6262
// Do not calculate heartbeat event

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.metric.overview;
2121

22-
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
22+
import org.apache.iotdb.commons.enums.PipeRateAverage;
2323
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2424
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
2525
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
@@ -106,7 +106,7 @@ void decreaseHeartbeatEventCount() {
106106
lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
107107
}
108108
return PipeConfig.getInstance()
109-
.getPipeRemainingTimeCommitRateAverageTime()
109+
.getPipeRemainingInsertNodeCountAverage()
110110
.getMeterRate(insertNodeEventCountMeter);
111111
}
112112

@@ -135,7 +135,7 @@ long getRemainingEvents() {
135135
* @return The estimated remaining time
136136
*/
137137
double getRemainingTime() {
138-
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
138+
final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
139139
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
140140

141141
final double invocationValue = collectInvocationHistogram.getMean();

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
2424
import org.apache.iotdb.commons.cluster.NodeStatus;
2525
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
26-
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
26+
import org.apache.iotdb.commons.enums.PipeRateAverage;
2727
import org.apache.iotdb.commons.utils.FileUtils;
2828
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
2929
import org.apache.iotdb.rpc.RpcUtils;
@@ -280,7 +280,7 @@ public class CommonConfig {
280280
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
281281
private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
282282
private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
283-
private int pipeRemainingEventCountSmoothingIntervalSeconds = 15;
283+
private int pipeRemainingEventCountSmoothingIntervalSeconds = 10;
284284

285285
private int pipeMetaReportMaxLogNumPerRound = 10;
286286
private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -301,8 +301,8 @@ public class CommonConfig {
301301
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
302302
private int pipeSnapshotExecutionMaxBatchSize = 1000;
303303
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
304-
private PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
305-
PipeRemainingTimeRateAverageTime.MEAN;
304+
private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = PipeRateAverage.FIVE_MINUTES;
305+
private PipeRateAverage pipeRemainingInsertNodeCountAverage = PipeRateAverage.ONE_MINUTE;
306306
private double pipeTsFileScanParsingThreshold = 0.05;
307307
private double pipeDynamicMemoryHistoryWeight = 0.5;
308308
private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
@@ -1831,12 +1831,12 @@ public void setPipeRemainingTimeCommitRateAutoSwitchSeconds(
18311831
pipeRemainingTimeCommitRateAutoSwitchSeconds);
18321832
}
18331833

1834-
public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
1834+
public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
18351835
return pipeRemainingTimeCommitRateAverageTime;
18361836
}
18371837

18381838
public void setPipeRemainingTimeCommitRateAverageTime(
1839-
PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime) {
1839+
PipeRateAverage pipeRemainingTimeCommitRateAverageTime) {
18401840
if (Objects.equals(
18411841
this.pipeRemainingTimeCommitRateAverageTime, pipeRemainingTimeCommitRateAverageTime)) {
18421842
return;
@@ -1847,6 +1847,21 @@ public void setPipeRemainingTimeCommitRateAverageTime(
18471847
pipeRemainingTimeCommitRateAverageTime);
18481848
}
18491849

1850+
public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
1851+
return pipeRemainingInsertNodeCountAverage;
1852+
}
1853+
1854+
public void setPipeRemainingInsertNodeCountAverage(
1855+
PipeRateAverage pipeRemainingInsertNodeCountAverage) {
1856+
if (Objects.equals(
1857+
this.pipeRemainingInsertNodeCountAverage, pipeRemainingInsertNodeCountAverage)) {
1858+
return;
1859+
}
1860+
this.pipeRemainingInsertNodeCountAverage = pipeRemainingInsertNodeCountAverage;
1861+
logger.info(
1862+
"pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountAverage);
1863+
}
1864+
18501865
public double getPipeTsFileScanParsingThreshold() {
18511866
return pipeTsFileScanParsingThreshold;
18521867
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRemainingTimeRateAverageTime.java renamed to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919

2020
package org.apache.iotdb.commons.enums;
2121

22-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
23-
2422
import com.codahale.metrics.Meter;
2523

26-
public enum PipeRemainingTimeRateAverageTime {
24+
public enum PipeRateAverage {
2725
ONE_MINUTE,
2826
FIVE_MINUTES,
2927
FIFTEEN_MINUTES,
@@ -41,9 +39,7 @@ public double getMeterRate(final Meter meter) {
4139
return meter.getMeanRate();
4240
default:
4341
throw new UnsupportedOperationException(
44-
String.format(
45-
"The type %s is not supported in average time of pipe remaining time.",
46-
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
42+
String.format("The type %s is not supported in pipe rate average.", this));
4743
}
4844
}
4945
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.apache.iotdb.commons.conf.CommonConfig;
2323
import org.apache.iotdb.commons.conf.CommonDescriptor;
24-
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
24+
import org.apache.iotdb.commons.enums.PipeRateAverage;
2525

2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
@@ -223,10 +223,14 @@ public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
223223
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
224224
}
225225

226-
public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
226+
public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
227227
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
228228
}
229229

230+
public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
231+
return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage();
232+
}
233+
230234
public double getPipeTsFileScanParsingThreshold() {
231235
return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
232236
}
@@ -513,6 +517,8 @@ public void printAllConfigs() {
513517
getPipeRemainingTimeCommitAutoSwitchSeconds());
514518
LOGGER.info(
515519
"PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime());
520+
LOGGER.info(
521+
"PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountAverage());
516522
LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold());
517523

518524
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.iotdb.commons.conf.CommonConfig;
2323
import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
2424
import org.apache.iotdb.commons.conf.TrimProperties;
25-
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
25+
import org.apache.iotdb.commons.enums.PipeRateAverage;
2626

2727
import java.io.IOException;
2828
import java.util.Optional;
@@ -184,12 +184,19 @@ public static void loadPipeStaticConfig(CommonConfig config, TrimProperties prop
184184
"pipe_snapshot_execution_max_batch_size",
185185
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
186186
config.setPipeRemainingTimeCommitRateAverageTime(
187-
PipeRemainingTimeRateAverageTime.valueOf(
187+
PipeRateAverage.valueOf(
188188
properties
189189
.getProperty(
190190
"pipe_remaining_time_commit_rate_average_time",
191191
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
192192
.trim()));
193+
config.setPipeRemainingInsertNodeCountAverage(
194+
PipeRateAverage.valueOf(
195+
properties
196+
.getProperty(
197+
"pipe_remaining_insert_node_count_average",
198+
String.valueOf(config.getPipeRemainingInsertNodeCountAverage()))
199+
.trim()));
193200
}
194201

195202
public static void loadPipeInternalConfig(CommonConfig config, TrimProperties properties)

0 commit comments

Comments
 (0)