Skip to content

Commit 6af0107

Browse files
committed
Pipe: Reduced the log of processor memory control (#16989)
* reduce * opti * fix
1 parent f9631fa commit 6af0107

File tree

2 files changed

+13
-17
lines changed

2 files changed

+13
-17
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2828
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
2929
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
30+
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
3031
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
3132
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
3233
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -188,15 +189,17 @@ protected boolean executeOnce() throws Exception {
188189
}
189190
decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
190191
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
191-
LOGGER.info(
192-
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
193-
e);
192+
PipeLogger.log(
193+
LOGGER::info,
194+
e,
195+
"Temporarily out of memory in pipe event processing, will wait for the memory to release.");
194196
return false;
195197
} catch (final Exception e) {
196198
if (ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) {
197-
LOGGER.info(
198-
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
199-
e);
199+
PipeLogger.log(
200+
LOGGER::info,
201+
e,
202+
"Temporarily out of memory in pipe event processing, will wait for the memory to release.");
200203
return false;
201204
}
202205
if (!isClosed.get()) {
@@ -210,7 +213,9 @@ protected boolean executeOnce() throws Exception {
210213
ErrorHandlingUtils.getRootCause(e).getMessage()),
211214
e);
212215
} else {
213-
LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e);
216+
LOGGER.info(
217+
"Exception in pipe event processing, ignored because pipe is dropped.{}",
218+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
214219
clearReferenceCountAndReleaseLastEvent(event);
215220
}
216221
}
@@ -282,15 +287,6 @@ public int getRegionId() {
282287
return regionId;
283288
}
284289

285-
public int getEventCount(final boolean ignoreHeartbeat) {
286-
// Avoid potential NPE in "getPipeName"
287-
final EnrichedEvent event =
288-
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
289-
return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof PipeHeartbeatEvent)
290-
? 1
291-
: 0;
292-
}
293-
294290
//////////////////////////// Error report ////////////////////////////
295291

296292
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ protected boolean executeOnce() {
161161
e);
162162
} else {
163163
LOGGER.info(
164-
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
164+
"Exception in pipe transfer, ignored because the sink subtask is dropped.{}",
165165
e.getMessage() != null ? " Message: " + e.getMessage() : "");
166166
clearReferenceCountAndReleaseLastEvent(event);
167167
}

0 commit comments

Comments
 (0)