diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 31ffcef042f5..d8fbac9e44e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; @@ -215,15 +216,17 @@ protected boolean executeOnce() throws Exception { } decreaseReferenceCountAndReleaseLastEvent(event, shouldReport); } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - LOGGER.info( - "Temporarily out of memory in pipe event processing, will wait for the memory to release.", - e); + PipeLogger.log( + LOGGER::info, + e, + "Temporarily out of memory in pipe event processing, will wait for the memory to release."); return false; } catch (final Exception e) { if (ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) { - LOGGER.info( - "Temporarily out of memory in pipe event processing, will wait for the memory to release.", - e); + PipeLogger.log( + LOGGER::info, + e, + "Temporarily out of memory in pipe event processing, will wait for the memory to release."); return false; } if (!isClosed.get()) { @@ -237,7 +240,9 @@ protected boolean executeOnce() throws Exception { ErrorHandlingUtils.getRootCause(e).getMessage()), e); } else { - LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e); + LOGGER.info( + "Exception in pipe event processing, ignored because pipe is dropped.{}", + e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); } } @@ -309,15 +314,6 @@ public int getRegionId() { return regionId; } - public int getEventCount(final boolean ignoreHeartbeat) { - // Avoid potential NPE in "getPipeName" - final EnrichedEvent event = - lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null; - return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof PipeHeartbeatEvent) - ? 1 - : 0; - } - //////////////////////////// Error report //////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index f5bafc5d8578..dba2269b281c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -161,7 +161,7 @@ protected boolean executeOnce() { e); } else { LOGGER.info( - "Exception in pipe transfer, ignored because the connector subtask is dropped.{}", + "Exception in pipe transfer, ignored because the sink subtask is dropped.{}", e.getMessage() != null ? " Message: " + e.getMessage() : ""); clearReferenceCountAndReleaseLastEvent(event); }