Skip to content

Commit 5176534

Browse files
authored
Batch update inserted points metric for insertMultiTablets (apache#14146)
1 parent 2dfe81f commit 5176534

File tree

6 files changed

+196
-149
lines changed

6 files changed

+196
-149
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 90 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.iotdb.commons.schema.SchemaConstant;
3232
import org.apache.iotdb.commons.service.metric.MetricService;
3333
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
34+
import org.apache.iotdb.commons.service.metric.enums.Metric;
35+
import org.apache.iotdb.commons.service.metric.enums.Tag;
3436
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
3537
import org.apache.iotdb.commons.utils.TestOnly;
3638
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -131,6 +133,7 @@
131133
import org.apache.iotdb.db.utils.CommonUtils;
132134
import org.apache.iotdb.db.utils.DateTimeUtils;
133135
import org.apache.iotdb.db.utils.ModificationUtils;
136+
import org.apache.iotdb.metrics.utils.MetricLevel;
134137
import org.apache.iotdb.rpc.RpcUtils;
135138
import org.apache.iotdb.rpc.TSStatusCode;
136139

@@ -1158,7 +1161,7 @@ private boolean doInsert(
11581161
InsertTabletNode insertTabletNode,
11591162
Map<Long, List<int[]>[]> splitMap,
11601163
TSStatus[] results,
1161-
long[] costsForMetrics) {
1164+
long[] infoForMetrics) {
11621165
boolean noFailure = true;
11631166
for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
11641167
long timePartitionId = entry.getKey();
@@ -1173,7 +1176,7 @@ private boolean doInsert(
11731176
results,
11741177
timePartitionId,
11751178
noFailure,
1176-
costsForMetrics)
1179+
infoForMetrics)
11771180
&& noFailure;
11781181
}
11791182
List<int[]> unSequenceRangeList = rangeLists[0];
@@ -1186,7 +1189,7 @@ private boolean doInsert(
11861189
results,
11871190
timePartitionId,
11881191
noFailure,
1189-
costsForMetrics)
1192+
infoForMetrics)
11901193
&& noFailure;
11911194
}
11921195
}
@@ -1213,13 +1216,14 @@ public void insertTablet(InsertTabletNode insertTabletNode)
12131216
}
12141217
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
12151218
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
1216-
long[] costsForMetrics = new long[4];
1217-
boolean noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics);
1218-
1219-
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
1220-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
1221-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
1222-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
1219+
long[] infoForMetrics = new long[5];
1220+
// infoForMetrics[0]: CreateMemtableBlockTimeCost
1221+
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
1222+
// infoForMetrics[2]: ScheduleWalTimeCost
1223+
// infoForMetrics[3]: ScheduleMemTableTimeCost
1224+
// infoForMetrics[4]: InsertedPointsNumber
1225+
boolean noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics);
1226+
updateTsFileProcessorMetric(insertTabletNode, infoForMetrics);
12231227

12241228
if (!noFailure) {
12251229
throw new BatchProcessException(results);
@@ -1230,7 +1234,7 @@ public void insertTablet(InsertTabletNode insertTabletNode)
12301234
}
12311235

