Skip to content

Commit e22a54f

Browse files
authored
Pipe: Fixed the potential memory shortage may lead to forever blocking (#16570)
1 parent 629860a commit e22a54f

File tree

3 files changed

+32
-2
lines changed

3 files changed

+32
-2
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
2121

22+
import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
2223
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2324
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2425
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
@@ -131,6 +132,8 @@ protected boolean executeOnce() {
131132
}
132133

133134
decreaseReferenceCountAndReleaseLastEvent(event, true);
135+
} catch (final PipeNonReportException e) {
136+
// Ignore, go directly next round
134137
} catch (final PipeException e) {
135138
if (!isClosed.get()) {
136139
setLastExceptionEvent(event);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.exception.pipe;
21+
22+
public class PipeNonReportException extends PipeRuntimeNonCriticalException {
23+
24+
public PipeNonReportException(final String message) {
25+
super(message);
26+
}
27+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
24-
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
24+
import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
2525
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
2626
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
2727
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
@@ -127,7 +127,7 @@ public void handle(
127127
LOGGER::info,
128128
"Temporary unavailable exception: will retry forever. status: %s",
129129
status);
130-
throw new PipeRuntimeSinkCriticalException(exceptionMessage);
130+
throw new PipeNonReportException(exceptionMessage);
131131
}
132132

133133
case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION

0 commit comments

Comments
 (0)