Skip to content

Commit 47e02c8

Browse files
authored
Pipe: Fix lost transfer time metric & Support transfer time tracking for RawTablet events converted from InsertNode/TsFile events (apache#16364)
* fix metric * fix * fix
1 parent 2ff8310 commit 47e02c8

File tree

4 files changed

+61
-0
lines changed

4 files changed

+61
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ public String getDeviceId() {
162162
: null;
163163
}
164164

165+
public long getExtractTime() {
166+
return extractTime;
167+
}
168+
165169
/////////////////////////// EnrichedEvent ///////////////////////////
166170

167171
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,25 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
263263
// Actually release the occupied memory.
264264
tablet = null;
265265
eventParser = null;
266+
267+
// Update metrics of the source event
268+
if (needToReport && shouldReportOnCommit && Objects.nonNull(pipeName)) {
269+
if (sourceEvent instanceof PipeInsertNodeTabletInsertionEvent) {
270+
PipeDataNodeSinglePipeMetrics.getInstance()
271+
.updateInsertNodeTransferTimer(
272+
pipeName,
273+
creationTime,
274+
System.nanoTime()
275+
- ((PipeInsertNodeTabletInsertionEvent) sourceEvent).getExtractTime());
276+
} else if (sourceEvent instanceof PipeTsFileInsertionEvent) {
277+
PipeDataNodeSinglePipeMetrics.getInstance()
278+
.updateTsFileTransferTimer(
279+
pipeName,
280+
creationTime,
281+
System.nanoTime() - ((PipeTsFileInsertionEvent) sourceEvent).getExtractTime());
282+
}
283+
}
284+
266285
return true;
267286
}
268287

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,10 @@ public long getTimePartitionId() {
298298
return resource.getTimePartition();
299299
}
300300

301+
public long getExtractTime() {
302+
return extractTime;
303+
}
304+
301305
/////////////////////////// EnrichedEvent ///////////////////////////
302306

303307
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Map;
4040
import java.util.Objects;
4141
import java.util.concurrent.ConcurrentHashMap;
42+
import java.util.concurrent.TimeUnit;
4243

4344
public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
4445

@@ -236,7 +237,24 @@ public void decreaseInsertNodeEventCount(
236237
remainingEventAndTimeOperatorMap.computeIfAbsent(
237238
pipeName + "_" + creationTime,
238239
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
240+
239241
operator.decreaseInsertNodeEventCount();
242+
243+
if (transferTime > 0) {
244+
operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
245+
}
246+
}
247+
248+
public void updateInsertNodeTransferTimer(
249+
final String pipeName, final long creationTime, final long transferTime) {
250+
if (transferTime > 0) {
251+
remainingEventAndTimeOperatorMap
252+
.computeIfAbsent(
253+
pipeName + "_" + creationTime,
254+
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime))
255+
.getInsertNodeTransferTimer()
256+
.update(transferTime, TimeUnit.NANOSECONDS);
257+
}
240258
}
241259

242260
public void increaseRawTabletEventCount(final String pipeName, final long creationTime) {
@@ -271,6 +289,22 @@ public void decreaseTsFileEventCount(
271289
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
272290

273291
operator.decreaseTsFileEventCount();
292+
293+
if (transferTime > 0) {
294+
operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
295+
}
296+
}
297+
298+
public void updateTsFileTransferTimer(
299+
final String pipeName, final long creationTime, final long transferTime) {
300+
if (transferTime > 0) {
301+
remainingEventAndTimeOperatorMap
302+
.computeIfAbsent(
303+
pipeName + "_" + creationTime,
304+
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime))
305+
.getTsFileTransferTimer()
306+
.update(transferTime, TimeUnit.NANOSECONDS);
307+
}
274308
}
275309

276310
public void increaseHeartbeatEventCount(final String pipeName, final long creationTime) {

0 commit comments

Comments
 (0)