12321236
private boolean executeInsertTablet(
1233-
InsertTabletNode insertTabletNode, TSStatus[] results, long[] costsForMetrics)
1237+
InsertTabletNode insertTabletNode, TSStatus[] results, long[] infoForMetrics)
12341238
throws OutOfTTLException {
12351239
boolean noFailure;
12361240
int loc = insertTabletNode.checkTTL(results, i -> getTTL(insertTabletNode));
@@ -1244,7 +1248,7 @@ private boolean executeInsertTablet(
12441248
split(insertTabletNode, start, end, splitInfo);
12451249
start = end;
12461250
}
1247-
noFailure = doInsert(insertTabletNode, splitInfo, results, costsForMetrics) && noFailure;
1251+
noFailure = doInsert(insertTabletNode, splitInfo, results, infoForMetrics) && noFailure;
12481252

12491253
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
12501254
&& !insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
@@ -1289,7 +1293,7 @@ private boolean insertTabletToTsFileProcessor(
12891293
TSStatus[] results,
12901294
long timePartitionId,
12911295
boolean noFailure,
1292-
long[] costsForMetrics) {
1296+
long[] infoForMetrics) {
12931297
if (insertTabletNode.allMeasurementFailed()) {
12941298
if (logger.isDebugEnabled()) {
12951299
logger.debug(
@@ -1319,8 +1323,7 @@ private boolean insertTabletToTsFileProcessor(
13191323
registerToTsFile(insertTabletNode, tsFileProcessor);
13201324

13211325
try {
1322-
tsFileProcessor.insertTablet(
1323-
insertTabletNode, rangeList, results, noFailure, costsForMetrics);
1326+
tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
13241327
} catch (WriteProcessRejectException e) {
13251328
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
13261329
return false;
@@ -1358,12 +1361,14 @@ private TsFileProcessor insertToTsFileProcessor(
13581361
if (tsFileProcessor == null || insertRowNode.allMeasurementFailed()) {
13591362
return null;
13601363
}
1361-
long[] costsForMetrics = new long[4];
1362-
tsFileProcessor.insert(insertRowNode, costsForMetrics);
1363-
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
1364-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
1365-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
1366-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
1364+
long[] infoForMetrics = new long[5];
1365+
// infoForMetrics[0]: CreateMemtableBlockTimeCost
1366+
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
1367+
// infoForMetrics[2]: ScheduleWalTimeCost
1368+
// infoForMetrics[3]: ScheduleMemTableTimeCost
1369+
// infoForMetrics[4]: InsertedPointsNumber
1370+
tsFileProcessor.insert(insertRowNode, infoForMetrics);
1371+
updateTsFileProcessorMetric(insertRowNode, infoForMetrics);
13671372
// register TableSchema (and maybe more) for table insertion
13681373
registerToTsFile(insertRowNode, tsFileProcessor);
13691374
return tsFileProcessor;
@@ -1374,8 +1379,10 @@ private void tryToUpdateInsertRowLastCache(InsertRowNode node) {
13741379
}
13751380

13761381
private List<InsertRowNode> insertToTsFileProcessors(
1377-
InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) {
1378-
long[] costsForMetrics = new long[4];
1382+
InsertRowsNode insertRowsNode,
1383+
boolean[] areSequence,
1384+
long[] timePartitionIds,
1385+
long[] infoForMetrics) {
13791386
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>();
13801387
for (int i = 0; i < areSequence.length; i++) {
13811388
InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i);
@@ -1416,7 +1423,7 @@ private List<InsertRowNode> insertToTsFileProcessors(
14161423
TsFileProcessor tsFileProcessor = entry.getKey();
14171424
InsertRowsNode subInsertRowsNode = entry.getValue();
14181425
try {
1419-
tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics);
1426+
tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
14201427
} catch (WriteProcessException e) {
14211428
insertRowsNode
14221429
.getResults()
@@ -1432,11 +1439,6 @@ private List<InsertRowNode> insertToTsFileProcessors(
14321439
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
14331440
}
14341441
}
1435-
1436-
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
1437-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
1438-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
1439-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
14401442
return executedInsertRowNodeList;
14411443
}
14421444

@@ -3284,7 +3286,6 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
32843286
return;
32853287
}
32863288
long ttl = getTTL(insertRowsOfOneDeviceNode);
3287-
long[] costsForMetrics = new long[4];
32883289
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new HashMap<>();
32893290
for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
32903291
InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
@@ -3348,11 +3349,17 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
33483349
});
33493350
}
33503351
List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
3352+
long[] infoForMetrics = new long[5];
3353+
// infoForMetrics[0]: CreateMemtableBlockTimeCost
3354+
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
3355+
// infoForMetrics[2]: ScheduleWalTimeCost
3356+
// infoForMetrics[3]: ScheduleMemTableTimeCost
3357+
// infoForMetrics[4]: InsertedPointsNumber
33513358
for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) {
33523359
TsFileProcessor tsFileProcessor = entry.getKey();
33533360
InsertRowsNode subInsertRowsNode = entry.getValue();
33543361
try {
3355-
tsFileProcessor.insertRows(subInsertRowsNode, costsForMetrics);
3362+
tsFileProcessor.insertRows(subInsertRowsNode, infoForMetrics);
33563363
} catch (WriteProcessException e) {
33573364
insertRowsOfOneDeviceNode
33583365
.getResults()
@@ -3368,10 +3375,7 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
33683375
}
33693376
}
33703377

3371-
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
3372-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
3373-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
3374-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
3378+
updateTsFileProcessorMetric(insertRowsOfOneDeviceNode, infoForMetrics);
33753379
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
33763380
&& !insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
33773381
// disable updating last cache on follower
@@ -3438,8 +3442,15 @@ public void insert(InsertRowsNode insertRowsNode)
34383442
> lastFlushTimeMap.getFlushedTime(
34393443
timePartitionIds[i], insertRowNode.getDeviceID());
34403444
}
3445+
long[] infoForMetrics = new long[5];
3446+
// infoForMetrics[0]: CreateMemtableBlockTimeCost
3447+
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
3448+
// infoForMetrics[2]: ScheduleWalTimeCost
3449+
// infoForMetrics[3]: ScheduleMemTableTimeCost
3450+
// infoForMetrics[4]: InsertedPointsNumber
34413451
List<InsertRowNode> executedInsertRowNodeList =
3442-
insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
3452+
insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds, infoForMetrics);
3453+
updateTsFileProcessorMetric(insertRowsNode, infoForMetrics);
34433454

34443455
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
34453456
&& !insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
@@ -3477,14 +3488,19 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
34773488
insertMultiTabletsNode.getSearchIndex());
34783489
return;
34793490
}
3480-
long[] costsForMetrics = new long[4];
3491+
long[] infoForMetrics = new long[5];
3492+
// infoForMetrics[0]: CreateMemtableBlockTimeCost
3493+
// infoForMetrics[1]: ScheduleMemoryBlockTimeCost
3494+
// infoForMetrics[2]: ScheduleWalTimeCost
3495+
// infoForMetrics[3]: ScheduleMemTableTimeCost
3496+
// infoForMetrics[4]: InsertedPointsNumber
34813497
for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
34823498
InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i);
34833499
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
34843500
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
34853501
boolean noFailure = false;
34863502
try {
3487-
noFailure = executeInsertTablet(insertTabletNode, results, costsForMetrics);
3503+
noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics);
34883504
} catch (WriteProcessException e) {
34893505
insertMultiTabletsNode
34903506
.getResults()
@@ -3506,11 +3522,7 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
35063522
insertMultiTabletsNode.getResults().put(i, firstStatus);
35073523
}
35083524
}
3509-
3510-
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
3511-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
3512-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
3513-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
3525+
updateTsFileProcessorMetric(insertMultiTabletsNode, infoForMetrics);
35143526

