Skip to content

Commit ff98e83

Browse files
authored
Pipe: Add Thrift callback processing logic for ClosedChannelException (apache#16421)
* Add Thrift callback processing logic for ClosedChannelException * update * fix * fix
1 parent e60408c commit ff98e83

File tree

3 files changed

+10
-1
lines changed

3 files changed

+10
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.commons.client.ClientPoolFactory;
2424
import org.apache.iotdb.commons.client.IClientManager;
25+
import org.apache.iotdb.commons.client.ThriftClient;
2526
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
2627
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
2728
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -259,6 +260,7 @@ public void onComplete(final TPipeTransferResp response) {
259260

260261
@Override
261262
public void onError(final Exception e) {
263+
ThriftClient.resolveException(e, client);
262264
PipeLogger.log(
263265
LOGGER::warn,
264266
e,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
2121

22+
import org.apache.iotdb.commons.client.ThriftClient;
2223
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
2324
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
2425
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -61,6 +62,10 @@ public void onComplete(final TPipeTransferResp response) {
6162

6263
@Override
6364
public void onError(final Exception exception) {
65+
if (client != null) {
66+
ThriftClient.resolveException(exception, client);
67+
}
68+
6469
if (connector.isClosed()) {
6570
clearEventsReferenceCount();
6671
connector.eliminateHandler(this, true);

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.lang.reflect.InvocationTargetException;
3030
import java.net.ConnectException;
3131
import java.net.SocketException;
32+
import java.nio.channels.ClosedChannelException;
3233
import java.util.Optional;
3334

3435
/**
@@ -113,7 +114,8 @@ static boolean isConnectionBroken(Throwable cause) {
113114
|| (cause instanceof IOException
114115
&& (hasExpectedMessage(cause, "Connection reset by peer")
115116
|| hasExpectedMessage(cause, "Broken pipe")))
116-
|| (cause instanceof ConnectException && hasExpectedMessage(cause, "Connection refused"));
117+
|| (cause instanceof ConnectException && hasExpectedMessage(cause, "Connection refused")
118+
|| (cause instanceof ClosedChannelException));
117119
}
118120

119121
static boolean hasExpectedMessage(Throwable cause, String expectedMessage) {

0 commit comments

Comments
 (0)