Skip to content

Commit e0c2f5e

Browse files
Fix ThreadPoolMetrics concurrent NPE bug & Fix metric leaks when frequently creating and deleting database (#14388) (#14389)
Signed-off-by: OneSizeFitQuorum <[email protected]>
1 parent 7933ef9 commit e0c2f5e

File tree

14 files changed

+161 -131 lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
5151
private final AtomicReference<Meter> dataRegionCommitMeter = new AtomicReference<>(null);
5252
private final AtomicReference<Meter> schemaRegionCommitMeter = new AtomicReference<>(null);
5353
private final IoTDBHistogram collectInvocationHistogram =
54-
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);
54+
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();
5555

5656
private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
5757
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/TsFileMetrics.java

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.metrics.metricsets.IMetricSet;
2828
import org.apache.iotdb.metrics.type.Gauge;
2929
import org.apache.iotdb.metrics.utils.MetricLevel;
30+
import org.apache.iotdb.metrics.utils.MetricType;
3031

3132
import org.apache.tsfile.utils.Pair;
3233
import org.slf4j.Logger;
@@ -120,6 +121,10 @@ public void deleteRegion(String database, String regionId) {
120121
.forEach(map -> deleteRegionFromMap(map, database, regionId));
121122
Arrays.asList(seqFileSizeMap, unseqFileSizeMap)
122123
.forEach(map -> deleteRegionFromMap(map, database, regionId));
124+
Arrays.asList(SEQUENCE, UNSEQUENCE)
125+
.forEach(orderStr -> deleteGlobalTsFileCountGauge(orderStr, database, regionId));
126+
Arrays.asList(SEQUENCE, UNSEQUENCE)
127+
.forEach(orderStr -> deleteGlobalTsFileSizeGauge(orderStr, database, regionId));
123128
}
124129

125130
private <T> void deleteRegionFromMap(
@@ -199,6 +204,20 @@ public Gauge getOrCreateGlobalTsFileCountGauge(
199204
regionId);
200205
}
201206

207+
public void deleteGlobalTsFileCountGauge(String orderStr, String database, String regionId) {
208+
metricService
209+
.get()
210+
.remove(
211+
MetricType.GAUGE,
212+
FILE_GLOBAL_COUNT,
213+
Tag.NAME.toString(),
214+
orderStr,
215+
Tag.DATABASE.toString(),
216+
database,
217+
Tag.REGION.toString(),
218+
regionId);
219+
}
220+
202221
private void updateGlobalTsFileSizeMap(
203222
Map<String, Map<String, Pair<Long, Gauge>>> map,
204223
String orderStr,
@@ -246,6 +265,20 @@ public Gauge getOrCreateGlobalTsFileSizeGauge(String orderStr, String database,
246265
regionId);
247266
}
248267

268+
public void deleteGlobalTsFileSizeGauge(String orderStr, String database, String regionId) {
269+
metricService
270+
.get()
271+
.remove(
272+
MetricType.GAUGE,
273+
FILE_GLOBAL_SIZE,
274+
Tag.NAME.toString(),
275+
orderStr,
276+
Tag.DATABASE.toString(),
277+
database,
278+
Tag.REGION.toString(),
279+
regionId);
280+
}
281+
249282
// endregion
250283