35153527
} finally {
35163528
writeUnlock();
@@ -3521,6 +3533,41 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
35213533
}
35223534
}
35233535

3536+
private void updateTsFileProcessorMetric(InsertNode insertNode, long[] infoForMetrics) {
3537+
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(infoForMetrics[0]);
3538+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(infoForMetrics[1]);
3539+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(infoForMetrics[2]);
3540+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(infoForMetrics[3]);
3541+
MetricService.getInstance()
3542+
.count(
3543+
infoForMetrics[4],
3544+
Metric.QUANTITY.toString(),
3545+
MetricLevel.CORE,
3546+
Tag.NAME.toString(),
3547+
Metric.POINTS_IN.toString(),
3548+
Tag.DATABASE.toString(),
3549+
databaseName,
3550+
Tag.REGION.toString(),
3551+
dataRegionId,
3552+
Tag.TYPE.toString(),
3553+
Metric.MEMTABLE_POINT_COUNT.toString());
3554+
if (!insertNode.isGeneratedByRemoteConsensusLeader()) {
3555+
MetricService.getInstance()
3556+
.count(
3557+
infoForMetrics[4],
3558+
Metric.LEADER_QUANTITY.toString(),
3559+
MetricLevel.CORE,
3560+
Tag.NAME.toString(),
3561+
Metric.POINTS_IN.toString(),
3562+
Tag.DATABASE.toString(),
3563+
databaseName,
3564+
Tag.REGION.toString(),
3565+
dataRegionId,
3566+
Tag.TYPE.toString(),
3567+
Metric.MEMTABLE_POINT_COUNT.toString());
3568+
}
3569+
}
3570+
35243571
/**
35253572
* @return the disk space occupied by this data region, unit is MB
35263573
*/

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,12 @@
2727
import org.apache.iotdb.commons.path.NonAlignedFullPath;
2828
import org.apache.iotdb.commons.path.PartialPath;
2929
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
30-
import org.apache.iotdb.commons.service.metric.MetricService;
3130
import org.apache.iotdb.commons.service.metric.enums.Metric;
32-
import org.apache.iotdb.commons.service.metric.enums.Tag;
3331
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3432
import org.apache.iotdb.db.exception.WriteProcessException;
3533
import org.apache.iotdb.db.exception.query.QueryProcessException;
3634
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
3735
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
38-
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
3936
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
4037
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
4138
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
@@ -51,7 +48,6 @@
5148
import org.apache.iotdb.db.utils.ModificationUtils;
5249
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
5350
import org.apache.iotdb.db.utils.datastructure.TVList;
54-
import org.apache.iotdb.metrics.utils.MetricLevel;
5551

