Skip to content

Commit 6e4ae8c

Browse files
SteveYurongSuCaideyipi
authored andcommitted
Pipe: Fix and improve async tsfile transfer error handling and logging (avoid client connection leak) (#16008)
* Pipe: Fix and improve async tsfile transfer error handling and logging Refactored IoTDBDataRegionAsyncConnector to handle exceptions during asynchronous tsfile transfer more gracefully. Now logs warnings instead of errors, invokes onError on the handler, and provides more context in log messages. Added getTsFile() method to PipeTransferTsFileHandler for better logging, and ensured memoryBlock is set to null after closing to prevent potential resource leaks. * fix * fix * bald-logger --------- Co-authored-by: Caideyipi <[email protected]> (cherry picked from commit 8b78c3a)
1 parent 5cb31e8 commit 6e4ae8c

File tree

3 files changed

+57
-23
lines changed

3 files changed

+57
-23
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,7 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
416416
}
417417
}
418418

419-
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
420-
throws Exception {
419+
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) {
421420
transferTsFileCounter.incrementAndGet();
422421
CompletableFuture<Void> completableFuture =
423422
CompletableFuture.supplyAsync(
@@ -439,13 +438,20 @@ private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
439438
if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) {
440439
try {
441440
completableFuture.get();
442-
} catch (InterruptedException e) {
443-
Thread.currentThread().interrupt();
444-
LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
445-
throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
446-
} catch (Exception e) {
447-
LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
448-
throw e;
441+
} catch (final Exception e) {
442+
if (e instanceof InterruptedException) {
443+
Thread.currentThread().interrupt();
444+
LOGGER.warn(
445+
"Transfer tsfile event {} asynchronously was interrupted.",
446+
pipeTransferTsFileHandler.getTsFile(),
447+
e);
448+
}
449+
450+
pipeTransferTsFileHandler.onError(e);
451+
LOGGER.warn(
452+
"Failed to transfer tsfile event {} asynchronously.",
453+
pipeTransferTsFileHandler.getTsFile(),
454+
e);
449455
}
450456
}
451457
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,17 @@
2626

2727
import org.apache.thrift.TException;
2828
import org.apache.thrift.async.AsyncMethodCallback;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.Objects;
2933

3034
public abstract class PipeTransferTrackableHandler
3135
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
36+
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
3237

3338
protected final IoTDBDataRegionAsyncSink connector;
39+
protected volatile AsyncPipeDataTransferServiceClient client;
3440

3541
public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink connector) {
3642
this.connector = connector;
@@ -77,13 +83,21 @@ public void onError(final Exception exception) {
7783
protected boolean tryTransfer(
7884
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req)
7985
throws TException {
86+
if (Objects.isNull(this.client)) {
87+
this.client = client;
88+
}
8089
// track handler before checking if connector is closed
8190
connector.trackHandler(this);
8291
if (connector.isClosed()) {
8392
clearEventsReferenceCount();
8493
connector.eliminateHandler(this);
8594
client.setShouldReturnSelf(true);
86-
client.returnSelf();
95+
try {
96+
client.returnSelf();
97+
} catch (final IllegalStateException e) {
98+
LOGGER.info(
99+
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
100+
}
87101
return false;
88102
}
89103
doTransfer(client, req);
@@ -106,6 +120,18 @@ protected abstract void doTransfer(
106120

107121
@Override
108122
public void close() {
109-
// do nothing
123+
if (Objects.isNull(client)) {
124+
return;
125+
}
126+
try {
127+
client.close();
128+
client.invalidateAll();
129+
} catch (final Exception e) {
130+
LOGGER.warn(
131+
"Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
132+
client,
133+
e.getMessage(),
134+
e);
135+
}
110136
}
111137
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
8787
private final AtomicBoolean isSealSignalSent;
8888

8989
private IoTDBDataNodeAsyncClientManager clientManager;
90-
private volatile AsyncPipeDataTransferServiceClient client;
9190

9291
public PipeTransferTsFileHandler(
9392
final IoTDBDataRegionAsyncSink connector,
@@ -131,6 +130,10 @@ public PipeTransferTsFileHandler(
131130
isSealSignalSent = new AtomicBoolean(false);
132131
}
133132

133+
public File getTsFile() {
134+
return tsFile;
135+
}
136+
134137
public void transfer(
135138
final IoTDBDataNodeAsyncClientManager clientManager,
136139
final AsyncPipeDataTransferServiceClient client)
@@ -415,19 +418,16 @@ private void returnClientIfNecessary() {
415418
}
416419

417420
if (connector.isClosed()) {
418-
try {
419-
client.close();
420-
client.invalidateAll();
421-
} catch (final Exception e) {
422-
LOGGER.warn(
423-
"Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
424-
client,
425-
e.getMessage(),
426-
e);
427-
}
421+
close();
428422
}
423+
429424
client.setShouldReturnSelf(true);
430-
client.returnSelf();
425+
try {
426+
client.returnSelf();
427+
} catch (final IllegalStateException e) {
428+
LOGGER.info(
429+
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
430+
}
431431
client = null;
432432
}
433433

@@ -454,8 +454,10 @@ public void clearEventsReferenceCount() {
454454
@Override
455455
public void close() {
456456
super.close();
457+
457458
if (memoryBlock != null) {
458459
memoryBlock.close();
460+
memoryBlock = null;
459461
}
460462
}
461463

0 commit comments

Comments
 (0)