Skip to content

Commit 0ec2dbd

Browse files
authored
Pipe: Reduced the log of processor memory control (#16989)
* reduce * opti * fix
1 parent 99ee13d commit 0ec2dbd

File tree

2 files changed

+13
-17
lines changed

2 files changed

+13
-17
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2929
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
3030
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
31+
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
3132
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
3233
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
3334
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -215,15 +216,17 @@ protected boolean executeOnce() throws Exception {
215216
}
216217
decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
217218
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
218-
LOGGER.info(
219-
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
220-
e);
219+
PipeLogger.log(
220+
LOGGER::info,
221+
e,
222+
"Temporarily out of memory in pipe event processing, will wait for the memory to release.");
221223
return false;
222224
} catch (final Exception e) {
223225
if (ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) {
224-
LOGGER.info(
225-
"Temporarily out of memory in pipe event processing, will wait for the memory to release.",
226-
e);
226+
PipeLogger.log(
227+
LOGGER::info,
228+
e,
229+
"Temporarily out of memory in pipe event processing, will wait for the memory to release.");
227230
return false;
228231
}
229232
if (!isClosed.get()) {
@@ -237,7 +240,9 @@ protected boolean executeOnce() throws Exception {
237240
ErrorHandlingUtils.getRootCause(e).getMessage()),
238241
e);
239242
} else {
240-
LOGGER.info("Exception in pipe event processing, ignored because pipe is dropped.", e);
243+
LOGGER.info(
244+
"Exception in pipe event processing, ignored because pipe is dropped.{}",
245+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
241246
clearReferenceCountAndReleaseLastEvent(event);
242247
}
243248
}
@@ -309,15 +314,6 @@ public int getRegionId() {
309314
return regionId;
310315
}
311316

312-
public int getEventCount(final boolean ignoreHeartbeat) {
313-
// Avoid potential NPE in "getPipeName"
314-
final EnrichedEvent event =
315-
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
316-
return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof PipeHeartbeatEvent)
317-
? 1
318-
: 0;
319-
}
320-
321317
//////////////////////////// Error report ////////////////////////////
322318

323319
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ protected boolean executeOnce() {
161161
e);
162162
} else {
163163
LOGGER.info(
164-
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
164+
"Exception in pipe transfer, ignored because the sink subtask is dropped.{}",
165165
e.getMessage() != null ? " Message: " + e.getMessage() : "");
166166
clearReferenceCountAndReleaseLastEvent(event);
167167
}

0 commit comments

Comments
 (0)