5652
import org.apache.tsfile.enums.TSDataType;
5753
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -306,37 +302,6 @@ public int insertAlignedTablet(
306302
}
307303
}
308304

309-
public void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted) {
310-
MetricService.getInstance()
311-
.count(
312-
pointsInserted,
313-
Metric.QUANTITY.toString(),
314-
MetricLevel.CORE,
315-
Tag.NAME.toString(),
316-
METRIC_POINT_IN,
317-
Tag.DATABASE.toString(),
318-
database,
319-
Tag.REGION.toString(),
320-
dataRegionId,
321-
Tag.TYPE.toString(),
322-
Metric.MEMTABLE_POINT_COUNT.toString());
323-
if (!insertNode.isGeneratedByRemoteConsensusLeader()) {
324-
MetricService.getInstance()
325-
.count(
326-
pointsInserted,
327-
Metric.LEADER_QUANTITY.toString(),
328-
MetricLevel.CORE,
329-
Tag.NAME.toString(),
330-
METRIC_POINT_IN,
331-
Tag.DATABASE.toString(),
332-
database,
333-
Tag.REGION.toString(),
334-
dataRegionId,
335-
Tag.TYPE.toString(),
336-
Metric.MEMTABLE_POINT_COUNT.toString());
337-
}
338-
}
339-
340305
@Override
341306
public void write(
342307
IDeviceID deviceId,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.db.exception.WriteProcessException;
2525
import org.apache.iotdb.db.exception.query.QueryProcessException;
2626
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
27-
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
2827
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
2928
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
3029
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -203,6 +202,4 @@ void queryForDeviceRegionScan(
203202
void markAsNotGeneratedByPipe();
204203

205204
boolean isTotallyGeneratedByPipe();
206-
207-
void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted);
208205
}

0 commit comments

Comments
 (0)