Skip to content

Commit f616e34

Browse files
authored
Pipe: Exclude the tsFiles / insertNodes from transfer time metric which have not be sent (#16015)
1 parent 376162a commit f616e34

File tree

4 files changed

+19
-9
lines changed

4 files changed

+19
-9
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,10 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
211211
PipeDataNodeAgent.task()
212212
.decreaseFloatingMemoryUsageInByte(pipeName, creationTime, ramBytesUsed());
213213
PipeDataNodeSinglePipeMetrics.getInstance()
214-
.decreaseInsertNodeEventCount(pipeName, creationTime, System.nanoTime() - extractTime);
214+
.decreaseInsertNodeEventCount(
215+
pipeName,
216+
creationTime,
217+
shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
215218
}
216219
insertNode = null;
217220
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,10 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
337337
} finally {
338338
if (Objects.nonNull(pipeName)) {
339339
PipeDataNodeSinglePipeMetrics.getInstance()
340-
.decreaseTsFileEventCount(pipeName, creationTime, System.nanoTime() - extractTime);
340+
.decreaseTsFileEventCount(
341+
pipeName,
342+
creationTime,
343+
shouldReportOnCommit ? System.nanoTime() - extractTime : -1);
341344
}
342345
}
343346
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,10 @@ public void decreaseInsertNodeEventCount(
269269
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
270270
operator.decreaseInsertNodeEventCount();
271271

272-
operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
273-
PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
272+
if (transferTime > 0) {
273+
operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
274+
PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
275+
}
274276
}
275277

276278
public void increaseRawTabletEventCount(final String pipeName, final long creationTime) {
@@ -305,8 +307,10 @@ public void decreaseTsFileEventCount(
305307
k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime));
306308

307309
operator.decreaseTsFileEventCount();
308-
operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
309-
PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
310+
if (transferTime > 0) {
311+
operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS);
312+
PIPE_DATANODE_TSFILE_TRANSFER_TIME_HISTOGRAM.update(transferTime);
313+
}
310314
}
311315

312316
public void increaseHeartbeatEventCount(final String pipeName, final long creationTime) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,16 @@ public synchronized boolean decreaseReferenceCount(
202202
}
203203

204204
if (referenceCount.get() == 1) {
205+
if (!shouldReport) {
206+
shouldReportOnCommit = false;
207+
}
205208
// We assume that this function will not throw any exceptions.
206209
if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
207210
LOGGER.warn(
208211
"resource reference count is decreased to 0, but failed to release the resource, EnrichedEvent: {}, stack trace: {}",
209212
coreReportMessage(),
210213
Thread.currentThread().getStackTrace());
211214
}
212-
if (!shouldReport) {
213-
shouldReportOnCommit = false;
214-
}
215215
PipeEventCommitManager.getInstance().commit(this, committerKey);
216216
}
217217

0 commit comments

Comments
 (0)