251284
// region update level tsfile value map and gauge map

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -303,6 +303,8 @@ public class DataRegion implements IDataRegionForQuery {
303303
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
304304
PerformanceOverviewMetrics.getInstance();
305305

306+
private final DataRegionMetrics metrics;
307+
306308
/**
307309
* Construct a database processor.
308310
*
@@ -363,7 +365,8 @@ public DataRegion(
363365
recover();
364366
}
365367

366-
MetricService.getInstance().addMetricSet(new DataRegionMetrics(this));
368+
this.metrics = new DataRegionMetrics(this);
369+
MetricService.getInstance().addMetricSet(metrics);
367370
}
368371

369372
@TestOnly
@@ -373,6 +376,7 @@ public DataRegion(String databaseName, String id) {
373376
this.tsFileManager = new TsFileManager(databaseName, id, "");
374377
this.partitionMaxFileVersions = new HashMap<>();
375378
partitionMaxFileVersions.put(0L, 0L);
379+
this.metrics = new DataRegionMetrics(this);
376380
}
377381

378382
@Override
@@ -3542,6 +3546,7 @@ public void waitForDeleted() {
35423546
deletedCondition.await();
35433547
}
35443548
FileMetrics.getInstance().deleteRegion(databaseName, dataRegionId);
3549+
MetricService.getInstance().removeMetricSet(metrics);
35453550
} catch (InterruptedException e) {
35463551
logger.error("Interrupted When waiting for data region deleted.");
35473552
Thread.currentThread().interrupt();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionMetrics.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -29,12 +29,12 @@
2929
import java.util.Objects;
3030

3131
public class DataRegionMetrics implements IMetricSet {
32-
private DataRegion dataRegion;
33-
private String storageGroupName;
32+
private final DataRegion dataRegion;
33+
private final String databaseName;
3434

3535
public DataRegionMetrics(DataRegion dataRegion) {
3636
this.dataRegion = dataRegion;
37-
this.storageGroupName = dataRegion.getDatabaseName();
37+
this.databaseName = dataRegion.getDatabaseName();
3838
}
3939

4040
@Override
@@ -45,7 +45,7 @@ public void bindTo(AbstractMetricService metricService) {
4545
dataRegion,
4646
DataRegion::getMemCost,
4747
Tag.NAME.toString(),
48-
"database_" + storageGroupName);
48+
"database_" + databaseName);
4949
}
5050

5151
@Override
@@ -54,7 +54,7 @@ public void unbindFrom(AbstractMetricService metricService) {
5454
MetricType.AUTO_GAUGE,
5555
Metric.MEM.toString(),
5656
Tag.NAME.toString(),
57-
"database_" + storageGroupName);
57+
"database_" + databaseName);
5858
}
5959

6060
@Override
@@ -67,11 +67,11 @@ public boolean equals(Object o) {
6767
}
6868
DataRegionMetrics that = (DataRegionMetrics) o;
6969
return Objects.equals(dataRegion, that.dataRegion)
70-
&& Objects.equals(storageGroupName, that.storageGroupName);
70+
&& Objects.equals(databaseName, that.databaseName);
7171
}
7272

7373
@Override
7474
public int hashCode() {
75-
return Objects.hash(dataRegion, storageGroupName);
75+
return Objects.hash(dataRegion, databaseName);
7676
}
7777
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfoMetrics.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,12 +27,12 @@
2727
import org.apache.iotdb.metrics.utils.MetricType;
2828

2929
public class TsFileProcessorInfoMetrics implements IMetricSet {
30-
private final String storageGroupName;
30+
private final String databaseName;
3131
private final TsFileProcessorInfo tsFileProcessorInfo;
3232

3333
public TsFileProcessorInfoMetrics(
3434
String storageGroupName, TsFileProcessorInfo tsFileProcessorInfo) {
35-
this.storageGroupName = storageGroupName;
35+
this.databaseName = storageGroupName;
3636
this.tsFileProcessorInfo = tsFileProcessorInfo;
3737
}
3838

@@ -45,7 +45,7 @@ public void bindTo(AbstractMetricService metricService) {
4545
tsFileProcessorInfo,
4646
TsFileProcessorInfo::getMemCost,
4747
Tag.NAME.toString(),
48-
"chunkMetaData_" + storageGroupName);
48+
"chunkMetaData_" + databaseName);
4949
}
5050

5151
@Override
@@ -55,6 +55,6 @@ public void unbindFrom(AbstractMetricService metricService) {
5555
MetricType.AUTO_GAUGE,
5656
Metric.MEM.toString(),
5757
Tag.NAME.toString(),
58-
"chunkMetaData_" + storageGroupName);
58+
"chunkMetaData_" + databaseName);
5959
}
6060
}

iotdb-core/metrics/core/src/main/java/org/apache/iotdb/metrics/core/IoTDBMetricManager.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,7 @@ public Gauge createGauge() {
9393
}
9494

9595
@Override
96-
public Histogram createHistogram(MetricInfo metricInfo) {
96+
public Histogram createHistogram() {
9797
// create distributionSummary
9898
io.micrometer.core.instrument.DistributionSummary distributionSummary =
9999
new CumulativeDistributionSummary(

iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricManager.java

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -252,7 +252,7 @@ public Histogram getOrCreateHistogram(String name, MetricLevel metricLevel, Stri
252252
metrics.computeIfAbsent(
253253
metricInfo,
254254
key -> {
255-
Histogram histogram = createHistogram(metricInfo);
255+
Histogram histogram = createHistogram();
256256
nameToMetaInfo.put(name, metricInfo.getMetaInfo());
257257
notifyReporterOnAdd(histogram, metricInfo);
258258
return histogram;
@@ -263,12 +263,8 @@ public Histogram getOrCreateHistogram(String name, MetricLevel metricLevel, Stri
263263
throw new IllegalArgumentException(metricInfo + ALREADY_EXISTS);
264264
}
265265

266-
/**
267-
* Create histogram according to metric framework.
268-
*
269-
* @param metricInfo the metricInfo of metric
270-
*/
271-
protected abstract Histogram createHistogram(MetricInfo metricInfo);
266+
/** Create histogram according to metric framework. */
267+
protected abstract Histogram createHistogram();
272268

273269
/**
274270
* Get timer. return if exists, create if not.
@@ -466,7 +462,7 @@ protected boolean stop() {
466462

467463
protected abstract boolean stopFramework();
468464

469-
private boolean invalid(MetricLevel metricLevel, String name, String... tags) {
465+
public boolean invalid(MetricLevel metricLevel, String name, String... tags) {
470466
if (!isEnableMetricInGivenLevel(metricLevel)) {
471467
return true;
472468
}

iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/AbstractMetricService.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -259,29 +259,45 @@ public Timer getOrCreateTimerWithInternalReport(
259259
/** Count with internal report. */
260260
public void countWithInternalReportAsync(
261261
long delta, String metric, MetricLevel metricLevel, long time, String... tags) {
262-
internalReporter.writeMetricToIoTDB(
263-
metricManager.count(delta, metric, metricLevel, tags), metric, time, tags);
262+
if (metricManager.invalid(metricLevel, metric, tags)) {
263+
return;
264+
}
265+
Counter counter = metricManager.createCounter();
266+
counter.inc(delta);
267+
internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
264268
}
265269

266270
/** Gauge value with internal report. */
267271
public void gaugeWithInternalReportAsync(
268272
long value, String metric, MetricLevel metricLevel, long time, String... tags) {
269-
internalReporter.writeMetricToIoTDB(
270-
metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags);
273+
if (metricManager.invalid(metricLevel, metric, tags)) {
274+
return;
275+
}
276+
Gauge gauge = metricManager.createGauge();
277+
gauge.set(value);
278+
internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
271279
}
272280

273281
/** Rate with internal report. */
274282
public void rateWithInternalReportAsync(
275283
long value, String metric, MetricLevel metricLevel, long time, String... tags) {
276-
internalReporter.writeMetricToIoTDB(
277-
metricManager.rate(value, metric, metricLevel, tags), metric, time, tags);
284+
if (metricManager.invalid(metricLevel, metric, tags)) {
285+
return;
286+
}
287+
Rate rate = metricManager.createRate();
288+
rate.mark(value);
289+
internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
278290
}
279291

280292
/** Histogram with internal report. */
281293
public void histogramWithInternalReportAsync(
282294
long value, String metric, MetricLevel metricLevel, long time, String... tags) {
283-
internalReporter.writeMetricToIoTDB(
284-
metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags);
295+
if (metricManager.invalid(metricLevel, metric, tags)) {
296+
return;
297+
}
298+
Histogram histogram = metricManager.createHistogram();
299+
histogram.update(value);
300+
internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
285301
}
286302

287303
/** Timer with internal report. */
@@ -292,8 +308,12 @@ public void timerWithInternalReportAsync(
292308
MetricLevel metricLevel,
293309
long time,
294310
String... tags) {
295-
internalReporter.writeMetricToIoTDB(
296-
metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags);
311+
if (metricManager.invalid(metricLevel, metric, tags)) {
312+
return;
313+
}
314+
Timer timer = metricManager.createTimer();
315+
timer.update(delta, timeUnit);
316+
internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
297317
}
298318

299319
public List<Pair<String, String[]>> getAllMetricKeys() {

iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/impl/DoNothingMetricManager.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ public Gauge createGauge() {
5757
}
5858

5959
@Override
60-
public Histogram createHistogram(MetricInfo metricInfo) {
60+
public Histogram createHistogram() {
6161
return DO_NOTHING_HISTOGRAM;
6262
}
6363

0 commit comments

Comments
 (0)